Parallel Processing WorkerPool for PHP
10K downloads within 4 months, thank you very much! We add features as users request them.
The WorkerPool class provides a very simple interface for passing data to a pool of worker processes and having it processed in parallel. You can fetch the results from the workers at any time. Each worker child process can return any value that can be serialized.
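To make the "any value that can be serialized" constraint concrete, here is a minimal sketch of a check you could run on a candidate return value. It assumes results cross the process boundary via PHP's native `serialize()`/`unserialize()`, which this README does not confirm, and `canBeReturnedByWorker()` is a hypothetical helper, not part of WorkerPool.

```php
<?php
// Sketch only: assumes worker results are transported with PHP's native
// serialize()/unserialize(). The pool's real transport is not shown here.

/**
 * Hypothetical helper (not part of WorkerPool): returns true when $value
 * survives a serialize()/unserialize() round trip unchanged.
 */
function canBeReturnedByWorker($value): bool
{
    try {
        // Loose comparison, so that equal but non-identical objects still match
        return unserialize(serialize($value)) == $value;
    } catch (\Throwable $e) {
        // Closures, for example, throw "Serialization of 'Closure' is not allowed"
        return false;
    }
}

var_dump(canBeReturnedByWorker(42));                                // bool(true)
var_dump(canBeReturnedByWorker(['id' => 7, 'tags' => ['a', 'b']])); // bool(true)
var_dump(canBeReturnedByWorker(new \ArrayObject([1, 2, 3])));       // bool(true)
var_dump(canBeReturnedByWorker(function () { return 1; }));         // bool(false)
```

Scalars, arrays and plain objects pass this check. Closures and open resources such as file handles do not, so a worker should convert them into plain data before returning.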
A simple example that uses a closure as the worker:

    <?php

    $wp = new \QXS\WorkerPool\WorkerPool();
    $wp->setWorkerPoolSize(4)
       ->create(new \QXS\WorkerPool\ClosureWorker(
            /**
             * @param mixed $input the input from the WorkerPool::run() method
             * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
             * @param \ArrayObject $storage a persistent storage for the current child process
             */
            function ($input, $semaphore, $storage) {
                echo "[" . getmypid() . "]" . " hi $input\n";
                sleep(rand(1, 3)); // this is the workload!
                return $input;     // return null here, in case you do not want to pass any data to the parent
            }
        ));

    for ($i = 0; $i < 10; $i++) {
        $wp->run($i);
    }

    $wp->waitForAllWorkers(); // wait for all workers

    foreach ($wp as $val) {
        var_dump($val); // dump the returned values
    }

A more complete example that implements the worker as a class and handles exceptions:

    <?php

    use QXS\WorkerPool\WorkerPool;
    use QXS\WorkerPool\Worker;
    use QXS\WorkerPool\Semaphore;

    /**
     * Our Worker Class
     */
    class MyWorker implements Worker {
        protected $sem;

        /**
         * after the worker has been forked into another process
         *
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
         * @throws \Exception in case of a processing error an exception will be thrown
         */
        public function onProcessCreate(Semaphore $semaphore) {
            // the semaphore can be used in the run method to synchronize the workers
            $this->sem = $semaphore;
            // write something to stdout
            echo "\t[" . getmypid() . "] has been created.\n";
            // initialize mt_rand; microtime() returns "usec sec", separated by a space
            list($usec, $sec) = explode(' ', microtime());
            mt_srand((int) ((float) $sec + ((float) $usec * 100000)));
        }

        /**
         * before the worker process is getting destroyed
         *
         * @throws \Exception in case of a processing error an exception will be thrown
         */
        public function onProcessDestroy() {
            // write something to stdout
            echo "\t[" . getmypid() . "] will be destroyed.\n";
        }

        /**
         * run the work
         *
         * @param mixed $input the data that the worker should process (must be serializable)
         * @return mixed the result (must be serializable)
         * @throws \Exception in case of a processing error an exception will be thrown
         */
        public function run($input) {
            $input = (string) $input;
            echo "\t[" . getmypid() . "] Hi $input\n";
            sleep(mt_rand(0, 10)); // this is the workload!
            // and sometimes exceptions might occur
            if (mt_rand(0, 10) == 9) {
                throw new \RuntimeException('We have a problem for ' . $input . '.');
            }
            return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
        }
    }

    $wp = new WorkerPool();
    $wp->setWorkerPoolSize(10)
       ->create(new MyWorker());

    // produce some tasks
    for ($i = 1; $i <= 50; $i++) {
        $wp->run($i);
    }

    // some statistics
    echo "Busy Workers:" . $wp->getBusyWorkers() . " Free Workers:" . $wp->getFreeWorkers() . "\n";

    // wait for completion of all tasks
    $wp->waitForAllWorkers();

    // collect all the results
    foreach ($wp as $val) {
        if (isset($val['data'])) {
            echo "RESULT: " . $val['data'] . "\n";
        } elseif (isset($val['workerException'])) {
            echo "WORKER EXCEPTION: " . $val['workerException']['class'] . ": " . $val['workerException']['message'] . "\n" . $val['workerException']['trace'] . "\n";
        } elseif (isset($val['poolException'])) {
            echo "POOL EXCEPTION: " . $val['poolException']['class'] . ": " . $val['poolException']['message'] . "\n" . $val['poolException']['trace'] . "\n";
        }
    }

    // write something, before the parent exits
    echo "ByeBye\n";

See what happens when you run ps while the simple example is running:

    root  2378  \_ simpleExample.php: Parent
    root  2379      \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
    root  2380      \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
    root  2381      \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
    root  2382      \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]

The documentation can be found here: http://qxsch.github.io/WorkerPool/doc/



