
Syncpoint Commit, Backout and Correlation ID

18 Sep 2012 | CPOL | 2 min read
Asynchronous communication through messaging, using WebSphere MQ syncpoint commit, backout and correlation ID

Introduction

In an environment where many applications and devices run on different operating systems and infrastructure and communicate through messaging, with WebSphere MQ as middleware, there is always some chance of messages getting lost because of network problems or application crashes.

We must handle those failures in our applications to make sure that we do not lose any messages.

Here are some practical problems, along with their solutions, that I have faced while working on large financial applications:

If an application throws an exception (business logic failure, DB failure) after reading a message from a queue, how do you handle the message that you have already read?

How do you handle asynchronous request/reply communication properly, so that a reply is never mistaken for the reply to a different request?

What if the WebSphere MQ queue manager crashes after you put messages on a queue?

Background

Syncpoint Coordination

Syncpoint coordination is the process by which units of work are either committed or backed out with data integrity.

You must write your application to use syncpoint commit and backout so that, if anything fails while a message is being processed, the unit of work can be backed out and the message returned to the queue.
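To make the commit and backout semantics concrete, here is a minimal sketch of a queue whose gets and puts stay provisional until the unit of work ends. ToyQueue and all of its member names are hypothetical; it is an in-memory stand-in written for illustration, not the WebSphere MQ API, and it ignores concurrency and persistence.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy stand-in for a queue under syncpoint control (NOT the WebSphere MQ API).
// It only models how gets and puts stay provisional until commit() or backout().
class ToyQueue {
public:
    // Put outside any unit of work: the message is visible at once.
    void put(const std::string& m) { committed_.push_back(m); }

    // Put under syncpoint: invisible to readers until commit().
    void putInSyncpoint(const std::string& m) { pendingPuts_.push_back(m); }

    // Get under syncpoint: the message is removed only provisionally.
    bool getInSyncpoint(std::string& out) {
        if (committed_.empty()) return false;
        out = committed_.front();
        committed_.pop_front();
        pendingGets_.push_back(out);
        return true;
    }

    // Commit: provisional gets become permanent, provisional puts become visible.
    void commit() {
        for (const auto& m : pendingPuts_) committed_.push_back(m);
        pendingPuts_.clear();
        pendingGets_.clear();
    }

    // Back out: provisional puts are discarded and provisional gets are
    // restored to the front of the queue in their original order.
    void backout() {
        for (auto it = pendingGets_.rbegin(); it != pendingGets_.rend(); ++it)
            committed_.push_front(*it);
        pendingGets_.clear();
        pendingPuts_.clear();
    }

    // Number of messages currently visible to readers.
    std::size_t depth() const { return committed_.size(); }

private:
    std::deque<std::string> committed_;
    std::vector<std::string> pendingGets_;
    std::vector<std::string> pendingPuts_;
};
```

In this model, a reader that calls getInSyncpoint() and then fails can call backout(), and the message is available again for the next attempt. Only commit() makes the removal final.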

Correlation ID

A correlation ID is usually put in the header of a message. The ID is not part of the command or data the caller is trying to communicate to the receiver.

The receiver saves the ID from the request and adds it to the reply for the caller's benefit. Since the message body is the content being transmitted between the two systems, and the ID is not a part of that, the ID is added to the header.

In the request message, the ID can be stored as a correlation ID property or simply as a message ID property. Storing it as a correlation ID can cause confusion about which message is the request and which is the reply. The convention used in this article is therefore that the request carries a message ID but no correlation ID, and the reply carries a correlation ID equal to the request's message ID.

You must write your application to read and write messages using the correlation ID in an asynchronous request/reply environment.
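The matching rule described above can be sketched without a queue manager. The Message struct, makeReply and takeReplyFor below are hypothetical names chosen for illustration, and the reply queue is modelled as a plain vector. The 24-byte ID size mirrors the length of WebSphere MQ message and correlation IDs.

```cpp
#include <array>
#include <cstddef>
#include <string>
#include <vector>

// 24 bytes, the same length as a WebSphere MQ message ID or correlation ID.
using Id = std::array<unsigned char, 24>;

// Hypothetical message type: two header IDs plus a body.
struct Message {
    Id messageId{};
    Id correlationId{};
    std::string body;
};

// Responder side: the reply's correlation ID is the request's message ID.
Message makeReply(const Message& request, const std::string& body) {
    Message reply;
    reply.correlationId = request.messageId;
    reply.body = body;
    return reply;
}

// Requester side: remove and return the first reply whose correlation ID
// equals the saved request ID. Replies to other requests are left alone.
bool takeReplyFor(std::vector<Message>& replyQueue, const Id& requestId, Message& out) {
    for (auto it = replyQueue.begin(); it != replyQueue.end(); ++it) {
        if (it->correlationId == requestId) {
            out = *it;
            replyQueue.erase(it);
            return true;
        }
    }
    return false;
}
```

Because the lookup is by correlation ID rather than by arrival order, replies that come back out of order still reach the request that caused them.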

Using the Code

Given below is C++ code for reading messages from and writing messages to queues using syncpoint commit, backout and the correlation ID.

C++
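// Shared by both applications in this listing. mgr is assumed to be connected
// already, and every queue is assumed to be named, attached to mgr with
// setConnectionReference() and opened before use; that setup is omitted here.
#include <imqi.hpp>   // WebSphere MQ C++ classes

ImqQueueManager mgr;
MQBYTE24 corrID;      // message ID of the last request, used to find its reply

// Placeholders for application logic (hypothetical, not part of the MQ API).
bool process_request(ImqMessage& msg);   // business logic, database operations, etc.
bool should_retry();                     // retry policy, depends on your requirements

// Application 1
// Puts a message on the request queue.
void put_request()
{
    ImqQueue request_queue;          // request queue
    ImqMessage msg;                  // request message
    ImqPutMessageOptions pmo;

    // Take part in syncpoint so the put can be committed or backed out.
    pmo.setSyncPointParticipation(true);

    // Clear the message ID so that the queue manager generates a new one.
    msg.setMessageId();

    // Persistent messages survive a queue manager restart.
    msg.setPersistence(MQPER_PERSISTENT);

    // Use pmo while putting the message on the queue.
    if (request_queue.put(msg, pmo))
    {
        // Keep the message ID; the reply will carry it as its correlation ID.
        ImqBinary pID(msg.messageId());
        pID.copyOut(corrID, MQ_MSG_ID_LENGTH);

        // The request becomes visible to Application 2 only after the commit.
        mgr.commit();
    }
    else
    {
        mgr.backout();
    }
    request_queue.close();
}

// Reads the reply that Application 2 (below) put on the reply queue.
void get_reply()
{
    ImqQueue reply_queue;            // reply queue
    ImqMessage msg;                  // reply message
    ImqGetMessageOptions gmo;

    // Take part in syncpoint so the get can be committed or backed out.
    gmo.setSyncPointParticipation(true);

    // Must use the correlation ID for asynchronous request/reply: only a
    // reply whose correlation ID equals the saved request message ID is read.
    msg.setCorrelationId(corrID);

    if (reply_queue.get(msg, gmo))
    {
        // Use the reply, then commit to remove it from the queue permanently.
        mgr.commit();
    }
    reply_queue.close();
}

// Application 2
// Reads a request from the request queue and puts the reply on the reply queue.
void get_request()
{
    ImqQueue request_queue;          // request queue
    ImqQueue reply_queue;            // reply queue
    ImqMessage msg;                  // request message
    ImqMessage reply_msg;            // reply message
    ImqGetMessageOptions gmo;
    ImqPutMessageOptions pmo;

    // Get and put take part in the same unit of work.
    gmo.setSyncPointParticipation(true);
    pmo.setSyncPointParticipation(true);

    if (!request_queue.get(msg, gmo))
    {
        return;                      // nothing to process
    }

    if (process_request(msg))
    {
        // Copy the request's message ID into the reply's correlation ID.
        reply_msg.setCorrelationId(msg.messageId());

        // Send the reply in the same unit of work as the get, then commit both.
        reply_queue.put(reply_msg, pmo);
        mgr.commit();
    }
    else if (should_retry())
    {
        // Optionally wait here before retrying.

        // Back out the unit of work: the request returns to the queue for re-processing.
        mgr.backout();
    }
    else
    {
        // Put the message on the dead-letter queue under syncpoint, then commit
        // so that it is removed from the request queue.
    }
}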
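
The three outcomes in get_request (reply and commit, back out and retry, dead-letter) depend on a retry policy that the listing leaves open. One possible policy, sketched here under stated assumptions, is to retry only transient failures and only a limited number of times. The names Outcome, decide, failureIsTransient and maxRetries are hypothetical. The backoutCount parameter is meant to hold the number of times the message has already been backed out, which WebSphere MQ records in the BackoutCount field of the message descriptor.

```cpp
// Possible results of handling one request message.
enum class Outcome { Commit, BackoutAndRetry, DeadLetter };

// Hypothetical retry policy: commit on success, back out transient failures
// until the message has been backed out maxRetries times, and send everything
// else to the dead-letter queue.
Outcome decide(bool processedOk, bool failureIsTransient,
               long backoutCount, long maxRetries)
{
    if (processedOk) {
        return Outcome::Commit;
    }
    if (failureIsTransient && backoutCount < maxRetries) {
        return Outcome::BackoutAndRetry;
    }
    return Outcome::DeadLetter;
}
```

Capping the retries keeps a message that can never be processed from being backed out and re-read indefinitely.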

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) Nihilent Technologies Pvt Ltd
South Africa
I am working with Nihilent Technologies Pvt Ltd. I have worked in the banking and finance domain, mainly in cash management and repo trading.
Technologies:-
My technological forte is VC++, C++, Win32, MFC, ATL/COM, C#.NET Windows Application, C, WebSphere MQ and DB2.
Specialties
Domain:-
BFS (Cash Management and Repo Trading)
Technologies:-
C, C++, VC++, Win32, MFC, ATL/COM, WebSphere MQ, DB2
