-module(robin_handoff).

%% Round-robin handoff rotation.
%%
%% A registered process walks through every 2-node combination of the
%% ring members.  On each tick it disables handoff on all nodes, waits
%% until no handoff senders/receivers are active, and then re-enables
%% handoff (with the configured concurrency) on the next pair only.

-export([start/0,start/1,start/2,stop/0]).
-export([step/0]).

%% Count the active handoff senders plus receivers.
%%
%% Accepts a single node name or a list of node names.  Returns an
%% integer total, or the atom `reload' when any node did not answer with
%% a usable supervisor:count_children/1 result (the caller then
%% refreshes its member list).  A term that is neither an atom nor a
%% list is logged and counted as 0.
active_count(Node) when is_atom(Node) ->
    Receivers = rpc:call(Node, supervisor, count_children,
                         [riak_core_handoff_receiver_sup]),
    Senders = rpc:call(Node, supervisor, count_children,
                       [riak_core_handoff_sender_sup]),
    lager:log(debug, self(), "Check Active: ~p~n~p~n~p",
              [Node, Receivers, Senders]),
    %% Anything that is not a proplist carrying `active' (for example an
    %% rpc failure tuple) is treated as an invalid response.
    Valid = is_list(Receivers)
        andalso proplists:is_defined(active, Receivers)
        andalso is_list(Senders)
        andalso proplists:is_defined(active, Senders),
    case Valid of
        true ->
            proplists:get_value(active, Receivers)
                + proplists:get_value(active, Senders);
        false ->
            %% The format arguments must be a list.
            lager:log(info, self(), "invalid response from ~p", [Node]),
            reload
    end;
active_count([Node | Rest]) ->
    merge_counts(active_count(Node), active_count(Rest));
active_count([]) ->
    0;
active_count(Node) ->
    lager:log(info, self(),
              "Handoff rotation enountered invalid node: ~p.", [Node]),
    0.

%% Combine two active_count/1 results; `reload' wins over any number so
%% that a single bad node forces a member-list refresh instead of a
%% badarith crash.
merge_counts(reload, _) -> reload;
merge_counts(_, reload) -> reload;
merge_counts(A, B) -> A + B.

%% Set both the inbound and outbound handoff-disable flags of the
%% riak_core application environment on every node in Nodes.
set_handoff_disabled(Nodes, Disabled) ->
    rpc:multicall(Nodes, application, set_env,
                  [riak_core, disable_inbound_handoff, Disabled]),
    rpc:multicall(Nodes, application, set_env,
                  [riak_core, disable_outbound_handoff, Disabled]).

%% Main process loop.
%%
%% Delay       - milliseconds between rotations
%% Pairs       - non-empty list of {Node1, Node2}; the head is the pair
%%               that is enabled on the next successful tick
%% NodeList    - all nodes taking part in the rotation
%% Concurrency - handoff concurrency applied to the enabled pair
%%
%% A `tick' message advances the rotation; any other message stops it.
loop(Delay, [{_, _} | _] = Pairs, NodeList, Concurrency) ->
    receive
        tick ->
            handle_tick(Delay, Pairs, NodeList, Concurrency);
        _ ->
            shutdown(NodeList)
    end.

%% Process one tick: block new handoffs everywhere, then either move on
%% to the next pair, refresh the member list, or wait for running
%% handoffs to finish.
handle_tick(Delay, [{Node1, Node2} = Pair | Rest] = Pairs, NodeList,
            Concurrency) ->
    %% disable any new handoffs starting
    set_handoff_disabled(NodeList, true),
    case active_count(NodeList) of
        0 ->
            %% if no handoffs active, move on to the next pair
            rpc:multicall(NodeList, riak_core_handoff_manager,
                          set_concurrency, [0]),
            set_handoff_disabled([Node1, Node2], false),
            rpc:multicall([Node1, Node2], riak_core_handoff_manager,
                          set_concurrency, [Concurrency]),
            lager:log(info, self(), "handoff_concurrency changed: ~p",
                      [[{Node, rpc:call(Node, application, get_env,
                                        [riak_core, handoff_concurrency])}
                        || Node <- NodeList]]),
            erlang:send_after(Delay, self(), tick),
            %% Rotate: the pair just enabled goes to the back of the queue.
            loop(Delay, Rest ++ [Pair], NodeList, Concurrency);
        reload ->
            reload_members(Delay, Pair, NodeList, Concurrency);
        _ ->
            %% don't interrupt handoff in progress, wait 1 minute and
            %% keep looping with unchanged state so the retry tick is
            %% actually received
            erlang:send_after(60000, self(), tick),
            loop(Delay, Pairs, NodeList, Concurrency)
    end.

