
Brief Introduction to the SocketPro High-Performance and Scalable Persistent Message Queue

Continuous in-line request/result batching, real-time stream sending/processing, asynchronous data transferring and parallel computation for best performance and scalability

Introduction

A persistent message queue allows applications running on separate machines or in separate processes to communicate in a failsafe manner. A message queue is a temporary storage location or file into which messages can be saved and from which they can be read reliably, as and when conditions permit. Unlike sockets and other common channels that require a direct connection to exist at all times, a persistent message queue enables communication among applications that may not always be connected. Many persistent message queues have been implemented, each in its own way. SocketPro comes with an extremely high-performance persistent message queue that you are free to reuse.

Both the SocketPro client and server core libraries are internally implemented with a persistent message queue. The client queue is used to back up requests so that all of them can be resent to a server for processing in case the server is inaccessible for whatever reason, such as server power-off, server application failure, network outage and so on. Essentially, the client queue serves as a tool for automatic fault recovery, increasing application stability and reducing development complexity, as shown in the directory socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp) after cloning the source code from https://github.com/udaparts/socketpro.

This article focuses on the SocketPro server-side persistent message queue. Note that the precompiled SocketPro server-side persistent message queue library is completely free to you, and its open source code is extremely simple and understandable. You can also rely on the open source code and extend it for your complex needs.

Source Codes and Samples

All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning it onto your computer with GIT, pay attention to the subdirectory uasyncqueue inside the directory socketpro/samples/module_sample. You can see that these samples are created for the .NET, C/C++, Java and Python development environments. They can be compiled and run on either Linux or Windows platforms. SocketPro comes with a pre-compiled system library, uasyncqueue, located in the directories socketpro/bin/win and socketpro/bin/linux for the Windows and Linux platforms, respectively. In addition, you can figure out how to load the SocketPro queue service into a server application with your familiar development environment by looking at the tutorial sample all_servers in the directory socketpro/tutorials/(cplusplus|csharp|vbnet|java/src)/all_servers. However, we only use the C# client code (socketpro/samples/module_sample/uasyncqueue/test_csharp) in this article for explanation.

You should distribute the system libraries inside the directory socketpro/bin into your system directory before running these sample applications. In regard to the SocketPro communication framework, you may also refer to its development guide documentation at socketpro/doc/SocketPro development guide.pdf.

Main Function

SocketPro is written from the ground up to support parallel computation through one or more pools of non-blocking sockets. Each pool may consist of one or more threads, and each thread hosts one or more non-blocking sockets at the client side. To increase scalability, you can create one or more pools with multiple non-blocking sockets connected to different queue servers, so that you can send messages for queuing in parallel. However, we use just one pool here for demonstration clarity. Further, for this sample the pool consists of only one thread and one socket at the client side, as shown in code snippet 1 below.

C#
static void Main(string[] args) {
    Console.WriteLine("Remote host: "); string host = Console.ReadLine();
    CConnectionContext cc = new CConnectionContext
                            (host, 20901, "async_queue_client", "pwd_for_async_queue");
    using (CSocketPool<CAsyncQueue> spAq = new CSocketPool<CAsyncQueue>()) {
        //spAq.QueueName = "aq_backup"; //uncomment for message no loss
                                        //by use of local message queue
        if (!spAq.StartSocketPool(cc, 1, 1)) {
            Console.WriteLine("Failed in connecting to remote async queue server,
                               and press any key to close the application ......");
            Console.Read(); return;
        }
        CAsyncQueue aq = spAq.Seek(); //CAsyncQueue aq = spAq.SeekByQueue();
        //Optionally, you can enqueue messages with transaction style
        //by calling the methods StartQueueTrans and EndQueueTrans in pair
        aq.StartQueueTrans(TEST_QUEUE_KEY, (errCode) => {
            //error code could be one of CAsyncQueue.QUEUE_OK,
            //CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
        });
        TestEnqueue(aq);
        //test manual message batching
        using (CScopeUQueue sb = new CScopeUQueue()) {
            CUQueue q = sb.UQueue;
            CAsyncQueue.BatchMessage(idMessage3, "Hello", "World", q);
            CAsyncQueue.BatchMessage(idMessage4, true, 234.456, "MyTestWhatever", q);
            aq.EnqueueBatch(TEST_QUEUE_KEY, q, (res) => {
                System.Diagnostics.Debug.Assert(res == 2);
            });
        }
        aq.EndQueueTrans(false);
        TestDequeue(aq); aq.WaitAll();
        //get a queue message count and queue file size with default option oMemoryCached
        aq.FlushQueue(TEST_QUEUE_KEY, (messageCount, fileSize) => {
            Console.WriteLine("Total message count={0},
                              queue file size={1}", messageCount, fileSize);
        });
        aq.GetKeys((keys) => {
            //keys identifying all message queue files opened at the server side
        });
        aq.CloseQueue(TEST_QUEUE_KEY, (errCode) => {
            //error code could be one of CAsyncQueue.QUEUE_OK,
            //CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
        });
        Console.WriteLine("Press any key to close the application ......"); Console.Read();
    }
}
Code snippet 1: Main function demonstrating use of the SocketPro persistent message queue at the client side

Starting one socket pool: Code snippet 1 above starts one socket pool that has only one worker thread hosting one non-blocking socket (if (!spAq.StartSocketPool(cc, 1, 1))) for demonstration clarity, using one instance of connection context. Note that you can create multiple pools within one client application if necessary. Afterwards, we obtain one asynchronous CAsyncQueue handler (CAsyncQueue aq = spAq.Seek();).
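For reference, here is a minimal sketch (not from the sample) of scaling that call up. It assumes, based on the (cc, 1, 1) call in code snippet 1, that the second and third arguments of StartSocketPool are the sockets-per-thread and thread counts; the sketch then starts four worker threads, each hosting two non-blocking sockets, for eight parallel sessions in total.

C#
//sketch only, built on the same connection context cc as code snippet 1;
//the argument meanings (sockets per thread, threads) are assumed from the sample
using (CSocketPool<CAsyncQueue> spAq = new CSocketPool<CAsyncQueue>()) {
    if (!spAq.StartSocketPool(cc, 2, 4)) {
        //handle the connection failure here
        return;
    }
    //Seek returns the least-busy handler among the eight sockets
    CAsyncQueue aq = spAq.Seek();
    //......
}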

Streaming messages: We can send individual messages to a server for saving in streaming style, without any batching at the client side (TestEnqueue(aq);). We will go into the details in the section TestEnqueue below.

Manual message batching: When there are many small messages to be sent for saving, they incur considerable CPU cost at both the client and server sides because of thread synchronization, function processing, SocketPro internal inline batching and so on. To reduce these costs, we can batch the small messages into one bigger chunk and send them as one larger unit to the server for saving (using (CScopeUQueue sb = new CScopeUQueue()) { ....}). This improves message en-queuing performance, but it also increases latency, because it requires a time interval, usually more than 1 millisecond, to collect enough small messages before manual batching. It also requires more code. It is NOT recommended with SocketPro as long as either the performance of streaming message queuing meets your needs or the message sizes are not very small.

Saving messages in transaction style: The SocketPro persistent message queue supports saving messages in transaction style. To use this feature, you have to call the methods StartQueueTrans and EndQueueTrans in pairs, as shown in code snippet 1 above. Note that the total size of the batched messages must not exceed four gigabytes.
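As a minimal sketch of this pairing (with one assumption on our part: that passing true to EndQueueTrans rolls the transaction back, since the sample commits with EndQueueTrans(false)), a group of messages can be committed or rolled back together:

C#
//sketch only: enqueue a group of messages atomically and roll back on failure
aq.StartQueueTrans(TEST_QUEUE_KEY, (errCode) => {
    //expect CAsyncQueue.QUEUE_OK here
});
bool allOk = true;
for (int n = 0; allOk && n < 100; ++n) {
    allOk = aq.Enqueue(TEST_QUEUE_KEY, idMessage0, "SampleName", "trans test", n);
}
//commit when all succeeded; otherwise roll back (assumed meaning of true)
aq.EndQueueTrans(!allOk);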

Reading messages in a queue file from multiple consumers: Certainly, you can read messages from a queue (TestDequeue(aq);). We will elaborate on this in the coming section TestDequeue. Note that one SocketPro queue supports message writing from multiple providers and message reading from multiple consumers simultaneously, as sketched below. Just for your information, many other queue implementations don't support multiple consumers on one queue file.
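The following sketch (not from the sample) illustrates the point. It assumes the pool was started with at least two sockets, for example StartSocketPool(cc, 2, 1), so that Seek can return two distinct handlers, and d0/d1 stand for hypothetical DDequeue callbacks prepared as in code snippet 3 below:

C#
//sketch only: two consumers drain one server queue file at the same time;
//the server coordinates which messages each consumer receives
CAsyncQueue consumer0 = spAq.Seek();
CAsyncQueue consumer1 = spAq.Seek();
//d0 and d1 are hypothetical DDequeue callbacks prepared as in code snippet 3
consumer0.Dequeue(TEST_QUEUE_KEY, d0);
consumer1.Dequeue(TEST_QUEUE_KEY, d1);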

Scalability: A client is able to create a pool with multiple sockets connected to different queue server machines, use the pool method Seek or SeekByQueue, and distribute messages onto different servers for saving, as in the sketch below. Don't be misled by this sample code; the demonstration is kept minimal for clarity and for beginners.
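A hedged sketch of the idea (not from the sample; it reuses only names that appear in code snippets 1 and 2): seek a handler per message so that, with the pool's sockets connected to different queue servers, the en-queuing load spreads among them.

C#
//sketch only: distribute messages across the sockets of a pool
for (int n = 0; n < 1024; ++n) {
    //Seek picks the least-busy socket; SeekByQueue is the alternative
    //when local/client message queues are enabled
    CAsyncQueue q = spAq.Seek();
    q.Enqueue(TEST_QUEUE_KEY, idMessage0, "SampleName", n + " Object test", n);
}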

No message loss: Saving messages requires transferring them from a client or provider to a message queue server, and the server or network may go down for many possible reasons, so messages could be lost unless you take care with extra coding. With SocketPro, you can prevent this easily by using a local or client message queue to back up messages before putting them on the wire (spAq.QueueName = "aq_backup";). In case a server or network goes down, SocketPro resends the messages backed up in the local or client message queue file once the queue server application becomes accessible again.
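A minimal sketch of this setup, based on the commented lines in code snippet 1 (pairing SeekByQueue with client queues follows the sample's own comment; treat the details as assumptions):

C#
//sketch only: enable a local/client message queue BEFORE starting the pool so
//every request is first persisted locally and replayed automatically once the
//queue server becomes reachable again
using (CSocketPool<CAsyncQueue> spAq = new CSocketPool<CAsyncQueue>()) {
    spAq.QueueName = "aq_backup"; //local queue file for backing up requests
    spAq.StartSocketPool(cc, 1, 1);
    CAsyncQueue aq = spAq.SeekByQueue();
    aq.Enqueue(TEST_QUEUE_KEY, idMessage0, "SampleName", "no-loss test", 0);
}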

Other functionality: The SocketPro persistent message queue provides further methods to check the message count of a queue, the size of a queue file and the keys to the different message queues, as well as to close a queue, as shown at the end of code snippet 1 above.

TestEnqueue

This function, an example of en-queuing messages, is simple, as shown in code snippet 2 below.

C#
static bool TestEnqueue(CAsyncQueue aq) {
        bool ok = true; Console.WriteLine("Going to enqueue 1024 messages ......");
        for (int n = 0; n < 1024; ++n) {
            string str = n + " Object test";
            ushort idMessage;
            switch (n % 3) {
                case 0:
                    idMessage = idMessage0;
                    break;
                case 1:
                    idMessage = idMessage1;
                    break;
                default:
                    idMessage = idMessage2;
                    break;
            }
            //en-queue two unicode strings and one int
            ok = aq.Enqueue(TEST_QUEUE_KEY, idMessage, "SampleName", str, n);
            if (!ok) break;
        }
        return ok;
    }
Code snippet 2: Sample code for sending 1024 messages to a server for saving

As shown in code snippet 2 above, we can continuously send individual messages (aq.Enqueue) in streaming style. You can see that it is really easy to en-queue messages with SocketPro.

TestDequeue

Code snippet 3 below demonstrates de-queuing messages in batches.

C#
static void TestDequeue(CAsyncQueue aq) {
        //prepare callback for parsing messages dequeued from server side
        aq.ResultReturned += (sender, idReq, q) => {
            bool processed = true;
            switch (idReq) {
                case idMessage0:
                case idMessage1:
                case idMessage2:
                    Console.Write("message id={0}", idReq);
                    {
                        string name, str; int index;
                        //parse a dequeued message which should be the same 
                        //as the above enqueued message (two unicode strings and one int)
                        q.Load(out name).Load(out str).Load(out index);
                        Console.WriteLine(", name={0}, str={1}, index={2}", name, str, index);
                    }
                    break;
                case idMessage3: {
                        string s1, s2;
                        q.Load(out s1).Load(out s2);
                        Console.WriteLine("{0} {1}", s1, s2);
                    }
                    break;
                case idMessage4: {
                        bool b; double dbl; string s;
                        q.Load(out b).Load(out dbl).Load(out s);
                        Console.WriteLine("b= {0}, d= {1}, s= {2}", b, dbl, s);
                    }
                    break;
                default:
                    processed = false;
                    break;
            }
            return processed;
        };
        //prepare a callback for processing returned result of dequeue request
        CAsyncQueue.DDequeue d = (messageCount, fileSize, messages, bytes) => {
            Console.WriteLine("Total message count={0}, queue file size={1}, 
            messages dequeued={2}, message bytes dequeued={3}", messageCount, fileSize, messages, bytes);
            if (messageCount > 0) {
                //there are more messages left at server queue, we re-send a request to dequeue
                aq.Dequeue(TEST_QUEUE_KEY, aq.LastDequeueCallback);
            }
        };
        Console.WriteLine("Going to dequeue messages ......");
        bool ok = aq.Dequeue(TEST_QUEUE_KEY, d);
        //optionally, call Dequeue one extra time to improve processing concurrency
        //at both client and server sides for better performance and throughput
        ok = aq.Dequeue(TEST_QUEUE_KEY, d);
    }  
Code snippet 3: Sample code for de-queuing messages in batches

The callback (aq.ResultReturned += (sender, idReq, q) => { .....};) in code snippet 3 is used to parse messages coming from the remote message queue file. The cases (case idMessage0: case idMessage1: case idMessage2:) parse the messages that originated from code snippet 2 above. The cases (case idMessage3: case idMessage4:) parse the manually batched messages (using (CScopeUQueue sb = new CScopeUQueue()) { ....}) that originated in code snippet 1. As hinted by the comment (//prepare a callback for processing returned result of dequeue request), the callback (CAsyncQueue.DDequeue d = ......) is used to monitor key message queue data: the message count (messages still to be de-queued), the server queue file size, the number of messages transferred by the call to Dequeue below, and the size of the de-queued messages in bytes. Inside the callback, it is necessary to call the method Dequeue recursively as long as messages remain in the server queue file (if (messageCount > 0)).

After preparing the previous two callbacks, we finally call the method Dequeue to send a request to the server for reading messages in batches. Optionally, we can call the method Dequeue one or two more times; this increases de-queuing throughput because client-side message parsing and server-side message reading then overlap with better concurrency.

Performance Study

SocketPro is written from the beginning to support streaming requests by use of non-blocking sockets and internal algorithms for the best network and code efficiency. The performance study samples, written in C++, Java and C#, are located in the directory socketpro/samples/qperf. In addition, we also compared the SocketPro queue with the two popular queues Kafka and RabbitMQ, as described in the two short articles perf_comparison.pdf and sq_kafka_perf.pdf.

Our results show that the SocketPro queue is significantly faster than not only RabbitMQ but also Kafka, especially when writing a high volume of small messages. For clarity, this article focuses on the performance comparison of the SocketPro queue with Kafka only.

It is most important to compare the SocketPro queue with Kafka in a local area network (LAN) environment, as this scenario is closest to real queue applications in most cases. Our test results are listed in Figure 1 below. Note that the Kafka performance test script kafka_perf_test.txt is located in the directory ../socketpro/samples/qperf. The SocketPro queue by its nature supports message en-queuing in real-time streaming style by continuously sending messages. Further, SocketPro also supports sending all batched messages as one larger message through its manual batching feature, which is designed to sacrifice latency for better en-queuing speed or throughput. Finally, note that the Kafka en-queuing performance tests are always completed in batch style by setting the configuration property batch.size.


Figure 1: Queue performance comparison between SocketPro and Kafka on LAN

Small messages: For small messages (4 bytes and 32 bytes), Kafka is somewhat faster (up to roughly 30%) than SocketPro for both en-queuing ([1,083,000 & 819,300] vs [839,300 & 752,300] messages per second) and de-queuing ([1,406,000 & 1,043,000] vs [1,195,000 & 974,100] messages per second) if the SocketPro manual batching feature is NOT employed. The SocketPro queue is slower in en-queuing because transferring small messages across the wire is very expensive in CPU terms without manual batching. However, if the SocketPro queue is armed with manual batching just as Kafka is, it can be significantly faster than Kafka (5,988,000/1,083,000 = 5.53, or about 450% faster).

We found that Kafka de-queuing is about 15% faster than the SocketPro queue for small messages, as shown in Figure 1. The explanation is that SocketPro de-queuing automatically sends a de-queue acknowledgement from consumer to server for each de-queued message. Further, each de-queue acknowledgement also causes a disk seek to mark the de-queued messages at the server side. This happens silently to reduce consumer-side coding complexity, but it obviously degrades SocketPro de-queuing performance somewhat. Kafka de-queues messages very simply and has no support for a similar de-queue acknowledgement at all.

Medium messages: For medium-sized messages (200 bytes), SocketPro is considerably faster than Kafka in en-queuing messages even without manual batching. If SocketPro is armed with manual batching just as Kafka is, SocketPro can be about 90% faster than Kafka (457,500/238,900 = 1.91). However, both SocketPro and Kafka show similar performance in de-queuing medium-sized messages.

Large messages: SocketPro is about 40% faster than Kafka in en-queuing large messages (1024 bytes and 10240 bytes). SocketPro and Kafka show no performance difference in de-queuing large messages.

Finally, it should be pointed out that SocketPro manual batching is not recommended as long as the message en-queuing speed meets your needs. The reason is that manual batching can significantly increase message en-queuing latency, because in real applications you must allow a time interval for collecting enough messages before putting them onto the wire. SocketPro manual batching should be used only if your application requires better en-queuing speed for a high volume of small messages.

Wide area network: It is time to talk about the wide area network (WAN). The WAN challenges distributed application development because it has not only low bandwidth but also significantly higher latency. We wanted to compare SocketPro with Kafka on a WAN for remote message en-queuing and de-queuing. We found that SocketPro runs very well in both performance and stability, as shown in Figure 2 below. Unfortunately, we could not complete a Kafka performance test on a WAN, and eventually gave up after trying for a few days and searching many web sites for possible configuration settings. It seems to us that Kafka doesn't support remote message en-queuing or de-queuing at all.


Figure 2: SocketPro persistent queue performance results on two cheap virtual machines across Google cloud data centers

Figure 2 above shows SocketPro queue results measured from two cheap virtual machines across Google cloud data centers. The network has a bandwidth of around 40 Mbps with a high latency of about 35 milliseconds. Both the client and server application code (sq_client and sq_server) can be found in the directory ../socketpro/samples/qperf/cperf. After cloning SocketPro from https://github.com/udaparts/socketpro with GIT and checking out the branch linux_tests, you will find the two pre-compiled applications in the directory ../socketpro/test_apps. You can use the two applications for your own testing.

The above test results show that the network bandwidth is fully consumed whether en-queuing or de-queuing messages; the WAN bandwidth determines the speeds of both. Figure 2 also shows that both en-queuing and de-queuing speeds can easily exceed 10,000 messages per second if the message size is not more than 300 bytes. Note that these performance figures could be doubled if you turn on SocketPro's in-line compression and decompression features.

If message sizes are smaller than 64 bytes, you can also use the SocketPro manual batching feature to increase performance further.

Finally, as far as we know, no other publicly available message queue achieves similar WAN performance results. If you find one, please let us know so that we can correct this claim quickly.

Highlights of SocketPro Persistent Message Queue

In closing, it is worth highlighting the advantages of the SocketPro persistent message queue over Kafka.

  1. The SocketPro persistent message queue has no complex configuration settings for you to understand and configure. In contrast, Kafka requires you to understand many configuration settings up front.
  2. The SocketPro persistent message queue runs very well on a WAN with decent performance and stability, but Kafka doesn't.
  3. The SocketPro persistent message queue supports manual transactions for better stability, but Kafka doesn't.
  4. A queue file can be shared among multiple consumers at the same time with the SocketPro queue, but Kafka is not capable of doing so.
  5. The SocketPro queue can guarantee no message loss as long as you turn on a local or client message queue, but you cannot do so with Kafka.
  6. The SocketPro queue notifies all connected consumers of message availability in real-time fashion for the shortest latency. Its latency is always about 1.5 times the network latency and can be as low as 0.3 ms on a local area network. Kafka's lowest latency is 1 ms at best, and only after you configure a setting specifically for it.
  7. The SocketPro queue is significantly faster than Kafka, especially when writing a high volume of small messages with manual batching turned on.
  8. You can selectively en-queue a portion of messages with SocketPro, but you are forced to en-queue all messages with Kafka. Further, you can integrate the message queue with other SocketPro features such as the online message bus, local message queue, client/server communication, and so on.
  9. Both the client and server code of the SocketPro persistent message queue are extremely simple, so you can easily extend and modify them for your complex needs. It is not so easy to do so with Kafka.
  10. You can embed the SocketPro queue within your application system with much simpler distribution and low dependency. It is not so easy to do so with Kafka.
  11. Like Kafka, the SocketPro queue is extremely scalable too.

History

  • 02/14/2018 ==> Initial release
  • 04/08/2018 ==> Add performance test results and correct document errors

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

