|
22 | 22 |
|
23 | 23 | -define(SERVER, ?MODULE). |
24 | 24 |
|
25 | | --record(state,{limit=0, |
26 | | -free_pids=[], |
| 25 | +-record(state,{free_pids=[], |
27 | 26 | jobs=#{}, |
28 | 27 | queue=queue:new()}). |
29 | 28 |
|
@@ -128,16 +127,24 @@ code_change(_OldVsn, State, _Extra) -> |
128 | 127 | %%% Internal functions |
129 | 128 | %%%=================================================================== |
130 | 129 |
|
131 | | -handle_down_worker(Pid, State=#state{limit=Limit, jobs=Pids, queue=Queue}) -> |
132 | | -io:format("got worker down"), |
| 130 | +handle_down_worker(Pid, State=#state{free_pids=FreePids, jobs=Jobs, queue=Queue}) -> |
| 131 | +io:format("Got worker down. Sending error back to caller~n"), |
| 132 | +% Send error back to the caller |
| 133 | +{From, Number} =maps:get(Pid, Jobs), |
| 134 | +gen_server:reply(From,{error,{failed_to_check, Number}}), |
| 135 | + |
| 136 | +% Start a new worker in its place |
| 137 | +{ok, NewPid} =supervisor:start_child(prime_tester_worker_sup, []), |
| 138 | +_Ref=erlang:monitor(process, NewPid), |
| 139 | + |
133 | 140 | casequeue:out(Queue) of |
134 | 141 | {{value,{From, Number}}, NewQueue} -> |
135 | | -{ok, Pid} =supervisor:start_child(prime_tester_worker_sup, [Number]), |
136 | | -NewPid=erlang:monitor(process, Pid), |
137 | | -NewPids=gb_sets:insert(NewPid, maps:remove(Pid,Pids)), |
138 | | -gen_server:reply(From,{ok, Pid}), |
139 | | -{noreply, State#state{jobs=NewPids, queue=NewQueue}}; |
| 142 | +% Start the new job |
| 143 | +ok=prime_tester_worker:is_prime(NewPid, Number), |
| 144 | +NewJobs=maps:put(NewPid, {From, Number}, maps:remove(Pid,Jobs)), |
| 145 | + |
| 146 | +{noreply, State#state{jobs=NewJobs, queue=NewQueue}}; |
140 | 147 | {empty, _} -> |
141 | 148 | % If nothing in the queue continue as before |
142 | | -{noreply, State#state{limit=Limit+1, jobs=gb_sets:delete(Pid, Pids)}} |
| 149 | +{noreply, State#state{free_pids=[NewPid|FreePids], jobs=maps:remove(Pid, Jobs)}} |
143 | 150 | end. |
0 commit comments