ukitgroup/amq-rpc

AMQ RPC

MIT License

Attention: this module is currently in active development ⚠️
It should be released soon, possibly around 15 October 2018 🖖

Samples

Client:

    import { RpcClient } from 'amq-rpc';

    (async () => {
      const client = new RpcClient({
        service: 'my-awesome-service',
        version: '1.2',
        connectParams: {
          url: 'amqp://guest:guest@localhost:5672/?vhost=/',
          heartbeat: 30,
        },
        waitResponseTimeout: 30 * 1000, // how long to wait for a result from the service
      });

      // Accepts as its first parameter the same object as connectParams in the constructor.
      await client.ensureConnection();

      const result = await client.send(
        { foo: 'bar' },
        { correlationId: 'e.g. nginx req id' },
      );
      const result2 = await client.call(
        'myAction',
        { foo: 'bar' },
        { correlationId: 'e.g. nginx req id' },
      );

      await client.destroy();
    })().catch(err => console.error(err) || process.exit(1));
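The `waitResponseTimeout` option in the client sample bounds how long the client waits for a reply. This README does not show how amq-rpc implements that internally, so the following is only a minimal standalone sketch of the general pattern. The helper name `withTimeout` and its error message are hypothetical and not part of the amq-rpc API.

```javascript
// Hypothetical helper (an assumption, not amq-rpc code): rejects when
// `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`No reply within ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is always cleared afterwards
  // so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Usage: a reply that never arrives is turned into a rejection.
const neverReplies = new Promise(() => {});
withTimeout(neverReplies, 20).catch(err => console.error(err.message));
```

Racing against a timer keeps the waiting logic independent of the transport, which is why the sketch needs no broker connection.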

Service:

    import { RpcService, RpcServiceHandler } from 'amq-rpc';

    (async () => {
      const service = new RpcService({
        service: 'my-awesome-service',
        version: '1.2',
        connectParams: {
          url: 'amqp://guest:guest@localhost:5672/?vhost=/',
          heartbeat: 30,
        },
        queue: {
          prefetch: 1,
          durable: true,
          maxPriority: 100,
        },
      });

      service.setErrorHandler((error) => {
        // All errors that can't be passed to the reject operation (such as an error
        // in the subscriber function, outside of the user handler) are passed to this callback.
      });

      await service.addHandler(class extends RpcServiceHandler {
        // If the "type" property of a message is not filled in (sent without special options),
        // the service looks for a handler with the action 'default'.
        get action() {
          // In the base class, RpcServiceHandler, action equals 'default'.
          return 'myAction2';
        }

        async beforeHandle() {
          // Called just before the handle method.
          // Use it to prepare data, init resources, or log.
          // All thrown errors, as in the handle method, are passed to the handleFail method.
        }

        // ⚠️ You must redefine this method from the RpcServiceHandler class.
        async handle() {
          // this.payload - the payload that was sent
          // this.context - special object, shared between methods. By default equal to {}.
          // The returned data is passed to the client as the reply payload.
          return { bar: 'foo' };
        }

        // ⚠️ Redefine this method only if you know what you are doing.
        async handleFail(error: Error) {
          /*
            In the base class, RpcServiceHandler:
             - rejects the message in the queue
             - replies to the client with the error, messageId and correlationId
          */
          // You can redefine this to customize error handling behavior.
        }

        // ⚠️ Redefine this method only if you know what you are doing.
        async handleSuccess(replyPayload: Object) {
          /*
            In the base class, RpcServiceHandler:
             - acks the message in the queue
             - replies to the client with the payload and error: null
          */
          // You can redefine this to customize success handling behavior.
        }

        async onFail(error: Error) {
          // Hook for logging.
        }

        async onSuccess(replyPayload: Object) {
          // Hook for logging.
        }

        async afterHandle(error: ?Error, replyPayload: ?Object) {
          // If the current handler failed, the error is passed as the first argument.
          // If handling succeeded, replyPayload is passed as the second argument.
          // Use it for logging or releasing resources.
          // Wrap this code in a try..catch block, because all errors from the afterHandle
          // method are just passed to the error handler callback.
        }
      });

      // Minimal handler
      await service.addHandler(class extends RpcServiceHandler {
        async handle() {
          return { bar: `${this.payload.foo} 42` };
        }
      });

      await service.ensureConnection();

      // If the process receives SIGINT, the service is stopped gracefully
      // (it waits for the handler to finish its work until the timeout is exceeded
      // and then calls process.exit()).
      await service.interventSignalInterceptors({
        stopSignal: 'SIGINT',
        gracefulStopTimeout: 10 * 1000,
      });
    })().catch(err => console.error(err) || process.exit(1));
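The order in which the handler hooks run can be easier to follow in a small in-memory model. The sketch below is not amq-rpc code and does not connect to a broker: `SketchHandler`, `SketchService` and `dispatch` are hypothetical names. It assumes that the handler is looked up by the message `type` with a fallback to `'default'`, that `afterHandle` runs once handling has finished, and that an unknown action produces an error reply. The `handleSuccess`, `handleFail`, `onSuccess` and `onFail` steps (ack, reject, logging) are left out for brevity.

```javascript
// Hypothetical stand-in for RpcServiceHandler (assumption, not the real class).
class SketchHandler {
  constructor(payload) {
    this.payload = payload;
    this.context = {}; // shared between the hook methods
  }
  get action() { return 'default'; }
  async beforeHandle() {}
  async handle() { throw new Error('handle() must be redefined'); }
  async afterHandle(error, replyPayload) {}
}

// Hypothetical stand-in for RpcService that dispatches plain objects.
class SketchService {
  constructor() {
    this.handlers = new Map();
    this.errorHandler = () => {};
  }
  setErrorHandler(fn) { this.errorHandler = fn; }
  addHandler(HandlerClass) {
    // Read the action name from the class's `action` getter.
    this.handlers.set(HandlerClass.prototype.action, HandlerClass);
  }
  async dispatch(message) {
    const Handler = this.handlers.get(message.type || 'default');
    if (!Handler) {
      // Assumption: the README does not say what happens for unknown actions.
      return { error: 'No handler for action', payload: null };
    }
    const handler = new Handler(message.payload);
    let error = null;
    let reply = null;
    try {
      await handler.beforeHandle();
      reply = await handler.handle();
    } catch (err) {
      error = err; // errors from beforeHandle and handle take the failure path
    }
    try {
      await handler.afterHandle(error, reply);
    } catch (err) {
      this.errorHandler(err); // afterHandle errors only reach the error handler
    }
    return error
      ? { error: error.message, payload: null }
      : { error: null, payload: reply };
  }
}

const service = new SketchService();
service.addHandler(class extends SketchHandler {
  async handle() { return { bar: `${this.payload.foo} 42` }; }
});
service.addHandler(class extends SketchHandler {
  get action() { return 'myAction2'; }
  async handle() { throw new Error('boom'); }
});
```

Calling `service.dispatch({ payload: { foo: 'bar' } })` goes to the minimal handler because no `type` is set, while `service.dispatch({ type: 'myAction2', payload: {} })` exercises the failure path.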

About

Abstraction over MQ for RPC service 📬
