Create a nodejs aws step-function worker/pooler easily :-)
```
npm install step-function-worker
```

```js
const fn = function(input, cb, heartbeat){
  // do something
  doSomething(input)

  // call heartbeat to avoid timeout
  heartbeat()

  // call callback in the end
  cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
};

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  taskConcurrency : 22, // default is null = Infinity
  poolConcurrency : 2 // default is 1
});
```

Since version 3.0, `concurrency` has been replaced by `poolConcurrency` and `taskConcurrency`.
see more information in #16 (comment)
`poolConcurrency` is the maximum number of parallel getActivity HTTP requests (see `sdk.getActivity`) (default: `1`). Increase this to have a more responsive worker; decrease it to consume fewer HTTP connections.

`taskConcurrency` (`null` means Infinite) represents the maximum number of parallel tasks done by the worker (default: equals to `poolConcurrency`).
Anyway, you should always have poolConcurrency <= taskConcurrency.
By default, this package is built on top of aws-sdk so you should set your AWS Region by changing AWS_REGION environment variable.
If you want to set it in JS code directly you can do it using awsConfig (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like
```js
const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  awsConfig: {
    region: '<your-region>'
  }
});
```

```js
// when finished, close the worker with a callback
// this closing process may take up to 60 seconds per concurrent worker, to close all connections smoothly without losing any task
worker.close(function(){
  process.exit();
})
```

```js
// A worker has multiple poolers and multiple running tasks
// You can have infos about it by doing
const {poolers, tasks} = worker.report();

// poolers is an array of {
//   startTime: <Date>,
//   workerName: <String>,
//   status: <String>
// }
//
// tasks is an array of {
//   taskToken: <String>,
//   input: <Object>,
//   startTime: <Date>
// }
//
```

You can customize logging by using a winston logger (or winston-like logger) as input
```js
const winston = require('winston');

const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.json(),
  defaultMeta: { service: 'user-service' },
  transports: [
    //
    // - Write to all logs with level `info` and below to `combined.log`
    // - Write all logs error (and below) to `error.log`.
    //
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});
```

Alternatively, you can just use a winston-like logger
```js
const logger = console;

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});
```

```js
// when a task starts
worker.on('task', function(task){
  // task.taskToken
  // task.input
  console.log("task ", task.input)
});

// when a task fails
worker.on('failure', function(failure){
  // failure.error
  // failure.taskToken
  console.log("Failure :", failure.error)
});

// when a heartbeat signal is sent
worker.on('heartbeat', function(beat){
  // beat.taskToken
  console.log("Heartbeat");
});

// when a task succeeds
worker.on('success', function(out){
  // out.output
  // out.taskToken
  console.log("Success :", out.output)
});

// when an error happens
worker.on('error', function(err){
  console.log("error ", err)
});

// when the worker has no more task to process
worker.on('empty', function(){
  console.log("empty")
});

// when the worker reaches taskConcurrency tasks
worker.on('full', function(err){
  console.log("error ", err)
});
```

See JSDoc in the code.