Module gossip

The behaviour modul (gossip_beh.erl) of the gossiping framework.

Copyright © 2010-2015 Zuse Institute Berlin

Version: $Id$

Behaviours: gen_component.

Authors: Jens V. Fischer (jensvfischer@gmail.com).

Description

The behaviour modul (gossip_beh.erl) of the gossiping framework.

The framework is designed to allow the implementation of gossip based dissemination and gossip based aggregation protocols. Anti-entropy gossiping was not considered. The communication scheme used by the framework is push-pull gossiping as this offers the best speed of convergence. The membership protocol used for the peer selection is Cyclon.

The gossiping framework comprises three kinds of components:
  1. The gossiping behaviour (interface) gossip_beh.erl. The behaviour defines the contract that allows the callback module to be used by the behaviour module. The behaviour defines the contract by specifying functions the callback module has to implement.
  2. The callback modules. A callback module implements a concrete gossiping protocol by implementing the gossip_beh.erl, i.e. by implementing the functions specified in the gossip_beh.erl. The callback module provides the protocol specific code. For an example callback module see gossip_load.erl.
  3. The behaviour module gossip.erl (this module). The behaviour module provides the generic code of the gossiping framework. It calls the callback functions of the callback modules defined in gossip_beh.erl.

The relation between behaviour and callback modules is modelled as a one-to-many relation. That is to say, the behaviour module is implemented as single process (per node) and all the callback module run in the context of this single process. This has the advantage of reducing the number of spawned processes and allowing for a better grouping of messages.

The framework is started as part of the startup procedure of a dht_node. The framework maintains a list of callback modules in the CBMODULES macro which are started together with the framework. It is also possible to individually start and stop callback modules later.

The pattern for communication between the behaviour module and a callback module is the following: From the behaviour module to a callback module communication occurs as a call to a function of the callback module. These calls have to return quickly, no long-lasting operations, especially no receiving of messages, are allowed. Therefore, the answers to these function calls are mainly realised as messages from the respective callback module to the behaviour module, not as return values of the function calls.

Phases of a Gossiping Operation

Prepare-Request Phase

The prepare-request phase consists of peer and data selection. The selection of the peer is usually managed by the framework. At the beginning of every cycle the behaviour module requests a peer from the Cyclon module of Scalaris, which is then used for the data exchange. The peer selection is governed by the select_node() function: returning false causes the behaviour module to handle the peer selection as described. Returning true causes the behaviour module to expect a selected_peer message with a peer to be used by for the exchange. How many peers are contracted for data exchanges every cycle depends on the fanout() config function.

The selection of the exchange data is dependent on the specific gossiping task and therefore done by a callback module. It is initiated by a call to select_data(). When called with select_data(), the respective callback module has to initiate a selected_data message to the behaviour module, containing the selected exchange data. Both peer and data selection are initiated in immediate succession through periodical trigger messages, so they can run concurrently. When both data and peer are received by the behaviour module, a p2p_exch message with the exchange data is sent to the peer, that is to say to the gossip behaviour module of the peer.

Prepare-Reply Phase

Upon receiving a p2p_exch message, a node enters the prepare-reply phase and is now in its passive role as responder. This phase is about the integration of the received data and the preparation of the reply data. Both of these tasks need to be handled by the callback module. The behaviour module passes the received data with a call to select_reply_data(QData) to the correspondent callback module, which merges the data with its own local data and prepares the reply data. The reply data is sent back to the behaviour module with a selected_reply_data message. The behaviour module then sends the reply data as a p2p_exch_reply message back to the original requester.

Integrate-Reply Phase

