npm install asynchelper
While writing nodejs code, it is not uncommon to have to do something like this:
- get database connection
- count records for user
- if count is 0, insert record for user
- create user game descriptor
- return JSON with user data
Another use case would be:
- get Facebook credentials
- make a POST to your REST API
- make a GET to another REST API
- increment a Redis counter to track API calls
Translated into plain Node.js code, these use cases are a callback nightmare.
Not to mention error control flow.
Not to mention timeout constraints (you really don't want this to take more than 200 milliseconds).
And what if you need to know when all this code finishes executing?
And then:
- how about having both use cases running at the same time?
- how about not having more than 4 of these running at the same time (remember the scarce number of database connections)?
AsyncHelper is a general purpose library to handle asynchronous processing in nodejs.
The goal is to expose a clean interface, not only for code readability, but also to keep under control all the asynchronous operations that may be spawned during the lifecycle of a Node.js application, and to keep scarce resources under fine control.
The library relies on the concept of a Condition for signaling asynchronous code activity and a Future returned to the developer, so she can be notified about computation results.
The Condition lets you choreograph complex operations and provides a fast short-circuit mechanism, so your code can progress as soon as it is known whether the conditions are met or not. A Condition is hence a super-powered boolean event broadcaster.
Conditions are designed to fail fast, either because a Condition met its criteria or because of the built-in timeout control.
The main object to interact with is a Dispatcher object.
Since it is sometimes important to handle different scarce resources at the same time, you can create as many Dispatcher objects as needed. It makes sense to interact with different pieces of software separately, for example your database and an exposed REST API.
The library relies on Dispatcher objects, instead of a call like AsyncHelper.waterfall( [], callback ), for good reasons:
- concurrency purposes. A dispatcher can set a maximum number of concurrent operations, so that you can keep under fine control the scarce resources of your application, like for example, database connections.
- keep track of running operations, and pending running operations.
A dispatcher is hence a Queue, served as a first-in first-out, with interesting control flow capabilities. How you get notified about dispatched tasks status is by means of Future objects.
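The queueing behaviour described above can be pictured with a minimal sketch. It is not AsyncHelper's code: `TinyDispatcher`, `Task`, `runningCount` and `pendingCount` are hypothetical names, and the sketch only shows a first-in first-out queue that never runs more than a fixed number of tasks at once.

```typescript
// Hypothetical sketch, NOT the library's Dispatcher implementation.
// A task receives a `done` callback and calls it when its work has ended.
type Task = (done: () => void) => void;

class TinyDispatcher {
  private readonly pending: Task[] = []; // FIFO queue of waiting tasks
  private running = 0;                   // tasks currently executing
  private readonly maxConcurrent: number;

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  submit(task: Task): void {
    this.pending.push(task);
    this.drain();
  }

  get runningCount(): number {
    return this.running;
  }

  get pendingCount(): number {
    return this.pending.length;
  }

  // Start queued tasks, oldest first, while there is spare capacity.
  private drain(): void {
    while (this.running < this.maxConcurrent && this.pending.length > 0) {
      const task = this.pending.shift() as Task;
      this.running++;
      let finished = false;
      task(() => {
        if (finished) return; // ignore repeated done() calls
        finished = true;
        this.running--;
        this.drain();         // a slot is free: start the next queued task
      });
    }
  }
}
```

With a limit of 2, a third submitted task stays in the queue until one of the first two calls `done`.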
For each element submitted for execution to a Dispatcher, a Future object will be returned in exchange. A Future object offers a more formal and complete notification scheme than a callback:
- a future can be checked for its value: either not set (still pending execution) or, if set, whether it is an Error.
- multiple observers can be listening to the future's value change.
- it is latched by a Condition, so its value can only be set once, and the future's observers will only be notified once.
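The three properties in the list above can be illustrated with a small sketch. `TinyFuture` is a hypothetical class written for this explanation, not the library's Future; the method names `onValueSet` and `getValue` are borrowed from the examples in this document, while `setValue`, `isValueSet`, `isError` and the choice to notify late observers immediately are assumptions.

```typescript
// Hypothetical sketch, NOT the library's Future implementation.
type FutureObserver<T> = (future: TinyFuture<T>) => void;

class TinyFuture<T> {
  private value: T | Error | undefined = undefined;
  private valueSet = false;
  private readonly observers: FutureObserver<T>[] = [];

  // Several observers may listen to the same future.
  // Assumption: an observer added after the value is set is called at once.
  onValueSet(observer: FutureObserver<T>): this {
    if (this.valueSet) {
      observer(this);
    } else {
      this.observers.push(observer);
    }
    return this;
  }

  // Latch: only the first call has any effect, so observers fire once.
  setValue(value: T | Error): boolean {
    if (this.valueSet) return false;
    this.valueSet = true;
    this.value = value;
    for (const observer of this.observers.splice(0)) observer(this);
    return true;
  }

  isValueSet(): boolean {
    return this.valueSet;
  }

  isError(): boolean {
    return this.value instanceof Error;
  }

  getValue(): T | Error | undefined {
    return this.value;
  }
}
```

A second `setValue` call is ignored, which is what keeps observers from being notified twice.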
For example, submitting a waterfall returns a future:

```typescript
const future = _dispatcher.waterfall([...]);
```

Future objects expose three different callback notification mechanisms (in fact they are the same, but with some sugar icing):

```typescript
future.onValueSet((f: Future<number>) => {
    // check future's value.
    // it will be either a number or Error.
});
```

Promisify a future object:

```typescript
future.then(
    (v: number) => {
        // waterfall returned v
    },
    (e: Error) => {
        // waterfall errored
    }
);
```

Notify on a standard Node.js callback:

```typescript
future.node((err: Error, v: number) => {
    // waterfall returned v or Error
});
```

But what can actually be submitted to a Dispatcher? There are three operations a Dispatcher can execute:
- waterfall of nodejs-style functions.
- ParallelCondition objects.
- arbitrary functions with time control.
Running a waterfall is the most common scenario for AsyncHelper. It gives fine timing control over the whole function waterfall, plus pass-through error control. An Error thrown in any of the functions can be caught and propagated to the Future as an error value.
Bonus points: if an Error is thrown, you can get a detailed string representation of the waterfall status at the moment of throw, and all parameters passed to each waterfall function by accessing sequenceStackTrace on the error object.
```typescript
const _d = new Dispatcher(4); // 4 concurrent elements.
// so up to 4 submitted elements will be executing concurrently.
// all others will be queued until another element ends execution.

_d.waterfall(
    [
        function f1() { ... },
        function f2(err: Error, ret_from_prev_function: any) {}
    ],
    2000,   // 2 seconds to execute the waterfall
    true    // if a function of the waterfall throws an error, halt execution
).onValueSet((f: Future) => {
    // the future has value set.
});
```

- Functions must comply with nodejs's callback convention: (e:Error, args:any).
- A function's return value will be propagated to the next function. The return value of the last waterfall function will be propagated as the Future object's value.
- Functions can't be fat arrow functions. Internally, the Dispatcher sets this to the waterfall control function itself; otherwise, it won't be able to propagate returned values from function to function.
- Functions must either return a value or hand off to a callback. If a function has no return value, the next waterfall function is expected to be invoked as a callback by something called from the current waterfall function.
- You can set properties on this. The execution context will be a per-waterfall-submission function which handles the control flow.
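To make the rules above more concrete, here is a minimal sketch of how such a control function could work. It is an illustration written for this explanation, not AsyncHelper's implementation: `runWaterfall`, `Control` and `Step` are hypothetical names, and timeouts and the halt-on-error flag are left out.

```typescript
// Hypothetical sketch, NOT the library's waterfall implementation.
// The control function doubles as the nodejs-style callback handed to each step.
type Control = ((err: Error | null, ...args: unknown[]) => void) & {
  props?: Record<string, unknown>;
};
type Step = (this: Control, err: Error | null, ...args: any[]) => unknown;

function runWaterfall(
  steps: Step[],
  onDone: (err: Error | null, value?: unknown) => void
): void {
  let index = 0;
  let finished = false;

  const finish = (err: Error | null, value?: unknown): void => {
    if (finished) return; // report the outcome only once
    finished = true;
    onDone(err, value);
  };

  const control: Control = (err: Error | null, ...args: unknown[]): void => {
    if (finished) return;
    if (index >= steps.length) {
      // No step left: the last result becomes the final value.
      finish(err, args[0]);
      return;
    }
    const step = steps[index++];
    try {
      // `this` inside the step is the control function itself. An arrow
      // function has a lexical `this`, so it could not receive it.
      const returned = step.call(control, err, ...args);
      // A returned value moves on to the next step; returning nothing
      // means the step will call `this(...)` later as a callback.
      if (returned !== undefined) control(null, returned);
    } catch (e) {
      // A thrown error stops the waterfall and becomes the outcome.
      finish(e instanceof Error ? e : new Error(String(e)));
    }
  };

  control(null);
}
```

Because `this` is the same function for every step of one submission, a property set on it in one step (such as `this.props`) is visible in the following steps.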
With all this, a real life use case for the waterfall could be:
```typescript
game_dispatcher.waterfall(
    [
        function () {
            // set the next waterfall function as postgres' connect callback.
            pg.connect(DU.PG_CONN_STRING, this);
        },
        function (err: Error, client: pg.Client, done: () => void) {
            if (DU.HandleError(err, client, done)) {
                // stop waterfall, and propagate error to the Future value.
                throw err;
            }

            // pass postgres specific objects to next waterfall functions
            this.props = {
                client: client,
                done: done
            };

            this.props.client.query(
                "update client_created_games " +
                "set" +
                " context= $1," +
                " status= $2," +
                " ...",
                [ ... ],
                this
            );
        },
        function (err: Error /*, result:pg.ResultBuilder*/) {
            if (DU.HandleError(err, this.props.client, this.props.done)) {
                throw err;
            }

            this.props.client.query(
                "select " +
                " id," +
                " client_id, ...",
                [ ... ],
                this
            );
        },
        function (err: Error, result: pg.ResultBuilder) {
            if (DU.HandleError(err, this.props.client, this.props.done)) {
                throw err;
            }

            // return postgres connection to the pool
            this.props.done();

            if (result.rows.length === 1) {
                // future's value
                return 12345;
            } else {
                // future's value
                throw new Error("An error.");
            }
        }
    ],
    1000,   // take at most one second to execute the waterfall functions. 0 for no timeout. optional
    true    // error pass-through enabled ?
).onValueSet(
    // check future.getValue() for an error.
    (future: Future) => ...
);
```

A ParallelCondition can be scheduled for execution. The future's value will be a boolean indicating whether the Condition was met. Remember that the Condition might have short-circuited early: a set value does not mean all of the Condition's operations are done, only that the Condition's value has been set.
```typescript
const pc = new ParallelCondition( ... );

_dispatcher.submitCondition(pc).then(
    (condition_result: boolean) => {
        // the condition result was successful or not
    },
    (err: Error) => {
        // an error occurred
    }
);
```

Dispatcher.submit submits an arbitrary function for execution. It also exposes timeout control. Since a dispatcher has no knowledge of the execution context, the function has to be bound with its parameters.
```typescript
_dispatcher.submit(
    function () {},
    0   // no timeout
);

function fn(a, b, c) { ... }

// execute fn with a maximum time of 100 milliseconds.
_dispatcher.submit(fn.bind(null, 3, 4, 5), 100);
```

A listener registered with Dispatcher.addIsEmptyListener( (d:Dispatcher)=>void ) will be invoked whenever a dispatcher has drained all its scheduled tasks.
Though the most interesting object in AsyncHelper is the Dispatcher, the library exposes some other really useful objects as well.
Internally, Dispatcher relies on choreographing these objects, but they can also be used directly.
Conditions are the basic building blocks. A condition is a simple object wrapping a boolean variable, and exposing events based on its changes.
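As a rough mental model, a condition of this kind can be sketched in a few lines. `TinyCondition` is a hypothetical class, not the library's Condition: it reuses the method names shown in this document (`onTrue`, `onFalse`, `setTrue`, `setFalse`, `setNotSet`), while `getValue` and the rule that observers are only notified when the value actually changes are assumptions.

```typescript
// Hypothetical sketch, NOT the library's Condition implementation.
type ConditionObserver = (c: TinyCondition) => void;

class TinyCondition {
  // undefined means "not set": neither true nor false.
  private value: boolean | undefined = undefined;
  private readonly trueObservers: ConditionObserver[] = [];
  private readonly falseObservers: ConditionObserver[] = [];

  onTrue(observer: ConditionObserver): this {
    this.trueObservers.push(observer);
    return this;
  }

  onFalse(observer: ConditionObserver): this {
    this.falseObservers.push(observer);
    return this;
  }

  setTrue(): this {
    return this.assign(true);
  }

  setFalse(): this {
    return this.assign(false);
  }

  // Back to the initial "not set" state; nobody is notified.
  setNotSet(): this {
    this.value = undefined;
    return this;
  }

  getValue(): boolean | undefined {
    return this.value;
  }

  private assign(next: boolean): this {
    if (this.value === next) return this; // assumption: no change, no event
    this.value = next;
    const observers = next ? this.trueObservers : this.falseObservers;
    for (const observer of [...observers]) observer(this);
    return this;
  }
}
```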
```typescript
export type SignalObserver = (...args: any[]) => void;

const c = new Condition()
    .onTrue((c: Condition) => {})
    .onFalse((c: Condition) => {})
    .onStateChange((c: Condition) => {});
```

Initially, a Condition has no internal state. It is neither true nor false. When its value is set, the onXXX callbacks will be invoked. Internally, a Condition uses a Signal object, which is useful to register multiple function callbacks for each event.

```typescript
c.onTrue((c: Condition) => {...}).onTrue((c: Condition) => {...});
c.setTrue(); // will notify both callbacks.
```

A value is set by externally calling:

```typescript
c.setTrue();
c.setFalse();
```

It can also be reset:

```typescript
c.setNotSet();
```

A Condition can also have its value set by a timer:

```typescript
const _condition: Condition = new Condition()
    .onTrue((c: Condition) => { ... })
    .onFalse((c: Condition) => {
        // invoked in 200ms unless _condition.setTrue() is called before.
    })
    .setTimeout(200);
```

A simple boolean variable can be scaled up to a whole tree of Condition objects, represented by a ConditionTree. A tree is defined by one or more Conditions and a boolean operator. ConditionTree objects short-circuit and notify their onXXX callbacks as fast as possible. The evaluation is totally lazy: it only happens when one of the Conditions inside the tree fires.
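The short-circuit rule described above boils down to three-valued boolean logic. The function below is a sketch of that rule only, under the assumption that "not set" is modelled as `undefined`; `evaluateTree` and `Tri` are hypothetical names and not part of the library.

```typescript
// Hypothetical sketch of the evaluation rule, NOT the library's ConditionTree.
type Tri = boolean | undefined; // undefined means "not set yet"

function evaluateTree(operator: "AND" | "OR", values: Tri[]): Tri {
  // The value that settles the result on its own:
  // a single false settles an AND, a single true settles an OR.
  const decisive = operator === "OR";

  // Short circuit: one decisive child is enough, the others are ignored.
  if (values.some((v) => v === decisive)) return decisive;

  // Every child resolved the other way: the tree resolves that way too.
  if (values.every((v) => v === !decisive)) return !decisive;

  // Otherwise the tree still has no value.
  return undefined;
}
```

Since the result of one tree is itself a `Tri`, it can be fed as a child value into another call, which is the same idea as nesting trees.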
```typescript
const c0 = new Condition();
const c1 = new Condition();
const c2 = new Condition();

const ct: ConditionTree = new ConditionTree(BOOLEAN_OPERATOR.AND)
    .onFalse((ct: Condition) => {})
    .addCondition(c0)
    .addCondition(c1)
    .addCondition(c2);

c0.setTrue();  // ct still has no value.
c1.setFalse(); // short circuit, ct evaluates to false now,
               // regardless of whether c1 and c2 have values set later.
c1.setTrue();  // it does not change ct's value.
```

Condition trees can be nested:

```typescript
const c0 = new Condition();
const c1 = new Condition();
const c2 = new Condition();
const c3 = new Condition();

const ct0: ConditionTree = new ConditionTree(BOOLEAN_OPERATOR.OR)
    .addCondition(c0)
    .addCondition(c1);

const ct1: ConditionTree = new ConditionTree(BOOLEAN_OPERATOR.OR)
    .addCondition(c2)
    .addCondition(c3);

const ctct: ConditionTree = new ConditionTree(BOOLEAN_OPERATOR.OR)
    .addCondition(ct0)
    .addCondition(ct1)
    .onTrue((c: Condition) => {
        console.log(c.getCurrentValue());
        // BOOL_OPERATOR.TRUE | BOOL_OPERATOR.FALSE | BOOL_OPERATOR.NOT_SET
    });

c2.setTrue(); // ctct invoked onTrue();
```

ParallelCondition objects are asynchronous ConditionTrees. A ParallelCondition is initialized from a collection of ParallelCondition objects and/or callbacks of the form: (c:Condition, index:number)=>void.
The idea is that each of the elements supplied at construction time will be executed on the next tick. The elements are either other ParallelCondition objects or worker functions, which report the result of their activity on the Condition object supplied as a parameter.
The ParallelCondition evaluates as an AND ConditionTree, so it expects every action to notify back with setTrue(). It short-circuits fast: any action calling setFalse() on its parameter Condition makes the ParallelCondition evaluate to false and notify onFalse() immediately, regardless of whether all parallel operations have ended.
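The behaviour just described (deferred start, AND semantics, early exit on the first failure, overall time limit) can be sketched as follows. This is an illustration under assumptions, not the library's ParallelCondition: `runParallel`, `Notify` and `ParallelWorker` are hypothetical names, nested parallel conditions are left out, and `setTimeout(..., 0)` stands in for "the next tick".

```typescript
// Hypothetical sketch, NOT the library's ParallelCondition implementation.
type Notify = { setTrue(): void; setFalse(): void };
type ParallelWorker = (condition: Notify, index: number) => void;

function runParallel(
  workers: ParallelWorker[],
  timeoutMs: number, // 0 means no time limit
  onResult: (ok: boolean) => void
): void {
  let settled = false;
  let remaining = workers.length;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const settle = (ok: boolean): void => {
    if (settled) return; // the overall result is reported only once
    settled = true;
    if (timer !== undefined) clearTimeout(timer);
    onResult(ok);
  };

  // Running out of time counts as a failure.
  if (timeoutMs > 0) timer = setTimeout(() => settle(false), timeoutMs);

  if (remaining === 0) {
    settle(true);
    return;
  }

  workers.forEach((worker, index) => {
    let reported = false;
    const condition: Notify = {
      setTrue() {
        if (reported) return;
        reported = true;
        // AND semantics: succeed only when every worker reported true.
        if (--remaining === 0) settle(true);
      },
      setFalse() {
        if (reported) return;
        reported = true;
        // Short circuit: one failure settles the result immediately,
        // even though other workers may still be running.
        settle(false);
      },
    };
    // Defer the start so no worker runs inside the caller's stack frame.
    setTimeout(() => worker(condition, index), 0);
  });
}
```

Note that a short-circuited result does not cancel the remaining workers; their later notifications are simply ignored.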
A real use case for this object is:
- write some activity to the database
  - get postgres connection
  - insert
  - check result
  - notify on the parameter condition
- (and) send some content to redis
- (and) get some XHR content
```typescript
// Wait for **all** these things to happen, with a 3 second limit.

function do_postgres_stuff(_condition: Condition, index: number) {

    // uses the `Dispatcher` described earlier
    // Execute the sequence of functions,
    //   + take a maximum of 2 seconds to execute, otherwise set an error as result
    //   + if an error is thrown in any function, halt sequence execution.
    //
    _dispatcher.submitNodeSequence(
        [
            function getConnection() {},
            function insertContent(e: Error, ...) {},
            function checkResult(e: Error, ...) {}
        ],
        true,   // error fall through
        2000    // max timeout
    ).onValueSet((future: Future) => {
        const v: any = future.getValue();
        if (v instanceof Error) {
            // you can read v.sequenceStackTrace to get detailed info of what went wrong
            console.log(v.sequenceStackTrace);
            _condition.setFalse();
        } else {
            _condition.setTrue();
        }
    });
}

function do_redis_stuff(_condition: Condition, index: number) { ... }
function do_xhr_stuff(_condition: Condition, index: number) { ... }

const pc = new ParallelCondition([
    do_postgres_stuff,
    do_redis_stuff,
    do_xhr_stuff
]).onTrue((c: Condition) => {
    // all operations ended correctly
}).onFalse((c: Condition) => {
    // something went wrong.
    // condition short-circuited, some operations may not have ended yet.
}).setTimeout(3000); // take at most 3 seconds to process the condition; otherwise, onFalse will be invoked.
```