|
| 1 | +%%%------------------------------------------------------------------- |
| 2 | +%%% @author trevor |
| 3 | +%%% @copyright (C) 2020, trevor |
| 4 | +%%% @doc |
| 5 | +%%% The queue server to handle queuing sending individual |
| 6 | +%%% @end |
| 7 | +%%%------------------------------------------------------------------- |
| 8 | +-module(prime_tester_load_balancer). |
| 9 | + |
| 10 | +-behaviour(gen_server). |
| 11 | + |
| 12 | +%% API |
| 13 | +-export([start_link/1, is_prime/1]). |
| 14 | + |
| 15 | +%% gen_server callbacks |
| 16 | +-export([init/1, |
| 17 | +handle_call/3, |
| 18 | +handle_cast/2, |
| 19 | +handle_info/2, |
| 20 | +terminate/2, |
| 21 | +code_change/3]). |
| 22 | + |
| 23 | +-define(SERVER, ?MODULE). |
| 24 | + |
| 25 | +-record(state,{limit=0, |
| 26 | +free_pids=[], |
| 27 | +jobs=#{}, |
| 28 | +queue=queue:new()}). |
| 29 | + |
| 30 | +%%%=================================================================== |
| 31 | +%%% API |
| 32 | +%%%=================================================================== |
| 33 | + |
| 34 | +%%-------------------------------------------------------------------- |
| 35 | +%% @doc |
| 36 | +%% Starts the server |
| 37 | +%% |
| 38 | +%% @end |
| 39 | +%%-------------------------------------------------------------------- |
| 40 | +-specstart_link(Limit::integer()) ->{ok, Pid::pid()} | ignore |{error, Error::any()}. |
| 41 | + |
| 42 | +start_link(Limit) whenis_integer(Limit) -> |
| 43 | +gen_server:start_link({local, ?SERVER}, ?MODULE,{Limit}, []). |
| 44 | + |
| 45 | +-specis_prime(Number::integer()) ->any(). |
| 46 | + |
| 47 | +is_prime(Number) -> |
| 48 | +gen_server:call(?SERVER,{is_prime, Number}). |
| 49 | + |
| 50 | +%%%=================================================================== |
| 51 | +%%% gen_server callbacks |
| 52 | +%%%=================================================================== |
| 53 | + |
| 54 | +init({Limit}) -> |
| 55 | +% Start up all workers |
| 56 | +Pids=lists:map(fun(_Num) -> |
| 57 | +% Start the worker |
| 58 | +{ok, Pid} =supervisor:start_child(prime_tester_worker_sup, []), |
| 59 | +_Ref=erlang:monitor(process, Pid), |
| 60 | +Pid |
| 61 | +end, lists:seq(1, Limit)), |
| 62 | + |
| 63 | +% Store pids in state |
| 64 | +{ok, #state{free_pids=Pids}}. |
| 65 | + |
| 66 | +handle_call({is_prime, Number}, From, State=#state{free_pids=[], queue=Q}) -> |
| 67 | +% If we've already hit our limit for workers queue the request |
| 68 | +{noreply, State#state{queue=queue:in({From, Number}, Q)}}; |
| 69 | +handle_call({is_prime, Number}, From, State=#state{free_pids=[Pid|Pids], jobs=R}) -> |
| 70 | +% If we still haven't hit our limit for workers, give the worker the new job |
| 71 | +ok=prime_tester_worker:is_prime(Pid, Number), |
| 72 | + |
| 73 | +% We will reply when the worker replies to us |
| 74 | +{noreply, State#state{free_pids=Pids, jobs=maps:put(Pid,{From, Number}, R)}}; |
| 75 | +handle_call(_Request, _From, State) -> |
| 76 | +Reply=ok, |
| 77 | +{reply, Reply, State}. |
| 78 | + |
| 79 | +handle_cast({worker_result, Pid, Result}, State=#state{free_pids=FreePids, jobs=Jobs, queue=Queue}) -> |
| 80 | +% Lookup client pid in map |
| 81 | +casemaps:is_key(Pid, Jobs) of |
| 82 | +true -> |
| 83 | +% Send the reply back to the original `From` pid |
| 84 | +{From, _Number} =maps:get(Pid, Jobs), |
| 85 | +gen_server:reply(From,{ok, Result}), |
| 86 | + |
| 87 | +% Start the next job |
| 88 | +casequeue:out(Queue) of |
| 89 | +{{value,{From, Number}}, NewQueue} -> |
| 90 | +% Put the old pid back in the list of pids and get the new one |
| 91 | + [NewPid|RestPids] =FreePids, |
| 92 | +NewFreePids=RestPids++ [Pid], |
| 93 | + |
| 94 | +% Start the new job |
| 95 | +ok=prime_tester_worker:is_prime(NewPid, Number), |
| 96 | +NewJobs=maps:put(NewPid,{From, Number}, maps:remove(Pid, Jobs)), |
| 97 | + |
| 98 | +{noreply, State#state{free_pids=NewFreePids, jobs=NewJobs, queue=NewQueue}}; |
| 99 | +{empty, _} -> |
| 100 | +% If empty no change |
| 101 | +{noreply, State#state{free_pids=FreePids++ [Pid], jobs=maps:remove(Pid, Jobs)}} |
| 102 | +end; |
| 103 | +false -> |
| 104 | +% If we don't find a pid in a map ignore this message |
| 105 | +{noreply, State#state{jobs=maps:remove(Pid, Jobs)}} |
| 106 | +end; |
| 107 | +handle_cast(_Msg, State) -> |
| 108 | +{noreply, State}. |
| 109 | + |
| 110 | +% Handle getting a down message from a finished worker |
| 111 | +handle_info({'DOWN', _Ref, process, Pid, _}, State=#state{jobs=Pids}) -> |
| 112 | +casemaps:is_key(Pid, Pids) of |
| 113 | +true -> |
| 114 | +handle_down_worker(Pid, State); |
| 115 | +false -> %% Not our responsibility |
| 116 | +{noreply, State} |
| 117 | +end; |
| 118 | +handle_info(_Info, State) -> |
| 119 | +{noreply, State}. |
| 120 | + |
| 121 | +terminate(_Reason, _State) -> |
| 122 | +ok. |
| 123 | + |
| 124 | +code_change(_OldVsn, State, _Extra) -> |
| 125 | +{ok, State}. |
| 126 | + |
| 127 | +%%%=================================================================== |
| 128 | +%%% Internal functions |
| 129 | +%%%=================================================================== |
| 130 | + |
| 131 | +handle_down_worker(Pid, State=#state{limit=Limit, jobs=Pids, queue=Queue}) -> |
| 132 | +io:format("got worker down"), |
| 133 | +casequeue:out(Queue) of |
| 134 | +{{value,{From, Number}}, NewQueue} -> |
| 135 | +{ok, Pid} =supervisor:start_child(prime_tester_worker_sup, [Number]), |
| 136 | +NewPid=erlang:monitor(process, Pid), |
| 137 | +NewPids=gb_sets:insert(NewPid, maps:remove(Pid,Pids)), |
| 138 | +gen_server:reply(From,{ok, Pid}), |
| 139 | +{noreply, State#state{jobs=NewPids, queue=NewQueue}}; |
| 140 | +{empty, _} -> |
| 141 | +% If nothing in the queue continue as before |
| 142 | +{noreply, State#state{limit=Limit+1, jobs=gb_sets:delete(Pid, Pids)}} |
| 143 | +end. |
0 commit comments