Push Framework Dashboard

Table of contents

  • Introduction
  • Deployment layout
  • The developer perspective
  • Technical architecture
  • Tutorials and examples
  • Benchmarking
  • References
  • History

Introduction

Beyond websites, there are types of applications which are difficult, if not impossible, to build using the web. What the web offers as a technology is well suited to publishing content and deploying changes to it immediately and easily. The user only reacts to content. If he interacts with other users, his "reaction" is compiled into the content, persisted, then shown to others when they request it. Applications that need to broadcast information in real time, or that want connected users to engage with each other in real time, need a dedicated technology.

For the simplest example, think of a chat application or a media streaming server. You can also think of the many sophisticated applications that collect signals from a large number of remotely connected devices and provide timely decisions. Such a technology should be able to manage a large number of active full-duplex connections. It should be able to relay information from one connection to another immediately, and enable the efficient, asynchronous broadcast of information so that spontaneous and timely responses reach the client side. My contribution here is a C++ library which emerged from multiple refactorings of the server-side code of an amateur chess server I am developing, and which I hope people will find useful in other domains as well. Beyond this article, you can find articles, complete documentation guides, and other helpful material at the Push Framework home page: http://www.pushframework.com.

Deployment layout

Let us look at the deployment picture. Your server logic is written on top of the Push Framework library, so to deploy the application you need a dedicated server machine. You should design or reuse a protocol that enables efficient communication between the client side of your application and its server side. Clients connect to a TCP port that you specify; you may even have heterogeneous clients that use different protocols to talk to your server. One major feature of the library is its ability to profile itself and send periodic statistics. You just need to activate that feature with a simple API call and connect the Dashboard, a separately developed applet, to the "monitoring" port of the server to receive the "feed" of those statistics.

Deployment View

The developer perspective

Personally, as a developer, when I discover a new library and want to assess it, I hurry to look at example usage code: from there I can see the big picture. So let me sacrifice many essential details here and only present the main view. A single instance encapsulates all of the library's functionality:

PushFramework::Server server;

Configure such an instance by calling its member methods, then call start to run your server. This is how the code surrounding that instance looks when you are finished:

PushFramework::Server server;
server.setClientFactory(pClientFactory);
server.setProtocol(pProtocol);
server.createListener(sPort);
server.registerService(myServiceId, pService, "myService");
//
server.start(false);

Now questions arise. First, pClientFactory should point to a concrete implementation of the PushFramework::ClientFactory class. This class manages client connection and disconnection; it also manages the transition by which a socket connection is turned into a legitimate client that can be discovered by others. It also handles the case where a client reconnects (abrupt TCP failures happen, then the same client reconnects) and the new TCP socket must be bound to the existing client information and previously allocated resources. Note that in the servicing code you never deal with the socket; you deal with PushFramework::Client instances representing your active clients.
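To make this concrete, here is a minimal sketch of what a factory might look like. The override names and signatures used below (createClient, disposeClient) are illustrative assumptions made for this sketch, not the library's verified interface; the authoritative declarations are in the PushFramework headers.

//Sketch only: the virtual method names below are assumed for illustration.
class CMyClient : public PushFramework::Client
{
public:
    explicit CMyClient(const std::string& loginName)
        : loginName(loginName) { }
    std::string loginName; //application state kept across reconnections
};

class MyClientFactory : public PushFramework::ClientFactory
{
public:
    //Called when a fresh connection has identified itself: decide whether
    //it becomes a legitimate client and allocate its Client object.
    virtual PushFramework::Client* createClient(PushFramework::IncomingPacket& loginRequest)
    {
        //Examine loginRequest; return NULL to reject the connection.
        return new CMyClient("guest");
    }

    //Called when a client is finally disposed of, so that previously
    //allocated resources are released.
    virtual void disposeClient(PushFramework::Client* pClient)
    {
        delete pClient;
    }
};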

The protocol is an essential piece of information. You provide it as a concrete implementation of the PushFramework::Protocol class, passed to the ::setProtocol method, or within the ListenerOptions parameter of ::createListener if you intend to support multiple protocols. By overriding its pure virtual methods, you tell the library how received data should be deserialized and how outgoing information should be serialized. To send information to a specific client or to a group of clients (broadcast), Push Framework expects instances of PushFramework::OutgoingPacket; encoding and framing act upon this class. When data is received, deframing and decoding are triggered; that is deserialization. At the end of the day, deserialized data is handed back in the form of IncomingPacket instances. These instances are dispatched to your "servicing code", which you may find useful to organize into many "services" (very useful when there are multiple types of requests, each requiring special processing).
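To fix ideas, here is a minimal protocol sketch. The method names and signatures used below (serializeOutgoingPacket, tryDeserializeIncomingPacket) are placeholders invented for illustration; the actual pure virtual methods are declared in PushFramework::Protocol.

//Sketch only: the override names below are placeholders.
class MyProtocol : public PushFramework::Protocol
{
public:
    //Serialization: encode an OutgoingPacket, then frame it (e.g., prepend
    //the total length and the service ID) so the peer can deframe it.
    virtual bool serializeOutgoingPacket(PushFramework::OutgoingPacket& packet,
                                         char* pBuffer, unsigned int& uWrittenSize)
    {
        //encode + frame into pBuffer, set uWrittenSize
        return true;
    }

    //Deserialization: wait for a whole frame, decode it into an
    //IncomingPacket subclass, and report the service ID for routing.
    virtual bool tryDeserializeIncomingPacket(const char* pBuffer, unsigned int uAvailableSize,
                                              PushFramework::IncomingPacket*& pPacket,
                                              int& serviceId)
    {
        //deframe + decode; return false until a complete frame is buffered
        return false;
    }
};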

A service is materialized by an instance of a PushFramework::Service subclass. You implement your business logic by overriding its handle method, then you assign an ID to the instance. Of course, you must take care to extract that ID at deserialization time so the library knows how to route the packets.

Let us see how you can handle incoming data. In this first case, we handle a particular request and echo a response back to the requester:

void MyService::handle(ClientKey key, IncomingPacket& packet)
{
    CMyClient* pClient = getClient(key);

    if(pClient)
    {
        //See what the client wants by examining packet

        //Send a response
        OutgoingPacket response;
        //build your response

        pClient->push(&response);

        returnClient(key);
    }
}

The library is multithreaded. To be more precise, multiple threads are spawned to execute the servicing code, so the code in the handle methods of your Service subclasses runs concurrently. The library maintains a reference counter for each Client instance so that instances are released safely. Let us consider an example where information has to be relayed from one connection to another. Imagine we are in the context of a chat application:

void MyService::handle(ClientKey key, IncomingPacket& packet)
{
    CMyClient* pClient = getClient(key);
    if(pClient)
    {
        ClientKey recipientKey; //furnish this info from packet
        char* msg; //to be furnished from packet
        CMyClient* pRecipient = getClient(recipientKey);
        if(pRecipient)
        {
            OutgoingPacket response;
            //build the chat response from the sender info and msg
            pRecipient->push(&response);
            returnClient(recipientKey);
        }
        returnClient(key);
    }
}

Finally, let us see how to send broadcast information.

class MyService2 : public PushFramework::Service
{
    void handle(ClientKey key, PushFramework::IncomingPacket& request)
    {
        MyClient* pClient = getClient(key);
        if(pClient)
        {
            //Do something, e.g., push a packet into a broadcasting channel:
            PushFramework::OutgoingPacket* pResponse = new PushFramework::OutgoingPacket();
            getBroadcastManager()->pushPacket("broadcastGroup1", pResponse);
            returnClient(key);
        }
    }
};

Technical architecture

It is good to have a look at the internals of the library, even though you will only ever deal with a handful of public classes that encapsulate the details. Let me ease this with a few notes.

Technical architecture

  • The library uses the powerful mechanism of Windows I/O Completion Ports (IOCP). Asynchronous I/O operations are issued by worker threads managed at the library layer, and their completions are notified back by the OS in the form of completion packets queued into what amounts to a multiple-producers, multiple-consumers queue. By default, the library sets the number of threads according to the available processors. Interestingly, when such a thread blocks in the servicing code, the system wakes another worker so that the available CPU resources are used efficiently. (A generic sketch of such a worker loop appears after this list.)
  • The main thread manages the different parts of the system: it launches the listening threads and the worker threads, schedules periodic tasks, and listens for the termination signal so it can stop all threads and free all memory.
  • Listening is performed by a separate thread. Incoming connection requests are communicated to the Channels factory, where the decision is made to associate the connection with a new Client object or attach it to an existing one (client reconnection identification).
  • The Demux communicates the completion status of send/receive operations back to the dispatcher. When a write operation finishes, the intermediate send buffer is examined for any outstanding data to send. The intermediate buffer is filled by explicit calls to send operations and by an internal mechanism that streams broadcast responses. When a read operation completes, the dispatcher triggers deserialization, which in turn calls the protocol-defined deframing and decoding code, then hands control to the handle method of the Service object responsible for the incoming request type.
  • Most of the execution time is spent in the available Service::handle methods. There you put the processing logic for each type of incoming request. Naturally, that is where you push data to specific clients or to a broadcasting channel so the library implicitly streams the information to the subscribed clients.
  • Each broadcasting channel has four attributes: requireSubscription, maxPacket, priority, and quota. If requireSubscription is false, then all logged-in clients are implicitly subscribed to it. Otherwise, you must explicitly subscribe each client you want.
  • Each broadcasting channel maintains a FIFO queue of the packets pushed into it; maxPacket is its maximum size, and the oldest packets are released when the queue is full. Avoiding broadcast amplification (uncontrolled growth of memory consumption) comes at the cost of packet loss for those clients with B(i) < F(j), where B(i) is the available bandwidth between the server and client i, and F(j) is the fill rate of broadcasting channel j to which i is subscribed.
  • When the bandwidth between a certain client x and the server, together with the server's degree of activity, are such that not all data can be streamed out, the share of each broadcasting channel to which client x is subscribed depends on the priority and quota attributes.
    • Suppose that the broadcasting channels {Ci} are ordered such that i < j implies either Pi > Pj, or Pi = Pj and Qi >= Qj, where P and Q are the priority and quota attributes.
    • Let Fi denote the rate at which Ci is filled with new packets.
    • Let S be the total rate of broadcast data sent to client x.
    • Further assume that all outgoing messages have the same length.

    Then, the share Si of broadcasting channel i is given by:
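    In the original article this formula is rendered as an image. One plausible reconstruction, assumed here from the definitions above rather than taken from the library documentation (strict precedence between different priorities, quota-proportional sharing among channels of equal priority), is:

    $$ S_i = \min\!\left( F_i,\; \frac{Q_i}{\sum_{k \,:\, P_k = P_i} Q_k}\,\Bigl( S - \sum_{j \,:\, P_j > P_i} S_j \Bigr) \right) $$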

  • The monitoring listener is started when remote monitoring is enabled. It accepts connections from remote Dashboards so that profiling information can be sent to them.
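To illustrate the first note above, here is a generic sketch of an IOCP worker loop written directly against the Win32 API. This is not Push Framework's internal code; handleCompletion is a placeholder for the dispatching step described in the notes.

#include <windows.h>

DWORD WINAPI workerThread(LPVOID pParam)
{
    HANDLE hCompletionPort = (HANDLE) pParam;
    while (true)
    {
        DWORD dwBytesTransferred = 0;
        ULONG_PTR completionKey = 0;
        LPOVERLAPPED pOverlapped = NULL;

        //Block until the OS queues a completion packet for one of the
        //outstanding asynchronous send/receive operations.
        BOOL bOk = GetQueuedCompletionStatus(hCompletionPort,
            &dwBytesTransferred, &completionKey, &pOverlapped, INFINITE);

        //A NULL overlapped pointer is used here as the termination signal,
        //posted via PostQueuedCompletionStatus at shutdown.
        if (pOverlapped == NULL)
            break;

        //completionKey identifies the connection and pOverlapped the
        //finished operation; from here, deserialization and then
        //Service::handle are invoked.
        //handleCompletion(completionKey, pOverlapped, dwBytesTransferred, bOk);
    }
    return 0;
}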

Tutorials and examples

Chat application

The best way to see Push Framework in action is to guide you through the development of a whole new application based on it. This lets you better assess the library and weigh multiple factors at once.

You can follow the step-by-step tutorial here. Source code and binaries are also available.

The application uses XML to encode requests and responses. The server and the client "see" the same prototypes representing those requests/responses. If a direct chat message is received, the server knows the destination client and forwards the request to the relevant connection. At startup, the chat server creates broadcasting channels to represent the "available chat rooms". If a room chat request is received, a room chat response is pushed into the corresponding broadcasting channel so every participant in the room receives it. Joining a room is technically implemented by "subscribing" a client to a broadcasting channel. The same goes for letting chat participants see each other: a broadcasting channel is set up and filled with signal packets each time a client logs in.
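As a sketch of that design, room creation and joining might look as follows. createChannel and pushPacket appear elsewhere in this article; subscribeClient is an assumed name used only for illustration.

//At server startup: one broadcasting channel per chat room. The arguments
//follow the pattern shown later in this article (channel id, maxPacket,
//requireSubscription, priority, quota); requireSubscription is true so
//only clients that joined the room receive its traffic.
server.getBroadcastManager()->createChannel("room_general", 100, true, 5, 10);

//When a client asks to join the room (inside a Service::handle);
//subscribeClient is an assumed method name:
//getBroadcastManager()->subscribeClient(key, "room_general");

//When a room chat request arrives, push the response into the channel so
//every subscribed participant receives it:
PushFramework::OutgoingPacket* pRoomResponse = new PushFramework::OutgoingPacket();
getBroadcastManager()->pushPacket("room_general", pRoomResponse);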

Client-server using the Google Protobuf protocol

Google created Protocol Buffers to let its heterogeneous applications exchange data; encoding/decoding is extremely fast and produces minimal messages. To help users who want to use this protocol alongside Push Framework, a client-server application was created. You can follow the complete tutorial here. The functionality is extremely simple: the client sends a request about an item with a particular ID, and the response is expected to contain detailed information about that item. One can imagine anything; the aim here is just to make the integration between Protobuf and Push Framework understood.

Communication data must be described in a dedicated language before being compiled by the Protobuf compiler, which produces the classes we work with. For our example, we only need these message definitions:

// content of loginrequest.proto : 
package gprotoexample;

message LoginRequest {  
}

//content of loginresponse.proto
package gprotoexample;

message LoginResponse {
  required bool result = 1;
}

//content of datainforrequest.proto
package gprotoexample;

message DataInfoRequest {
  required int32 id = 2; // id for item.
}

//content of datainforesponse.proto
package gprotoexample;

message DataInfoResponse {
  enum ResultType {
    InfoFound = 0;
    InfoNotFound = 1;
    InfoUnspecifiedError = 2;
  }

  required ResultType result = 1; //field numbers must be unique and >= 1
  required int32 id = 2;
  optional string description = 3;
}

//content of logoutrequest.proto
package gprotoexample;

message LogoutRequest {
}

For use with C++, the Protobuf compiler produces C++ classes that we include in our project. To adapt Protobuf and allow multiple messages to be sent over the same connection (Protobuf messages are not self-delimiting), we create a new PushFramework::Protocol subclass alongside a packet class that derives from both PushFramework::IncomingPacket and PushFramework::OutgoingPacket, which are the currency of Push Framework:

class ProtobufPacketImpl : public PushFramework::IncomingPacket, 
                           public PushFramework::OutgoingPacket
{
public:
    ProtobufPacketImpl(int serviceId);
    ~ProtobufPacketImpl(void);
protected:
    virtual bool Decode(char* pBuf, unsigned int nSize);
    virtual bool Encode();
    virtual google::protobuf::Message& getStructuredData() = 0;
private:
    int serviceId;
    std::string* pEncodedStream;
    
public:
    std::string* getEncodedStream() const { return pEncodedStream; }
    int getEncodedStreamSize();
    int getServiceId() const { return serviceId; }
};

template<class T>
class ProtobufPacket : public ProtobufPacketImpl
{
public:
    ProtobufPacket(int serviceId)
        : ProtobufPacketImpl(serviceId)
    {
    }
    ~ProtobufPacket()
    {
    }
private:
    T data;
public:
    //Typed accessor used by the servicing code below.
    T& getData()
    {
        return data;
    }
    virtual google::protobuf::Message& getStructuredData()
    {
        return data;
    }
};
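The actual bodies of Decode and Encode are in the tutorial's source package. A plausible minimal implementation, assuming the Protocol subclass takes care of the length-prefix framing, could look like this (ParseFromArray and SerializeToString are standard google::protobuf::Message methods):

bool ProtobufPacketImpl::Decode(char* pBuf, unsigned int nSize)
{
    //pBuf/nSize delimit one already-deframed message body.
    return getStructuredData().ParseFromArray(pBuf, (int) nSize);
}

bool ProtobufPacketImpl::Encode()
{
    //Serialize the structured data into a byte string that the framing
    //code can later prepend with a length header and the service ID.
    pEncodedStream = new std::string();
    return getStructuredData().SerializeToString(pEncodedStream);
}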

Here is how the servicing code looks when we receive a DataInfoRequest:

void CDataInfoRequestService::handle( ClientKey clientKey, 
          PushFramework::IncomingPacket* pRequest )
{
    ExampleClient* pClient = (ExampleClient*) getClient(clientKey);
    if(!pClient)
        return;
 
    ProtobufPacket<DataInfoRequest>* pDataInfoRequest = 
             (ProtobufPacket<DataInfoRequest>*) pRequest;
 
    ProtobufPacket<DataInfoResponse> response(DataInfoResponseID);
    response.getData().set_id(pDataInfoRequest->getData().id());
    response.getData().set_result(DataInfoResponse_ResultType_InfoFound);
    response.getData().set_description("this is a description");
    //
    pClient->pushPacket(&response);
 
    returnClient(clientKey);
}

Benchmarking

Monitoring the chat application

An interesting feature of Push Framework is its ability to profile itself and send a real-time report to a remote Dashboard, where the numbers are drawn into qualitative charts and grids. You activate this by calling a simple method on your server object:

//Assuming server is your Server instance
server.enableRemoteMonitor(monitoringPort, password);

//And to activate profiling, you do :
server.enableProfiling(samplingRate);

monitoringPort is the port that the monitoring listener will listen on. Of course, you will have to enter that information in the Dashboard login dialog, along with the server IP and password, for your connection to succeed. The Dashboard can also be used to execute remote commands: by overriding Server::handleMonitorRequest, you can receive any input your user types in the Dashboard console, and the answer is displayed as text in the console. Statistics and performance information cannot be displayed in the Dashboard unless you call Server::enableProfiling, which instructs the library to start collecting performance values and to send a report at a regular interval of samplingRate seconds.
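As a sketch, a remote command handler could look like the following. The signature of Server::handleMonitorRequest shown here is an assumption made for illustration; check the library headers for the real one.

#include <string>
#include <cstring>

class MyServer : public PushFramework::Server
{
public:
    //Assumed signature: receive the console command typed in the Dashboard
    //and fill the textual answer displayed back in the console.
    virtual bool handleMonitorRequest(const char* command, std::string& response)
    {
        if (strcmp(command, "status") == 0)
        {
            response = "server is up";
            return true;
        }
        response = "unknown command";
        return false;
    }
};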

The aim now is to show the remote monitoring features in use. So let's enable profiling in the previously developed chat server and connect to it with the monitoring Dashboard. To approach a real-world scenario where a very large number of people connect to the same chat server and chat with each other, I have made a separate multi-client simulator able to launch many connections and approximately behave like a human participant who sends chats, replies to received chats, and also engages in chat rooms. This is contained in the ChatRobots project. The simulator is, admittedly, not very optimized because it launches one thread per chat robot, which can lead to issues when launching many clients.

In one test, 100 bots were run against the chat server, which reports profiling at 10-second intervals. The following is a screenshot of the Performance tab of the Dashboard:

You can find other screenshots and further details in the article Bursting the Library. Zooming in on the processing-time zone gives:

The vertical segments represent the periodic report sent every 10 seconds. The center of each segment is the average processing time of all requests received within that observation interval; the vertical length is the dispersion calculated and sent by the server.

This profiling feature can help you a lot when you design an application and decide to deploy it. Even after deployment, it may be useful to keep profiling activated: it costs little and can give valuable information about your visitors' behavior, the incoming/outgoing bandwidth of exchanged data, and the performance the server delivers in terms of throughput (requests per second), processing time, etc.

Broadcast QoS

The goal here is to validate the claim that Push Framework can provide different priorities and quotas for each broadcasting channel. This QoS feature is explained in the Developer Guide (see the Broadcasting data paragraph). For that purpose, a new client-server application will be developed. The Push Framework-based server sets up a number of broadcasting channels with different priority/quota attributes and constantly fills them with packets. On the client side, we collect the received packets and report statistics. The protocol for the demo is really simple: we just need to record the broadcasting channel each incoming packet belongs to. An extra payload of 8K is added, however, to place ourselves closer to a realistic application protocol. Broadcasting channels (broadcasting groups) are set up before the call to Server::start:

//Channel attributes, matching the list in the Technical architecture
//section: maxPacket = 100, requireSubscription = false, then the
//priority and quota values under test.
server.getBroadcastManager()->createChannel(
       broadcastId, 100, false, uPriority, uQuota);
server.start();

For each broadcasting group, a new thread is spawned to push packets into it. You can examine the code details in the QoS Application Validation package. More results and details are reported in that article; I report just one scenario here, where three broadcasting channels share the same priority:

1 => { priority = 5, quota = 10 }
2 => { priority = 5, quota = 20 }
3 => { priority = 5, quota = 5 }

The effect of the quota parameter can be seen in the following client-side statistics:

References

This work would not have been possible without other people's contributions, which cleared much of the technical dust of server development.

History

This is the first release of this article. By the time you read it, the furnished material may have changed or become obsolete, so visit the library home page for the latest updates, and engage in the Forum space to help others and get help.