The integrate-reply phase is triggered by a p2p_exch_reply message. Every p2p_exch_reply is the response to an earlier p2p_exch (although not necessarily to the last p2p_exch request. The p2p_exch_reply contains the reply data from the peer, which is passed to the correspondent callback module with a call to integrate_data(QData). The callback module processes the received data and signals to the behaviour module the completion with an integrated_data message. On a conceptual level, a full cycle is finished at this point and the behaviour module counts cycles by counting the integrated_data messages. Due to the uncertainties of message delays and local clock drift it should be clear however, that this can only be an approximation. For instance, a new cycle could have been started before the reply to the current request has been received (phase interleaving) and, respectively, replies from the other cycle could be "wrongly" counted as finishing the current cycle (cycle interleaving).

Instantiation

Many of the interactions conducted by the behaviour module are specific to a certain callback module. Therefore, all messages and function concerning a certain callback module need to identify with which callback module the message or call is associated. This is achieved by adding a tuple of the module name and an instance id to all those messages and calls. While the name would be enough to identify the module, adding the instance id allows for multiple instantiation of the same callback module by one behaviour module. This tuple of callback module and instance id is also used to store information specific to a certain callback module in the behaviour module's state.

Messages to the Callback Modules (cb_msg)

Messages which shall be passed directly to a callback module need to have the form {cb_msg, CModule, Msg}, where CBModule is of type cb_module() and Msg is any message the respective callback module handles.

Messages in this form are unpacked by the gossip module and only the Msg is send to the given CMModule.

If a callback module wants to receive a response from another process, it should pack its Pid with an envelope of the form {PidOfRequestor, e, 3, {cb_msg, CBModule, '_'}} with a call to EnvPid = comm:reply_as(PidOfRequestor, 3, {cb_msg, CBModule, '_'}) and use the EnvPid as SourcePid when sending the request, e.g. comm:send(Pid, {get_dht_nodes, EnvPid}, [{?quiet}])

Used abbreviations

Data Types

bh_message()

bh_message() = {activate_gossip,
                Neighbors :: nodelist:neighborhood()}
             | {start_gossip_task,
                CBModule :: cb_module(),
                Args :: list()}
             | {gossip_trigger,
                TriggerInterval :: pos_integer()}
             | {trigger_action,
                TriggerInterval :: pos_integer()}
             | {update_range, NewRange :: intervals:interval()}
             | {web_debug_info, SourcePid :: comm:mypid()}
             | send_error()
             | {bulkowner,
                deliver,
                Id :: uid:global_uid(),
                Range :: intervals:interval(),
                Msg :: comm:message(),
                Parents :: [comm:mypid(), ...]}
             | {remove_all_tombstones}

cb_message()

cb_message() = {selected_data,
                CBModule :: cb_module(),
                PData :: gossip_beh:exch_data()}
             | {selected_peer,
                CBModule :: cb_module(),
                CyclonMsg ::
                    {cy_cache,
                     RandomNodes :: [node:node_type()]}}
             | {p2p_exch,
                CBModule :: cb_module(),
                SourcePid :: comm:mypid(),
                PData :: gossip_beh:exch_data(),
                OtherRound :: non_neg_integer()}
             | {selected_reply_data,
                CBModule :: cb_module(),
                QData :: gossip_beh:exch_data(),
                Ref :: pos_integer(),
                Round :: non_neg_integer()}
             | {p2p_exch_reply,
                CBModule :: cb_module(),
                SourcePid :: comm:mypid(),
                QData :: gossip_beh:exch_data(),
                OtherRound :: non_neg_integer()}
             | {integrated_data,
                CBModule :: cb_module(),
                current_round}
             | {new_round,
                CBModule :: cb_module(),
                NewRound :: non_neg_integer()}
             | {cb_msg,
                CBModule :: cb_module(),
                Msg :: comm:message()}
             | {stop_gossip_task, CBModule :: cb_module()}
             | {no_msg}

cb_module()

cb_module() = 
    {Module :: cb_module_name(), Id :: cb_module_id()}

cb_module_id()

cb_module_id() = atom() | uid:global_uid()

cb_module_name()

cb_module_name() = module()

cb_status()

cb_status() = unstarted | started | tombstone

exch_data()

exch_data() = 
    {ExchData :: undefined | any(),
     Peer :: undefined | comm:mypid()}

message()

message() = bh_message() | cb_message()

send_error()

send_error() = 
    {send_error,
     _Pid :: comm:mypid(),
     Msg :: message(),
     Reason :: atom()}

state()

state() = 
    #state{cb_modules = [cb_module()],
           msg_queue = msg_queue:msg_queue(),
           range = intervals:interval(),
           status = init | uninit,
           trigger_add = [pos_integer()],
           trigger_groups =
               [{TriggerInterval :: pos_integer(),
                 CBModules :: [cb_module()]}],
           trigger_locks = [{cb_module(), locked | free}],
           trigger_remove = [pos_integer()],
           cb_states = [{cb_module(), any()}],
           cb_stati = [{cb_module(), cb_status()}],
           cycles = [{cb_module(), non_neg_integer()}],
           exch_datas = [{cb_module(), exch_data()}],
           reply_peers =
               [{Ref :: pos_integer(), Pid :: comm:mypid()}],
           rounds = [{cb_module(), non_neg_integer()}]}