%% Re-read the ring membership after a node gave an invalid response.
%% The pair list is rebuilt and rotated so that CurrentPair (if it still
%% exists) is first, and a tick is scheduled one second later to retry.
%% With fewer than two members the rotation ends by returning.
reload_members(Delay, CurrentPair, _OldNodeList, Concurrency) ->
    lager:log(info, self(), "Reloading member list"),
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Members = riak_core_ring:all_members(Ring),
    case Members of
        [_, _ | _] ->
            {Left, Right} =
                lists:splitwith(fun(P) -> P =/= CurrentPair end,
                                buildpairs(Members)),
            %% near-immediate tick to try again
            erlang:send_after(1000, self(), tick),
            loop(Delay, Right ++ Left, Members, Concurrency);
        _ ->
            lager:log(info, self(),
                      "Stopping round-robin handoff: Need at least 2 cluster members to handoff")
    end.

%% Stop the rotation: disable handoff on every node, set concurrency to
%% 0 and release the registered name.
shutdown(NodeList) ->
    lager:log(info, self(), "Stopping round-robin handoff."),
    set_handoff_disabled(NodeList, true),
    rpc:multicall(NodeList, riak_core_handoff_manager, set_concurrency, [0]),
    unregister(?MODULE).

%% build list of all possible 2-node combinations, preserving list order;
%% lists with fewer than two elements yield []
buildpairs([Node | Rest]) ->
    [{Node, Other} || Other <- Rest] ++ buildpairs(Rest);
buildpairs([]) ->
    [].

%% start(Delay) begin round-robin concurrency, advance after Delay
%% minutes - default 1 hour, default concurrency 2
start() -> start(60, 2).

start(N) -> start(N, 2).

%% Start the rotation process and register it under the module name.
%%
%% Delay       - minutes between rotations
%% Concurrency - handoff concurrency for the active pair
%%
%% Returns the timer reference of the first tick on success, or
%% {error, Reason} when already running or when the ring has fewer than
%% two members.
start(Delay, Concurrency) ->
    case whereis(?MODULE) of
        undefined ->
            lager:log(info, self(),
                      "Starting round-robin handoff_concurrency of ~p every ~p minutes.",
                      [Concurrency, Delay]),
            {ok, Ring} = riak_core_ring_manager:get_my_ring(),
            Members = riak_core_ring:all_members(Ring),
            case Members of
                [_, _ | _] ->
                    Pairs = buildpairs(Members),
                    register(?MODULE,
                             spawn(fun() ->
                                       loop(Delay * 60000, Pairs, Members,
                                            Concurrency)
                                   end)),
                    %% NOTE(review): Delay is passed unscaled, so the
                    %% first tick fires after Delay milliseconds (almost
                    %% immediately), not Delay minutes - confirm intent.
                    erlang:send_after(Delay, ?MODULE, tick);
                _ ->
                    {error, "Need at least 2 cluster members to handoff"}
            end;
        _ ->
            {error, "Already Started"}
    end.

%% Ask the rotation process to stop.  Returns {error, "Not running"}
%% when no process is registered, otherwise the sent message.
stop() ->
    case whereis(?MODULE) of
        undefined -> {error, "Not running"};
        Pid -> Pid ! stop
    end.

%% Force an immediate tick.  Returns {error, "Not running"} when no
%% process is registered (instead of raising badarg), otherwise the sent
%% message.
step() ->
    case whereis(?MODULE) of
        undefined -> {error, "Not running"};
        Pid -> Pid ! tick
    end.