An Intro to Rio: A Lightweight Job Scheduler in Go

Written by susamn | Published 2021/07/06
Tech Story Tags: golang | task-scheduler | job | prioritization | asynchronous | non-blocking | web-development | programming | web-monetization

TLDR Rio tries to tame ad hoc goroutine management by introducing two concepts: an asynchronous job processor backed by a priority queue, and a way to chain dependent calls. The job processor runs multiple jobs asynchronously. Rio also provides simple semantics for joining multiple data sources based on their input and output types, without coupling the data sources to each other, which makes creating new APIs or resolvers for GraphQL APIs a breeze. Dependent calls are chained together using closures and function types and run in one goroutine, since calls with a data dependency between them cannot proceed independently anyway.

What is RIO?

Rio is a lightweight job scheduler for Go. It also provides simple semantics for joining multiple data sources based on their input and output types, without coupling the data sources to each other. This makes creating new APIs, or resolvers for GraphQL APIs, a breeze.

https://github.com/susamn/rio

Concern

We often write web apps that connect to different data sources, combine the data obtained from them, and then do some more work. Along the way we write a lot of boilerplate to transform one data type into another. And in the absence of a proper job scheduler, we spawn goroutines ad hoc and without proper management. The result is code that is hard to manage, and even harder to update later, especially when a new member joins the team.

Rio tries to solve this problem by introducing two concepts.

An asynchronous job processor

This is the piece that runs multiple jobs asynchronously. It has a priority queue (balancer.go and pool.go) that hands off incoming requests to a set of managed workers.

// The balancer struct. It is used inside the GetBalancer method to provide a load balancer to the caller.
type Balancer struct {

	// The pool of workers, which is itself a priority queue based on a min heap.
	pool Pool

	// This channel is used to receive a request instance from the caller. After getting the request, it is
	// dispatched to the most lightly loaded worker.
	jobChannel chan *Request

	// This channel is used by the workers. After processing a task, a worker uses this channel to let the balancer
	// know that it is done and able to take new requests from its request channel.
	done chan *Worker

	// The number of queued requests.
	queuedItems int

	// The close channel. When any goroutine calls the Close method, passing a channel of boolean, the balancer
	// waits for all the requests to be processed, closes all the workers, closes all its own loops, and finally
	// responds by sending boolean true on the channel passed by the caller, confirming that all the inner loops
	// are closed and the balancer is shut down.
	closeChannel chan chan bool
}
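
The chan chan bool type of closeChannel can look odd at first. Here is a minimal sketch of the handshake it enables. This is not Rio's actual code: the loop type, its fields, newLoop and Close are made up for illustration, and the sketch assumes the caller stops posting jobs before it calls Close. The caller sends a reply channel, and the loop drains its queued work before answering on it.

```go
package main

import "fmt"

// loop is a hypothetical stand-in for the balancer. It owns a job queue and a
// closeChannel of type chan chan bool, like the field in the struct above.
type loop struct {
	jobs         chan int
	closeChannel chan chan bool
	processed    int
}

// newLoop starts the event loop in its own goroutine.
func newLoop() *loop {
	l := &loop{jobs: make(chan int, 8), closeChannel: make(chan chan bool)}
	go l.run()
	return l
}

func (l *loop) run() {
	for {
		select {
		case <-l.jobs:
			l.processed++
		case reply := <-l.closeChannel:
			// Finish whatever is still queued before acknowledging.
			for len(l.jobs) > 0 {
				<-l.jobs
				l.processed++
			}
			reply <- true // tell the caller that shutdown is complete
			return
		}
	}
}

// Close asks the loop to shut down and blocks until it confirms.
// It returns how many jobs were processed in total.
func (l *loop) Close() int {
	reply := make(chan bool)
	l.closeChannel <- reply
	<-reply
	return l.processed
}

func main() {
	l := newLoop()
	for i := 0; i < 5; i++ {
		l.jobs <- i
	}
	fmt.Println("processed before shutdown:", l.Close()) // processed before shutdown: 5
}
```

Because the reply travels on a channel created by the caller, several goroutines could request a shutdown confirmation without sharing any state with the loop.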

The balancer is backed by a min-heap priority queue, so when it assigns a new task it picks the least loaded worker.

To implement the min heap, we just need to implement the five methods of heap.Interface from the standard container/heap package (Len, Less and Swap from sort.Interface, plus Push and Pop) on the pool type, like this:

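// The pool is a list of workers. The pool is also a priority queue.
type Pool []*Worker

// Len returns the number of workers in the pool.
func (p Pool) Len() int {
	return len(p)
}

// Less orders workers by pending jobs, so the least loaded worker sits at the top of the heap.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges two workers in the pool.
func (p *Pool) Swap(i, j int) {
	(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
}

// Push appends a worker. It is called by heap.Push, not directly.
func (p *Pool) Push(x interface{}) {
	//n := len(*p)
	item := x.(*Worker)
	//item.index = n
	*p = append(*p, item)
}

// Pop removes and returns the last worker. It is called by heap.Pop, not directly.
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	item := old[n-1]
	//item.index = 0 // for safety
	*p = old[0 : n-1]
	return item
}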

Easy management of these goroutines and chaining them

How many times do we do this:

Call service 1 in goroutine 1, then wait for and get the response from goroutine 1

Call service 2 in goroutine 2, taking a piece of data from service call 1, then wait for and get the response from goroutine 2

Call service 3 in goroutine 3, taking a piece of data from service call 2, then wait for and get the response from goroutine 3

You get the idea: this only delays things and causes a lot of context switching. Rio helps here by chaining multiple calls together using closures and function types, and running the whole chain in one goroutine.

Now, many may wonder: won’t this be slower than making the calls in multiple goroutines? Let’s see.

Think of the previous example. If you do not get a response from service 1, can you invoke service 2, or if service 2 fails, can you call service 3? No, as there is data dependency between these calls.

Rio chains dependent jobs together by introducing this pattern.

request := rio.BuildRequests(context,
          (<callback of service 1>.WithTimeOut(100 ms).WithRetry(3)))
          .FollowedBy(<function for transforming data from service 1 response to request or partial request of 2>,
                      <callback of service 2>)
          .FollowedBy(<function for transforming data from service 2 response to request or partial request of 3>,
                      <callback of service 3>)

Let’s see an example

func SampleHandler(w http.ResponseWriter, r *http.Request) {
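	// Create the load balancer. This should be created only once, so in a real app build it at startup and
	// share it across handlers; it is created inside the handler here only to keep the example short.
	balancer := rio.GetBalancer(10, 2) // 10 threads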

	// Setup the callbacks
	callback1 := GetNameById("Some Name")
	callback2 := GetStreetAddressByNameAndLocationId(rio.EMPTY_ARG_PLACEHOLDER, "Some Location ID")

	// Set up the pipeline
	request := rio.BuildRequests(context.Background(),
		rio.NewFutureTask(callback1).WithMilliSecondTimeout(10).WithRetry(3), 2).
		FollowedBy(Call1ToCall2, rio.NewFutureTask(callback2).WithMilliSecondTimeout(20))

	// Post job
	balancer.PostJob(request)

	// Wait for response
	<-request.CompletedChannel

	// Responses
	response1, err := request.GetResponse(0)
	if err == nil {
		// Do something with the response
		fmt.Println(response1)
	}
	response2, err := request.GetResponse(1)
	if err == nil {
		// Do something with the response
		fmt.Println(response2)
	}

}

Once the chain is built with rio.BuildRequests, we post the job like this:

balancer.PostJob(request)

And finally we wait for the chain to complete:

<-request.CompletedChannel

Once the call chain completes, the request holds the responses for all these calls in a slice, and we can read them like this:

  • Only one job response

    request.GetOnlyResponse()
    
  • Multiple job responses

    request.GetResponse(index) // index: 0, 1, 2
    

If any job fails, its response will be an empty response, specifically rio.EMPTY_CALLBACK_RESPONSE.

See the example folder in the repository for the full example if you want to use it.

Thanks !!!

Also published on Medium’s subdomain: https://medium.com/geekculture/rio-a-lightweight-job-scheduler-in-go-with-batteries-included-fe1040d5a3c3


Written by susamn | I am a daddy of a 2 year old, a coder, tech-blogger, music lover, photographer and an all in all nerd
Published by HackerNoon on 2021/07/06