Function Index

activate/1Activate the gossip module.
check_config/0Check the config of the gossip module.
init/1Initialises the state of the gossip module.
on_active/2Message handler during the normal operation of the gossip module.
on_inactive/2Message handler during the startup of the gossip module.
remove_all_tombstones/0Globally removes all tombstones from previously stopped callback modules.
rm_filter_slide_msg/3Checks whether the received notification is a {slide_finished, succ} or {slide_finished, pred} msg.
rm_my_range_changed/3Checks whether the node's range has changed, i.e.
rm_send_activation_msg/5Sends the activation message to the behaviour module (this module) Used to subscribe to the ring maintenance for {slide_finished, succ} or {slide_finished, pred} msg.
rm_send_new_range/5Notifies the node's gossip process of a changed range.
start_gen_component/5
start_gossip_task/2Globally starts a gossip task identified by CBModule.
start_link/1Start the process of the gossip module.
stop_gossip_task/1Globally stop a gossip task.
tester_create_cb_module_names/1Value creater for type_check_SUITE.

Function Details

start_gen_component/5

start_gen_component(Module :: module(),
                    Handler :: gen_component:handler(),
                    Args :: term(),
                    Options :: [gen_component:option()],
                    Self :: pid()) ->
                       no_return() | ok

start_link/1

start_link(DHTNodeGroup :: pid_groups:groupname()) -> {ok, pid()}

Start the process of the gossip module.
Called by sup_dht_node, calls gen_component:start_link to start the process.

init/1

init(X1 :: []) -> state()

Initialises the state of the gossip module.
Called by gen_component, results in on_inactive handler.

activate/1

activate(Neighbors :: nodelist:neighborhood()) -> ok

Activate the gossip module.
Called by dht_node_join. Activates process (when only node of the system) or subscribes to the rm to activate on slide_finished messages.
Result of the activation is to switch to the on_active handler.

remove_all_tombstones/0

remove_all_tombstones() -> ok

Globally removes all tombstones from previously stopped callback modules.

rm_filter_slide_msg/3

rm_filter_slide_msg(Neighbors, Neighbors, Reason) -> boolean()

Checks whether the received notification is a {slide_finished, succ} or {slide_finished, pred} msg. Used as filter function for the ring maintanance.

rm_send_activation_msg/5

rm_send_activation_msg(Subscriber,
                       X2 :: gossip,
                       Neighbours,
                       Neighbours,
                       Reason) ->
                          ok

Sends the activation message to the behaviour module (this module) Used to subscribe to the ring maintenance for {slide_finished, succ} or {slide_finished, pred} msg.

on_inactive/2

on_inactive(Msg :: message(), State :: state()) -> state()

Message handler during the startup of the gossip module.

on_active/2

on_active(Msg :: message(), State :: state()) -> state()

Message handler during the normal operation of the gossip module.

rm_my_range_changed/3

rm_my_range_changed(OldNeighbors :: nodelist:neighborhood(),
                    NewNeighbors :: nodelist:neighborhood(),
                    IsSlide :: rm_loop:reason()) ->
                       boolean()

Checks whether the node's range has changed, i.e. either the node itself or its pred changed.

rm_send_new_range/5

rm_send_new_range(Subscriber :: pid(),
                  Tag :: gossip,
                  OldNeighbors :: nodelist:neighborhood(),
                  NewNeighbors :: nodelist:neighborhood(),
                  Reason :: rm_loop:reason()) ->
                     ok

Notifies the node's gossip process of a changed range. Used to subscribe to the ring maintenance.

check_config/0

check_config() -> boolean()

Check the config of the gossip module.
Calls the check_config functions of all callback modules.

start_gossip_task/2

start_gossip_task(GossipTask, Args) -> ok

Globally starts a gossip task identified by CBModule.
Args is passed to the init function of the callback module.
CBModule is either the name of a callback module or an name-instance_id tuple.

stop_gossip_task/1

stop_gossip_task(CBModule :: cb_module()) -> ok

Globally stop a gossip task.

tester_create_cb_module_names/1

tester_create_cb_module_names(X1 :: 1) -> cb_module_name()

Value creater for type_check_SUITE.


Generated by EDoc, Feb 29 2016, 16:12:18.