Active Middleware for multicast and reduction operations in distributed cluster environments


Nitin Bahadur and Nadathur Gokul

{ bnitin, gokul }


This paper describes a scalable, dynamic middleware for use in cluster environments. The middleware provides a base for building applications that scale with the number of nodes. It provides data communication facilities to applications built on top of it through an event-driven model and a conventional send-receive paradigm. New functionality is added to nodes in the cluster at run time using dynamic code execution techniques. Scalability is achieved by logically partitioning the nodes into a binomial tree, in which intermediate nodes perform reduction on results received from their child nodes. Our model handles node failures and reconfigures the tree automatically. We have built two applications, a system-monitoring tool and a file transfer/program spawning tool, to demonstrate the use of our middleware. Performance results show that our middleware is scalable.

  1. Introduction

One of the key issues in high performance distributed computing is scalability. Large distributed applications require tools for program monitoring and debugging that not only operate in a distributed manner but also scale with the number of nodes. The underlying framework should allow tools to operate efficiently without knowledge of the underlying communication mechanism. The goal of this project is to provide a middleware for performing distributed reduction[1] applications in a scalable and dynamic manner. The middleware is targeted at cluster environments but can be easily adapted to a wide-area network. This section presents the motivation for the project and a summary of our objectives.

1.1 Motivation and Objectives

  1. Scalability

Scalability is an important issue in distributed applications. In a master-client model, the master can become the bottleneck as the number of clients increases. We aim to improve scalability by reducing the number of messages processed by each node.

  2. Communication primitives

The middleware should provide primitives for reliable communication between two nodes. It should also provide an efficient mechanism for sending a message to all nodes.

  3. Dynamism

A problem with a static master-client paradigm is that the reduction function (the function that interprets the results) cannot be adapted to current needs or observed conditions. Every time the master changes, the client application must be changed as well so that the new reduction function is used everywhere. It would be preferable if a change in functionality at one (master) node could take effect across the entire computation. This also reduces the maintenance and upgrade cost of the clients. We provide the ability to add new features to clients without interrupting their normal execution.

  4. Continuous operation

It is important that clients continue to function in the event of any kind of failure. It must also be possible to restart a client after it crashes. To this end, we provide crash detection and handling, and node restart.

  5. Dynamic processes

It should be possible to add new clients to an existing set without having to bring down all of them. Our support for dynamic processes allows clients to be added while the system is running.

We aim to achieve the above objectives through a middleware, which provides the following features:

  • Point-to-point message delivery
  • Scalable point-to-multipoint message delivery using multicast
  • Dynamic addition of new functionality in the clients through the master
  • Detection of node failures and facility to restart a node
  • Dynamic addition of new clients to the existing set

We do not make any assumptions about the underlying hardware and our code can be easily ported to other platforms. The outline of the rest of the paper is as follows: Section 2 presents the design of our library. Section 3 is dedicated to implementation issues. We discuss some applications we have developed in Section 4. Performance evaluation of our library is presented in Section 5. Finally we discuss some related work and other possible approaches in Section 6 and conclude in Section 7.

  2. Design

In this section we present the design of the Active Reduction Tree Library (ARTL). The general architecture is as shown in Figure 1.

Consider a cluster of nodes connected by a high-speed network. The application on each node that uses the ART library starts the ART runtime. ARTL logically partitions the network into a tree. Figure 1 shows the logical connectivity and not the actual underlying network connectivity. The implication of this design is discussed in Section 2.1. The ART runtime, which now resides at every node, handles all inter-node communication for the application. It provides functionality to send unicast and multicast messages. The runtime also provides threads of execution (pthreads) for the application to:

1. Specify its responses for queries.

2. Perform reduction operation on responses that pass through it on the way to the master.

Depending on whether the node is the master, a leaf or an intermediate node, ARTL transparently provides the appropriate functionality. The ART library operates at user level and requires no special privileges.

2.1 Binomial Tree

To deliver messages that are addressed to all clients efficiently, the ART library arranges the nodes in the cluster as a binomial tree. Note that this is a logical, application-level partition of the nodes and does not involve any change to network-level connectivity. The assumption behind this scheme is that in cluster environments the network latency between two nodes is very low (compared to WAN environments), so if a single node is bombarded with messages, its CPU becomes the bottleneck. We now list the properties of binomial trees [Cormen 90].

A binomial tree B_k is an ordered tree defined recursively. B_0 consists of a single node. The binomial tree B_k consists of two binomial trees B_{k-1} that are linked together in the following way: the root of one tree is made the leftmost child of the root of the other tree. The building process is shown in Figure 2.

The binomial tree gives a distribution that takes advantage of the parallelism in the non-leaf nodes for reducing messages flowing upstream towards the master. If there are N nodes, the greatest number of hops from the root to any leaf is log2(N). This increases the number of hops, but as mentioned before, the main aim is to alleviate the processing bottleneck at the master. Towards this end, the binomial tree is an optimal configuration.

Advantages of using a binomial tree:

  1. The binomial tree allows more than one node to take part in computing (reducing) the messages rather than straining a single node. For example, in Figure 1, the label “reduction” shows that the corresponding node reduces the responses before passing them to the master. The operation has no dependence on peers and hence can be performed in parallel.
  2. The other motivation for using the binomial tree is that it minimizes (as compared to a binary tree, a B-Tree and any k-ary tree in general [Cormen 90]) the time required to propagate a message to all the nodes. This result can be explained intuitively as follows. For any tree-like configuration, it is the root that waits the longest to get all the responses. During this wait the root could issue more queries or messages to other nodes. A tree configuration that has the greatest out-degree (number of children of a particular node) at the root, with the out-degree decreasing as we move downstream, is therefore well suited. The binomial tree has this desired characteristic. A binomial tree B_k (Figure 2 shows B_3) has a depth of k and a maximum out-degree of k, and this maximum out-degree occurs at the root of the tree.
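The properties listed above can be checked with a small sketch. It assumes a rank-based numbering in which rank 0 is the master; this numbering and the function names are illustrative assumptions and are not necessarily the layout ARTL uses.

```cpp
#include <cstdint>
#include <vector>

// Assumed numbering: rank 0 is the root (master) of the binomial tree.

// Parent of `rank`: clear the lowest set bit. The root has no parent (-1).
inline int binomial_parent(std::uint32_t rank) {
    if (rank == 0) return -1;
    return static_cast<int>(rank & (rank - 1));
}

// Children of `rank` in a tree of `n` nodes: rank + 2^j for every power of
// two below the lowest set bit of `rank` (any power of two for the root),
// as long as the resulting rank is still smaller than n.
inline std::vector<std::uint32_t> binomial_children(std::uint32_t rank,
                                                    std::uint32_t n) {
    std::vector<std::uint32_t> kids;
    for (std::uint64_t step = 1; rank + step < n; step <<= 1) {
        if (rank & step) break;  // reached the lowest set bit of rank
        kids.push_back(static_cast<std::uint32_t>(rank + step));
    }
    return kids;
}

// Number of hops from the root to `rank`: one hop per set bit.
inline int binomial_depth(std::uint32_t rank) {
    int hops = 0;
    for (; rank != 0; rank &= rank - 1) ++hops;
    return hops;
}
```

With eight nodes (B_3) under this numbering, the root has the three children 1, 2 and 4, and the deepest node (rank 7) is three hops from the root, which agrees with the depth and out-degree stated for B_k.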

2.2 Tree Setup

When a node starts, it builds a binomial tree of the network based on an initial configuration file (the configuration file lists all the nodes on which the application should run). It then connects to its child nodes and its parent. This process takes place at all the nodes, and when the full tree is connected, the master node sends a configuration message down the tree. The configuration message carries information about the reduction functions to be executed at intermediate nodes. The client-side ARTL then loads the functions into the client application process. The mechanisms for these are explained in Sections 2.4 and 3.2.

2.3 Communication Subsystem

By default, ARTL provides an event-driven model for communication. A node can send a message to another node without the receiver expecting that message. The received message is then handled by a default handler registered by the application. If the sender and receiver are synchronized, the receiver can post a receive for a message it expects. Thus we allow the application to communicate using an event-driven approach as well as the traditional send-recv paradigm. An example of an event-driven message would be a message from the master informing all nodes that some node has just gone down. The running nodes could then take some appropriate action based on this message, or just ignore it.
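The two delivery paths can be illustrated with a minimal, single-threaded sketch. The class `Dispatcher` and its methods are hypothetical names chosen for this example and are not the ARTL API; messages are identified by an integer service id, which is also an assumption.

```cpp
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical dispatcher, for illustration only (not the ARTL interface).
class Dispatcher {
public:
    using Handler = std::function<void(int service, const std::string& payload)>;

    // Event-driven path: handler for messages nobody posted a receive for.
    void set_default_handler(Handler h) { default_handler_ = std::move(h); }

    // Send-recv path: the application announces that it expects `service`.
    void post_receive(int service) { posted_.insert(service); }

    // Called by the communication layer for every incoming message.
    void deliver(int service, const std::string& payload) {
        if (posted_.erase(service) > 0) {
            received_[service] = payload;        // matches a posted receive
        } else if (default_handler_) {
            default_handler_(service, payload);  // unexpected: raise an event
        }
    }

    // Fetches the payload of a completed posted receive, if there is one.
    bool take(int service, std::string& out) {
        auto it = received_.find(service);
        if (it == received_.end()) return false;
        out = std::move(it->second);
        received_.erase(it);
        return true;
    }

private:
    Handler default_handler_;
    std::set<int> posted_;
    std::map<int, std::string> received_;
};
```

A "node down" notification would take the default-handler path, while data the application is waiting for would be matched against a posted receive and never reach the handler.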

2.4 Framework for processing messages

Once a message has been received, it has to be processed in a way that is specific to the application that sent the message. This means that the ART library must provide a framework into which the client side of the application can plug its response to a received message. Note that the functions for reducing messages (reduction functions) and the functions that are invoked in response to a message from the master (response functions) are specific to a service (message identifier). The response functions are callbacks [Paul 97] registered by the client application with the ART library (Figure 3).

Figure 3: The design of the Event handler. Mapping is a table that binds services to objects containing the reduction operations with which responses are reduced. The “RESPONSE” is a callback registered by the client application.

The functions for reducing responses are active in the sense that they are sent[2] over the wire during the start up phase and the client has no knowledge about these functions before that. The idea is similar to the concept of active networks [ANTS 98].

Active reduction functions have the advantage that if the existing data have to be interpreted or reduced in a new way, all that has to be done is to create a function that implements the reduction operation and a specification that binds the function to a particular service (interpretation of data). Using the specification a mapping is created between the function and the service for which it is used. It is to be noted that this scheme is not restricted to just reduction operations but can be used for loading new responses on the fly.

As shown in Figure 3, a mapping binds the reduction and response functions to the services for which they should be invoked. Once callbacks have been specified and the appropriate services have been bound, a mechanism is needed to provide an environment in which the response and reduction operations execute. The environment should handle both operations transparently. When a message is received, the appropriate operation registered by the application is scheduled for execution by the scheduler. These outstanding services are then serviced by a pool of threads (Figure 7, Section 3.3).

The scheduler consists of a static pool of threads embedded with the scheduling logic. The main motivation for maintaining a static pool of threads is that thread creation and destruction are expensive [Firefly 90]. The scheduler assigns idle threads to process incoming messages. The result of the operation is then sent to the parent node along the binomial tree. The thread frees all resources associated with the current task and waits for the next task. We chose to implement a multithreaded message processor for ART because queries for multiple services can arrive at the same time and hence can be processed concurrently. This is especially useful in cluster environments, where a node typically houses more than one processor.
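A static pool of this kind can be sketched as follows. The sketch uses `std::thread` from the C++ standard library rather than raw pthreads, and the class name `StaticThreadPool` is an assumption; it shows the general technique, not the scheduler as implemented in ARTL.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Fixed-size pool: threads are created once and reused for every task.
class StaticThreadPool {
public:
    explicit StaticThreadPool(std::size_t workers) {
        for (std::size_t i = 0; i < workers; ++i)
            threads_.emplace_back([this] { run(); });
    }

    // Finishes all queued tasks, then joins the worker threads.
    ~StaticThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_all();
        for (std::thread& t : threads_) t.join();
    }

    StaticThreadPool(const StaticThreadPool&) = delete;
    StaticThreadPool& operator=(const StaticThreadPool&) = delete;

    // Queue a task; an idle worker picks it up.
    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};
```

Because the workers outlive individual tasks, the cost of thread creation is paid once at start-up instead of once per message.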

2.5 Crash Detection, node restart and dynamic processes

The communication layer provides crash detection. When a node goes down[3], all nodes are informed of this event. The master node recomputes the binomial tree and sends the nodes a reconfiguration message. Reconfiguration is not required when a leaf node goes down. Similarly, when a previously down node restarts, it sends a join message to the master, which again recomputes the tree and sends a reconfiguration message, so that the restarted node becomes part of the tree. Figure 4 shows that a leaf node link failure (node 5) is simple to handle and causes no change in the tree structure. Figure 5 shows the case when intermediate node 2 goes down.

Figure 4: Change in tree when a leaf-node goes down

Figure 5: Change in tree when a non-leaf node goes down

We allow the user to add new nodes to the existing set. The master inserts the new node in the tree and all other nodes are informed of its existence. Thus we can dynamically increase and decrease the set of running nodes. Node failure, node restart and node addition are events for which the application can register callback functions and take appropriate action.
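The failure handling described in this section can be sketched as below. The sketch assumes that the master keeps an ordered host list and assigns binomial-tree ranks in list order; the names `build_tree` and `handle_failure` and the host-to-parent map are hypothetical and only illustrate the leaf versus non-leaf distinction.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// host -> parent host; the master maps to the empty string.
using ParentMap = std::map<std::string, std::string>;

// Build a binomial tree over `hosts` in list order; hosts[0] is the master.
// The parent of rank r is the rank obtained by clearing r's lowest set bit.
inline ParentMap build_tree(const std::vector<std::string>& hosts) {
    ParentMap parent;
    for (std::size_t rank = 0; rank < hosts.size(); ++rank)
        parent[hosts[rank]] = (rank == 0) ? std::string() : hosts[rank & (rank - 1)];
    return parent;
}

// Remove `failed` from the live set. A failed leaf is simply dropped; a
// failed non-leaf forces the tree to be recomputed over the remaining hosts.
// Returns true if the tree was recomputed.
inline bool handle_failure(std::vector<std::string>& hosts, ParentMap& parent,
                           const std::string& failed) {
    hosts.erase(std::remove(hosts.begin(), hosts.end(), failed), hosts.end());
    bool has_children = false;
    for (const auto& entry : parent) {
        if (entry.second == failed) { has_children = true; break; }
    }
    if (!has_children) {
        parent.erase(failed);  // leaf: no other node is affected
        return false;
    }
    parent = build_tree(hosts);
    return true;
}
```

With eight hosts numbered in this way, losing the fifth-ranked host (a leaf) leaves every other parent link untouched, whereas losing the second-ranked host (which has a child) triggers a rebuild; a restart or a newly added node could be handled by appending the host to the list and calling `build_tree` again.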

  3. Implementation

The implementation of our design is described in this section. The design was implemented on a cluster of 64 nodes running Linux version 2.2.12. Each node has dual Pentium III Xeon processors running at 550 MHz and 2 GB of RAM. The middleware is implemented in C++.

Figure 6: Setting up of the binomial tree

3.1 Communication Subsystem

The communication subsystem is responsible for the setup of the binomial tree and message delivery to all nodes.


Communication takes place using TCP. During startup, each node builds TCP connections with its children and its parent (except for the master, which has no parent). The TCP connections are kept persistent. TCP is used to ensure reliable data transfer. By using persistent connections we avoid the TCP handshake on open and close for each message. Each node sets up connections with its children first and then with its parent, as shown in Figure 6. This way, when the master is connected with all its children, it is sure that all nodes that are up have been connected to the tree (since its children would have first connected with their children, and so on). The master then sends the reduction functions (see Section 3.2 for details) to all the nodes in the tree. On receipt of these, the nodes load them into their address space.


Every message sent has an ARTL header attached to it. Using this header, ARTL at the receiver node determines the message type and the amount of data the sender is sending. ARTL supports two kinds of messages: event-driven and traditional send-recv. Event-driven messages are those for which the application has not posted an explicit receive. For such messages the communication subsystem allocates memory on behalf of the application, and the message is given to the Event handler, which then calls the appropriate callback function in a separate thread. For send-recv messages, the application explicitly posts a receive; the communication subsystem receives the message on behalf of the application and returns it to the application. We do not support asynchronous messages as in MPI [MPI 1.1]. Supporting them would require storing some state information; our framework can be extended to implement asynchronous messages.
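The paper does not give the header layout, so the following sketch assumes one purely for illustration: a one-byte message kind, a four-byte service identifier and a four-byte payload length, all in network byte order. The names `MessageHeader`, `encode_header` and `decode_header` are likewise hypothetical.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Assumed layout (not the actual ARTL wire format):
//   byte 0      message kind
//   bytes 1..4  service identifier, big-endian
//   bytes 5..8  payload length in bytes, big-endian
enum class MessageKind : std::uint8_t { Event = 0, SendRecv = 1 };

struct MessageHeader {
    MessageKind kind;
    std::uint32_t service;
    std::uint32_t payload_length;
};

constexpr std::size_t kHeaderSize = 9;

inline void put_u32(std::uint8_t* out, std::uint32_t v) {
    out[0] = static_cast<std::uint8_t>(v >> 24);
    out[1] = static_cast<std::uint8_t>(v >> 16);
    out[2] = static_cast<std::uint8_t>(v >> 8);
    out[3] = static_cast<std::uint8_t>(v);
}

inline std::uint32_t get_u32(const std::uint8_t* in) {
    return (static_cast<std::uint32_t>(in[0]) << 24) |
           (static_cast<std::uint32_t>(in[1]) << 16) |
           (static_cast<std::uint32_t>(in[2]) << 8) |
           static_cast<std::uint32_t>(in[3]);
}

// Serialize a header into the bytes that precede the payload on the wire.
inline std::array<std::uint8_t, kHeaderSize> encode_header(const MessageHeader& h) {
    std::array<std::uint8_t, kHeaderSize> bytes{};
    bytes[0] = static_cast<std::uint8_t>(h.kind);
    put_u32(&bytes[1], h.service);
    put_u32(&bytes[5], h.payload_length);
    return bytes;
}

// Parse a header; returns false if the buffer is short or the kind is unknown.
inline bool decode_header(const std::uint8_t* data, std::size_t size,
                          MessageHeader& out) {
    if (size < kHeaderSize || data[0] > 1) return false;
    out.kind = static_cast<MessageKind>(data[0]);
    out.service = get_u32(data + 1);
    out.payload_length = get_u32(data + 5);
    return true;
}
```

A receiver would read `kHeaderSize` bytes from the persistent TCP connection, decode them, and then read exactly `payload_length` further bytes, which is how a fixed header lets the receiver know the message type and the amount of data to expect.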

3.2 Implementing active reduction

The functions that implement the reduction operations have been implemented as shared objects. These are compiled at the master end and sent to the client nodes. Once ARTL receives the shared objects and the specification, it loads the objects into the address space of the client application using dlopen(). The individual functions are loaded using dlsym(). The binding between a reduction function and a particular service is specified in the message that carries that service.
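The loading step can be sketched with the POSIX `dlopen()`/`dlsym()` calls named above. The function signature `reduce_fn`, the struct `LoadedReduction` and the helper `load_reduction` are assumptions for this example; the paper does not specify the signature of a reduction function.

```cpp
#include <dlfcn.h>
#include <string>

// Assumed signature of a reduction function exported by the shared object.
extern "C" {
typedef long (*reduce_fn)(const long* values, unsigned long count);
}

struct LoadedReduction {
    void* handle = nullptr;  // handle returned by dlopen(), or nullptr
    reduce_fn fn = nullptr;  // resolved function, or nullptr on failure
    std::string error;       // message from dlerror() when loading failed
};

// Load the shared object at `path` and resolve `symbol` in it.
inline LoadedReduction load_reduction(const std::string& path,
                                      const std::string& symbol) {
    LoadedReduction result;
    result.handle = dlopen(path.c_str(), RTLD_NOW | RTLD_LOCAL);
    if (result.handle == nullptr) {
        const char* err = dlerror();
        result.error = err ? err : "dlopen failed";
        return result;
    }
    dlerror();  // clear any stale error before calling dlsym()
    void* sym = dlsym(result.handle, symbol.c_str());
    const char* err = dlerror();
    if (err != nullptr) {
        result.error = err;
        dlclose(result.handle);
        result.handle = nullptr;
        return result;
    }
    result.fn = reinterpret_cast<reduce_fn>(sym);
    return result;
}
```

The shared object must export the function with C linkage (`extern "C"`) so that the symbol name passed to `dlsym()` is not mangled. On older glibc versions the program also has to be linked with `-ldl`.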

3.3 Event scheduler

The event scheduler consists of three main resource components (Figure 7).

Figure 7: Implementation of Event handler

  1. The static pool of threads that provide the resource for computation.
  2. An Event table that provides memory to maintain state for the various services to be processed. Each entry in the Event table also provides a stack to store the responses for that entry.
  3. A queue that interfaces the event table and the thread pool.

The queue maintains the set of outstanding tasks that have to be processed by the threads, together with the index into the Event table for the corresponding state information. An incoming query is registered in the Event table. After the setup process, the behavior of a node depends on its position in the multicast tree. If a node is a non-leaf node, the query message is passed on downstream and the appropriate response function is scheduled. Once the response function has executed, the state is not freed, since this response has to be reduced with the other responses passing through this node towards the master. When all the downstream responses have arrived, the reduction function is invoked. The responses may arrive in any order. Since responses are bound to a service (Section 2.4), the order does not matter: they are stacked in the appropriate entry according to the service to which they respond. The responses are then reduced and sent upstream towards the master. If the node is a leaf node, the response for the incoming query is executed and the result is sent to the parent node. If the node is the master node, an application-specific master-response function is called after the responses are reduced, to return the results of the final reduction to the application.
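The behavior of one Event table entry at a non-leaf node can be sketched as follows. The class `EventTable`, its methods and the use of `long` as the response type are hypothetical; the sketch is single-threaded and omits the queue and thread pool for brevity.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical Event table: one entry per outstanding service.
class EventTable {
public:
    using ReduceFn = std::function<long(const std::vector<long>&)>;

    // Register a query for `service`. `expected` is the number of responses
    // to collect (the node's own response plus one per child).
    void open(int service, std::size_t expected, ReduceFn reduce) {
        Entry entry;
        entry.expected = expected;
        entry.reduce = std::move(reduce);
        entries_[service] = std::move(entry);
    }

    // Stack one response. When the last expected response arrives, the entry
    // is reduced and freed; `reduced` receives the value to send upstream and
    // the function returns true. Otherwise it returns false.
    bool add_response(int service, long value, long& reduced) {
        auto it = entries_.find(service);
        if (it == entries_.end()) return false;  // no such outstanding query
        Entry& entry = it->second;
        entry.responses.push_back(value);
        if (entry.responses.size() < entry.expected) return false;
        reduced = entry.reduce(entry.responses);
        entries_.erase(it);
        return true;
    }

    std::size_t pending() const { return entries_.size(); }

private:
    struct Entry {
        std::size_t expected = 0;
        ReduceFn reduce;
        std::vector<long> responses;  // the per-entry stack of responses
    };
    std::map<int, Entry> entries_;
};
```

Because each response is filed under its service, responses for different services can interleave freely, and for a reduction such as a sum the arrival order within one service does not affect the result.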