Introduction
Microsoft created the SocketAsyncEventArgs class to help you write scalable, high performance socket server code. SocketAsyncEventArgs uses I/O Completion Ports via the asynchronous methods in the .NET Socket
class. A proven way to write scalable, high performance socket code for TCP/IP in Windows can be seen in this article on I/O Completion Ports. And here's another link to a Microsoft page on I/O Completion Ports. SocketAsyncEventArgs helps us to access a socket with advantages like working asynchronously, raising the Completed
event, setting buffer space, object pooling, having a state object, accessing the socket through a property, etc., while having the performance characteristics of I/O completion ports. Very nice.
The purpose of this article is to help you understand the fundamentals of using the SocketAsyncEventArgs class.
Background
You may have started your research into this topic at Microsoft's main page for the SocketAsyncEventArgs class, as I did. The example code on that page got me started. But it was also confusing. Some of the problems that I noticed with their example code were:
- It seems that Microsoft removed example code about the UserToken property. The
UserToken
is really important, because if you have to post multiple receive operations to receive a message, then you will need a place to store data between operations. And the same is true for send operations. - Some of the method names in the example code were a bit confusing, as were some of the variable names.
- Their reason for using a Semaphore was not explained really.
- The example in the BufferManager code on Microsoft's page for the SetBuffer method shows how to build the buffer. While their code for building the BufferManager was mostly good, the way that they dealt with the SetBuffer method in their
ProcessReceive
method in the example code for the SocketAsyncEventArgs class will pretty much work in only the narrowest of examples. If you send a 10 byte string
, and then a 20 byte string
, it won't work. Their code sets the buffer to be whatever size you send on the first message from the client. So after the first message, it would just send back the first 10 bytes. So, we need a better example of how to get the data from the buffer after a receive operation completes, use the data, put data in the buffer for a send operation, and resize the buffer before and after a send operation. Also, in the SocketListener
constructor where the BufferManager
is created, Microsoft's example included the variable opsToPreAlloc
in the calculation for totalBytes
, but not of bufferSize
. That's a mistake which leaves half of the total buffer space being unused. - In the explanation of their example code, they said: "For example, if a server application needs to have 15 socket accept operations outstanding at all times to support incoming client connection rates, it can allocate 15 reusable SocketAsyncEventArgs objects for that purpose." But then their example only included reusable SocketAsyncEventArgs objects for receive/send, not for accept. The SocketAsyncEventArgs object for the accept operation would wait until the receive/send finished to do another accept op. Instead, we can use a pool, as they mentioned in their explanation, and post accept operations faster.
After Microsoft invested the resources to create the SocketAsyncEventArgs class, it is surprising that they did not invest the resources to give good understandable example code and explanation to help us learn how to use it. This article is designed to fill in that void, because the class really is well done, and very helpful after you understand it.
The code in this article was developed on Visual Studio 2008 using .NET 3.5. This article assumes some knowledge of delegates and event handling in Windows.
Regarding the SocketAsyncEventArgs class, Microsoft's website says it requires "Platforms: Windows 7, Windows Vista, Windows XP SP3, Windows Server 2008, Windows Server 2003. (The) .NET Framework Supported in: 4, 3.5 SP1, 3.0 SP1, 2.0 SP1. (The) .NET Framework Client Profile Supported in: 4, 3.5 SP1."
TCP Socket Basics
If you have experience with socket server code, you can skip this section.
A socket is like a reference or "handle" to a port which allows us to access data sent to that port, which is a reserved space in memory. We will be accessing network data through a socket which "listens" on a TCP port. For those new to socket programming, there are four main steps in using a socket server with TCP. (It's often described as six parts, but I like to put the first three together into one.)
- Listen for connection requests on the server
In order to listen, you need to:
- create a socket
- bind that socket to a port
- listen with that socket
The client must do its part too. A client (not a server) can initiate a connection request by using the Connect
or ConnectAsync
method. The client machine's Windows TCP/IP subsystem will automatically assign an outgoing port to the socket on the client machine. It will contact the server by sending a SYN packet which is addressed to the socket server's IP address and port number. The client does not listen for incoming connections. It always initiates connections and the server responds. After a client initiates a connection on the server's listening socket, the Windows TCP/IP subsystem of the server will respond with SYN, ACK. Then the client machine's Windows TCP/IP subsystem responds back with an ACK packet. When the ACK is received by the server, the "handshake" is complete, and the connection is established. Windows will handle this TCP/IP protocol stuff for you. In other words, SYN, ACK, PSH, packets, and similar parts of TCP/IP protocol do not have to be coded by you. (Smile here.)
The server's listening socket can maintain a queue of connection requests waiting to be accepted. This queue is called the "backlog". The listening socket passes the connection info to another socket via an "accept" operation, and then gets the next incoming connection in the backlog queue, or if there is none, waits till there is a new connection request from a client.
- Accept connection requests
In order to have multiple connections on the same port, the server's listening socket must pass off the connection info to another socket, which accepts it. The accepting socket is not bound to the port. You post an accept operation to pass the connection from the listening socket to the accepting socket. The accept operation can be posted before the incoming connection is established, so that the listening socket immediately passes off the new connection info to the accepting socket. The client does not need to perform an accept operation.
- Receive/Send via the connection
After the accept operation has completed, you can now receive or send data with that connection. (The same SocketAsyncEventArgs object that did the accept operation could also do the receiving or sending, if we post a receive or send on it and have buffer space for it.) In the design of the code below, the SocketAsyncEventArgs which did the accept operation passes the connection info over to another SocketAsyncEventArgs object to do receiving/sending. "Receive" is also known as "read". "Send" is also referred to as "write". (We could also split the receiving and sending into two separate SocketAsyncEventArgs objects, if we wish. But that is more difficult.)
- Close the connection
Either client or server can initiate an operation to close the connection. Usually, the client would initiate that. Again, the lower level TCP/IP of the disconnect is handled by the Windows Operating System. The connection can be closed using the Close
method, which destroys the Socket
and cleans up its managed and unmanaged resources.
So, those are the four main steps in using a socket server with TCP. There are a few more things that you must understand about TCP, in order to be able to write code that uses it.
With TCP, there is no guarantee that one send operation on the client will be equal to one receive operation on the server. One send operation on the client might be equal to one, two, or more receive operations on the server. And the same is true going back to the client from the server. This peculiarity can be due to buffer size, network lag, and the way that the Operating System handles TCP to improve performance. So you must have some way of determining where a TCP message begins and/or ends. Three possible ways to handle TCP messages are:
- Prefix every message with an integer that tells the length of the message.
- All messages be fixed length. And both client and server must know the length before the message is sent.
- Append every message with a delimiter to show where it ends. And both client and server must know what the delimiter is before the message is sent.
Also, your communications protocol should include whether there will be a response (send operation) from the server back to the client after each received message or not. Will that response be after one complete received TCP message, or can it be after more than one message? If it is after one message, the code is simpler probably.
Okay, so let's think about the possible situations that might occur with the data that the server receives in one receive operation:
- On the first receive op, receive less bytes than the length of the prefix.
- After having the received part of the prefix on a previous receive op or ops, then receive another part of the prefix, but not all of it.
- After having received part of the prefix on a previous receive op or ops, then receive the rest of the prefix, but nothing more.
- After having received part of the prefix on a previous receive op or ops, we then receive the rest of it, plus part of the message.
- After having received part of the prefix on a previous receive op or ops, we then receive the rest of it, plus all of the message.
- Receive exactly the number of bytes that are in the prefix, but nothing more.
- After having received exactly the number of bytes that are in the prefix on a previous receive op or ops, we then receive part of the message.
- After having received exactly the number of bytes that are in the prefix on a previous receive op or ops, we then receive all of the message.
- Receive the number of bytes for the prefix plus part of the message, but not all of the message.
- After having received the prefix and part of the message on a previous receive op or ops, we then receive another part of the message, but not all of it.
- After having received the prefix and part of the message on a previous receive op or ops, we then receive all the rest of the message.
- Receive the number of bytes for the prefix plus all of the message on the first receive op.
The last one is actually the most common thing that will happen. But all of the above things can happen and do happen. If both client and server have buffer sizes larger than the messages, then the situations above may not happen when running both the client and the server on the same machine, or even on a LAN. But TCP is more unpredictable over the Internet where the data passes through multiple machines. So your code needs to allow for all of those possibilities.
Intro to the code
Accept operations. In this app, the socket which does the accept operation can be accessed through a SocketAsyncEventArgs object, in its AcceptSocket property. On Microsoft's AcceptSocket page, it says, "If not supplied (set to null
) before calling the Socket.AcceptAsync method, a new socket will be created automatically." That's what we will do in the code below. A new Socket
object will be created for every new connection by the Socket.AcceptAsync method. According to the Socket.AcceptAsync page, the "new socket is constructed with the same AddressFamily, SocketType, and ProtocolType as the current socket", which is the listening socket. I have found that in .NET 3.5 the Socket.AcceptAsync method also copies the settings for LingerState and NoDelay, even though the Socket.AcceptAsync page does not state it. Not sure about other versions of .NET.
We can have a pool of these SocketAsyncEventArgs objects to deal with accept operations. In this pool, you do not need one object for each connection the server is maintaining, because after the accept operation completes, a reference to the socket is handed off to another SocketAsyncEventArgs object pretty fast. It does not seem to help to put a lot of SocketAsyncEventArgs objects in the pool for accept operations. Again, repeating for clarity, the socket which does the accept operation can be accessed through the SocketAsyncEventArgs.AcceptSocket property of the SocketAsyncEventArgs objects that come out of the pool of the SocketAsyncEventArgs objects that we create for accept operations. After we pass a reference to the socket object from the SocketAsyncEventArgs object that does accept ops to a SocketAsyncEventArgs object that does send/receive operations, then you will access the socket through the SocketAsyncEventArgs.AcceptSocket property of the SocketAsyncEventArgs objects that does send/receive operations.
In .NET, instead of creating a new socket object for every accept op, there is the option of having a pool of Socket
objects and reusing sockets. A socket pool can yield a performance increase on a server in a situation where there are many clients connecting and disconnecting very quickly. (Don't try to reuse a socket on a client unless you will be connecting to a different server endpoint when you reuse it.) Use the Disconnect
or DisconnectAsync
method with appropriate options, instead of the Close
method, if you use a socket pool.
Receive/Send operations. In this app, the receive and send operations are handled via SocketAsyncEventArgs objects that come out of a pool of SocketAsyncEventArgs objects that we create for receive/send operations. This is not the same pool as we just examined regarding accept operations. To improve performance, we have a pool of these objects which do receive and send operations. The number of SocketAsyncEventArgs objects in the pool for receive/send operations should probably be at least equal to the maximum number of concurrent connections allowed.
What is our communication protocol in this code?
- One message from client will correspond with one message from the server.
- After a connection is made, the client will send a message first, and then post a receive op to wait for the response from the server. And for each message that the client sends, the server will respond with a message to the client. Then, it will post another receive op and wait for the next message from the client. In our code, the server will make a few changes to the data before responding to the client, so that we do more than just echo data sent by the client. That approach should help you see what happens with the buffers.
- We will prefix every message with an integer that tells the length of the message.
Note: The code below is not sufficient to handle the situation where one computer sends multiple messages to the other before the second one responds. That's more complex, and having code for that would pull our thoughts away from the primary purpose of this article and code.
Buffer: Buffers in TCP are unmanaged, that is, not controlled by the .NET Framework, but by the Windows system. So the buffer gets "pinned" to one place in memory, thereby causing memory fragmentation, since the .NET Garbage Collector will not be able to collect that space. This situation is improved by putting all the buffers together in one block of memory, and just reusing that same space over and over. Pay special attention to the code related to buffers, as buffer-related stuff seems to be an area where people have more difficulty.
In this code, I use separate buffer space for send and receive. You could just reuse the same space for both send and receive, thereby using only half as much memory. It is not necessarily best to separate the two. I just did it to help you think about buffers. (If you reuse the same space, then you can get rid of bufferOffsetReceive
and bufferOffsetSend
in DataHoldingUserToken
. And instead, just use the Offset
property in the SocketAsyncEventArgs
object. That's what the Offset
property is there for.)
The theoretical maximum size for the buffer block is 2.147 GB, since it uses an integer data type. And you would probably really want less than 500 MB, if on 32 bit Windows. This limitation should only be relevant if you use a large buffer size and/or have a large number of simultaneous connections. For example, if you use a buffer size of 50,000 bytes, and have a separate buffer for send and receive, then that is 100,000 bytes per connection. 2.147 GB divided by 100,000 bytes = 21,470, which would be the maximum number of connections that could use this buffer block with this buffer size and design.
General: I use explanatory comments in the code below, to make them easily understandable whether viewed on screen or a printed page. Sometimes in code comments, I abbreviate "SocketAsyncEventArgs
" as "SAEA".
class Program
{
public const Int32 maxNumberOfConnections = 3000;
public const Int32 port = 4444;
public const Int32 testBufferSize = 25;
public const Int32 maxSimultaneousAcceptOps = 10;
public const Int32 backlog = 100;
public const Int32 opsToPreAlloc = 2;
public const Int32 excessSaeaObjectsInPool = 1;
public const Int32 receivePrefixLength = 4;
public const Int32 sendPrefixLength = 4;
public static Int32 mainTransMissionId = 10000;
public static Int32 startingTid;
public static Int32 mainSessionId = 1000000000;
public static List<dataholder /> listOfDataHolders;
public static Int32 maxSimultaneousClientsThatWereConnected = 0;
static void Main(String[] args)
{
try
{
IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
WriteInfoToConsole(localEndPoint);
SocketListenerSettings theSocketListenerSettings =
new SocketListenerSettings(maxNumberOfConnections,
excessSaeaObjectsInPool, backlog, maxSimultaneousAcceptOps,
receivePrefixLength, testBufferSize, sendPrefixLength, opsToPreAlloc,
localEndPoint);
SocketListener socketListener = new SocketListener(theSocketListenerSettings);
}
catch (Exception ex)
{
Console.WriteLine("Error: " + ex.Message);
}
}
The primary class is SocketListener
.
class SocketListener
{
BufferManager theBufferManager;
Socket listenSocket;
Semaphore theMaxConnectionsEnforcer;
SocketListenerSettings socketListenerSettings;
PrefixHandler prefixHandler;
MessageHandler messageHandler;
SocketAsyncEventArgsPool poolOfAcceptEventArgs;
SocketAsyncEventArgsPool poolOfRecSendEventArgs;
public SocketListener(SocketListenerSettings theSocketListenerSettings)
{
this.socketListenerSettings = theSocketListenerSettings;
this.prefixHandler = new PrefixHandler();
this.messageHandler = new MessageHandler();
this.theBufferManager = new BufferManager(this.socketListenerSettings.BufferSize
* this.socketListenerSettings.NumberOfSaeaForRecSend
* this.socketListenerSettings.OpsToPreAllocate,
this.socketListenerSettings.BufferSize
* this.socketListenerSettings.OpsToPreAllocate);
this.poolOfRecSendEventArgs = new
SocketAsyncEventArgsPool(this.socketListenerSettings.NumberOfSaeaForRecSend);
this.poolOfAcceptEventArgs = new
SocketAsyncEventArgsPool(this.socketListenerSettings.MaxAcceptOps);
this.theMaxConnectionsEnforcer = new
Semaphore(this.socketListenerSettings.MaxConnections,
this.socketListenerSettings.MaxConnections);
Init();
StartListen();
}
internal void Init()
{
this.theBufferManager.InitBuffer();
for (Int32 i = 0; i < this.socketListenerSettings.MaxAcceptOps; i++)
{
this.poolOfAcceptEventArgs.Push(
CreateNewSaeaForAccept(poolOfAcceptEventArgs));
}
SocketAsyncEventArgs eventArgObjectForPool;
Int32 tokenId;
for (Int32 i = 0; i < this.socketListenerSettings.NumberOfSaeaForRecSend; i++)
{
eventArgObjectForPool = new SocketAsyncEventArgs();
this.theBufferManager.SetBuffer(eventArgObjectForPool);
tokenId = poolOfRecSendEventArgs.AssignTokenId() + 1000000;
eventArgObjectForPool.Completed += new
EventHandler<socketasynceventargs />(IO_Completed);
DataHoldingUserToken theTempReceiveSendUserToken = new
DataHoldingUserToken(eventArgObjectForPool, eventArgObjectForPool.Offset,
eventArgObjectForPool.Offset + this.socketListenerSettings.BufferSize,
this.socketListenerSettings.ReceivePrefixLength,
this.socketListenerSettings.SendPrefixLength, tokenId);
theTempReceiveSendUserToken.CreateNewDataHolder();
eventArgObjectForPool.UserToken = theTempReceiveSendUserToken;
this.poolOfRecSendEventArgs.Push(eventArgObjectForPool);
}
internal SocketAsyncEventArgs CreateNewSaeaForAccept(SocketAsyncEventArgsPool pool)
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed +=
new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
AcceptOpUserToken theAcceptOpToken = new
AcceptOpUserToken(pool.AssignTokenId() + 10000);
acceptEventArg.UserToken = theAcceptOpToken;
return acceptEventArg;
}
internal void StartListen()
{
listenSocket = new
Socket(this.socketListenerSettings.LocalEndPoint.AddressFamily,
SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(this.socketListenerSettings.LocalEndPoint);
listenSocket.Listen(this.socketListenerSettings.Backlog);
StartAccept();
}
internal void StartAccept()
{
SocketAsyncEventArgs acceptEventArg;
if (this.poolOfAcceptEventArgs.Count > 1)
{
try
{
acceptEventArg = this.poolOfAcceptEventArgs.Pop();
}
catch
{
acceptEventArg = CreateNewSaeaForAccept(poolOfAcceptEventArgs);
}
}
else
{
acceptEventArg = CreateNewSaeaForAccept(poolOfAcceptEventArgs);
}
this.theMaxConnectionsEnforcer.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
if (acceptEventArgs.SocketError != SocketError.Success)
{
LoopToStartAccept();
AcceptOpUserToken theAcceptOpToken =
(AcceptOpUserToken)acceptEventArgs.UserToken;
HandleBadAccept(acceptEventArgs);
return;
}
LoopToStartAccept();
SocketAsyncEventArgs receiveSendEventArgs = this.poolOfRecSendEventArgs.Pop();
((DataHoldingUserToken)receiveSendEventArgs.UserToken).CreateSessionId();
receiveSendEventArgs.AcceptSocket = acceptEventArgs.AcceptSocket;
acceptEventArgs.AcceptSocket = null;
this.poolOfAcceptEventArgs.Push(acceptEventArgs);
StartReceive(receiveSendEventArgs);
}
private void LoopToStartAccept()
{
StartAccept();
}
private void StartReceive(SocketAsyncEventArgs receiveSendEventArgs)
{
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetReceive,
this.socketListenerSettings.BufferSize);
bool willRaiseEvent =
receiveSendEventArgs.AcceptSocket.ReceiveAsync(receiveSendEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(receiveSendEventArgs);
}
}
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on
the socket was not a receive or send");
}
}
private void ProcessReceive(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
if (receiveSendEventArgs.SocketError != SocketError.Success)
{
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
return;
}
if (receiveSendEventArgs.BytesTransferred == 0)
{
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
return;
}
Int32 remainingBytesToProcess = receiveSendEventArgs.BytesTransferred;
if (receiveSendToken.receivedPrefixBytesDoneCount <
this.socketListenerSettings.ReceivePrefixLength)
{
remainingBytesToProcess = prefixHandler.HandlePrefix(receiveSendEventArgs,
receiveSendToken, remainingBytesToProcess);
if (remainingBytesToProcess == 0)
{
StartReceive(receiveSendEventArgs);
return;
}
}
bool incomingTcpMessageIsReady = messageHandler
.HandleMessage(receiveSendEventArgs,
receiveSendToken, remainingBytesToProcess);
if (incomingTcpMessageIsReady == true)
{
receiveSendToken.theMediator.HandleData(receiveSendToken.theDataHolder);
receiveSendToken.CreateNewDataHolder();
receiveSendToken.Reset();
receiveSendToken.theMediator.PrepareOutgoingData();
StartSend(receiveSendToken.theMediator.GiveBack());
}
else
{
receiveSendToken.receiveMessageOffset = receiveSendToken.bufferOffsetReceive;
receiveSendToken.recPrefixBytesDoneThisOp = 0;
StartReceive(receiveSendEventArgs);
}
}
private void StartSend(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
if (receiveSendToken.sendBytesRemainingCount
<= this.socketListenerSettings.BufferSize)
{
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetSend,
receiveSendToken.sendBytesRemainingCount);
Buffer.BlockCopy(receiveSendToken.dataToSend,
receiveSendToken.bytesSentAlreadyCount,
receiveSendEventArgs.Buffer, receiveSendToken.bufferOffsetSend,
receiveSendToken.sendBytesRemainingCount);
}
else
{
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetSend,
this.socketListenerSettings.BufferSize);
Buffer.BlockCopy(receiveSendToken.dataToSend,
receiveSendToken.bytesSentAlreadyCount,
receiveSendEventArgs.Buffer, receiveSendToken.bufferOffsetSend,
this.socketListenerSettings.BufferSize);
}
bool willRaiseEvent =
receiveSendEventArgs.AcceptSocket.SendAsync(receiveSendEventArgs);
if (!willRaiseEvent)
{
ProcessSend(receiveSendEventArgs);
}
}
private void ProcessSend(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
receiveSendToken.sendBytesRemainingCount =
receiveSendToken.sendBytesRemainingCount
- receiveSendEventArgs.BytesTransferred;
receiveSendToken.bytesSentAlreadyCount +=
receiveSendEventArgs.BytesTransferred;
if (receiveSendEventArgs.SocketError == SocketError.Success)
{
if (receiveSendToken.sendBytesRemainingCount == 0)
{
StartReceive(receiveSendEventArgs);
}
else
{
StartSend(receiveSendEventArgs);
}
}
else
{
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
}
}
private void CloseClientSocket(SocketAsyncEventArgs e)
{
var receiveSendToken = (e.UserToken as DataHoldingUserToken);
try
{
e.AcceptSocket.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
e.AcceptSocket.Close();
if (receiveSendToken.theDataHolder.dataMessageReceived != null)
{
receiveSendToken.CreateNewDataHolder();
}
this.poolOfRecSendEventArgs.Push(e);
Interlocked.Decrement(ref this.numberOfAcceptedSockets);
this.theMaxConnectionsEnforcer.Release();
}
private void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
var acceptOpToken = (acceptEventArgs.UserToken as AcceptOpUserToken);
acceptEventArgs.AcceptSocket.Close();
poolOfAcceptEventArgs.Push(acceptEventArgs);
}
}
class PrefixHandler
{
public Int32 HandlePrefix(SocketAsyncEventArgs e,
DataHoldingUserToken receiveSendToken,
Int32 remainingBytesToProcess)
{
if (receiveSendToken.receivedPrefixBytesDoneCount == 0)
{
receiveSendToken.byteArrayForPrefix = new
Byte[receiveSendToken.receivePrefixLength];
}
if (remainingBytesToProcess >= receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount)
{
Buffer.BlockCopy(e.Buffer, receiveSendToken.receiveMessageOffset
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.byteArrayForPrefix,
receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount);
remainingBytesToProcess = remainingBytesToProcess
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount;
receiveSendToken.recPrefixBytesDoneThisOp =
receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount;
receiveSendToken.receivedPrefixBytesDoneCount =
receiveSendToken.receivePrefixLength;
receiveSendToken.lengthOfCurrentIncomingMessage =
BitConverter.ToInt32(receiveSendToken.byteArrayForPrefix, 0);
return remainingBytesToProcess;
}
else
{
Buffer.BlockCopy(e.Buffer, receiveSendToken.receiveMessageOffset
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.byteArrayForPrefix,
receiveSendToken.receivedPrefixBytesDoneCount,
remainingBytesToProcess);
receiveSendToken.recPrefixBytesDoneThisOp = remainingBytesToProcess;
receiveSendToken.receivedPrefixBytesDoneCount += remainingBytesToProcess;
remainingBytesToProcess = 0;
}
if (remainingBytesToProcess == 0)
{
receiveSendToken.receiveMessageOffset =
receiveSendToken.receiveMessageOffset -
receiveSendToken.recPrefixBytesDoneThisOp;
receiveSendToken.recPrefixBytesDoneThisOp = 0;
}
return remainingBytesToProcess;
}
}
class MessageHandler
{
public bool HandleMessage(SocketAsyncEventArgs receiveSendEventArgs,
DataHoldingUserToken receiveSendToken,
Int32 remainingBytesToProcess)
{
bool incomingTcpMessageIsReady = false;
if (receiveSendToken.receivedMessageBytesDoneCount == 0)
{
receiveSendToken.theDataHolder.dataMessageReceived =
new Byte[receiveSendToken.lengthOfCurrentIncomingMessage];
}
if (remainingBytesToProcess + receiveSendToken.receivedMessageBytesDoneCount
== receiveSendToken.lengthOfCurrentIncomingMessage)
{
Buffer.BlockCopy(receiveSendEventArgs.Buffer,
receiveSendToken.receiveMessageOffset,
receiveSendToken.theDataHolder.dataMessageReceived,
receiveSendToken.receivedMessageBytesDoneCount,
remainingBytesToProcess);
incomingTcpMessageIsReady = true;
}
else
{
Buffer.BlockCopy(receiveSendEventArgs.Buffer,
receiveSendToken.receiveMessageOffset,
receiveSendToken.theDataHolder.dataMessageReceived,
receiveSendToken.receivedMessageBytesDoneCount,
remainingBytesToProcess);
receiveSendToken.receiveMessageOffset =
receiveSendToken.receiveMessageOffset -
receiveSendToken.recPrefixBytesDoneThisOp;
receiveSendToken.receivedMessageBytesDoneCount += remainingBytesToProcess;
}
return incomingTcpMessageIsReady;
}
}
class BufferManager
{
Int32 totalBytesInBufferBlock;
byte[] bufferBlock;
Stack<int> freeIndexPool;
Int32 currentIndex;
Int32 bufferBytesAllocatedForEachSaea;
public BufferManager(Int32 totalBytes, Int32 totalBufferBytesInEachSaeaObject)
{
totalBytesInBufferBlock = totalBytes;
this.currentIndex = 0;
this.bufferBytesAllocatedForEachSaea = totalBufferBytesInEachSaeaObject;
this.freeIndexPool = new Stack<int>();
}
internal void InitBuffer()
{
this.bufferBlock = new byte[totalBytesInBufferBlock];
}
internal bool SetBuffer(SocketAsyncEventArgs args)
{
if (this.freeIndexPool.Count > 0)
{
args.SetBuffer(this.bufferBlock, this.freeIndexPool.Pop(),
this.bufferBytesAllocatedForEachSaea);
}
else
{
if ((totalBytesInBufferBlock - this.bufferBytesAllocatedForEachSaea) <
this.currentIndex)
{
return false;
}
args.SetBuffer(this.bufferBlock, this.currentIndex,
this.bufferBytesAllocatedForEachSaea);
this.currentIndex += this.bufferBytesAllocatedForEachSaea;
}
return true;
}
internal void FreeBuffer(SocketAsyncEventArgs args)
{
this.freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
}
class DataHoldingUserToken
{
internal Mediator theMediator;
internal DataHolder theDataHolder;
internal readonly Int32 bufferOffsetReceive;
internal readonly Int32 permanentReceiveMessageOffset;
internal readonly Int32 bufferOffsetSend;
private Int32 idOfThisObject;
internal Int32 lengthOfCurrentIncomingMessage;
internal Int32 receiveMessageOffset;
internal Byte[] byteArrayForPrefix;
internal readonly Int32 receivePrefixLength;
internal Int32 receivedPrefixBytesDoneCount = 0;
internal Int32 receivedMessageBytesDoneCount = 0;
internal Int32 recPrefixBytesDoneThisOp = 0;
internal Int32 sendBytesRemainingCount;
internal readonly Int32 sendPrefixLength;
internal Byte[] dataToSend;
internal Int32 bytesSentAlreadyCount;
private Int32 sessionId;
public DataHoldingUserToken(SocketAsyncEventArgs e, Int32 rOffset, Int32 sOffset,
Int32 receivePrefixLength, Int32 sendPrefixLength, Int32 identifier)
{
this.idOfThisObject = identifier;
this.theMediator = new Mediator(e);
this.bufferOffsetReceive = rOffset;
this.bufferOffsetSend = sOffset;
this.receivePrefixLength = receivePrefixLength;
this.sendPrefixLength = sendPrefixLength;
this.receiveMessageOffset = rOffset + receivePrefixLength;
this.permanentReceiveMessageOffset = this.receiveMessageOffset;
}
public Int32 TokenId
{
get
{
return this.idOfThisObject;
}
}
internal void CreateNewDataHolder()
{
theDataHolder = new DataHolder();
}
internal void CreateSessionId()
{
sessionId = Interlocked.Increment(ref Program.mainSessionId);
}
public Int32 SessionId
{
get
{
return this.sessionId;
}
}
public void Reset()
{
this.receivedPrefixBytesDoneCount = 0;
this.receivedMessageBytesDoneCount = 0;
this.recPrefixBytesDoneThisOp = 0;
this.receiveMessageOffset = this.permanentReceiveMessageOffset;
}
}
class Mediator
{
private IncomingDataPreparer theIncomingDataPreparer;
private OutgoingDataPreparer theOutgoingDataPreparer;
private DataHolder theDataHolder;
private SocketAsyncEventArgs saeaObject;
public Mediator(SocketAsyncEventArgs e)
{
this.saeaObject = e;
this.theIncomingDataPreparer = new IncomingDataPreparer(saeaObject);
this.theOutgoingDataPreparer = new OutgoingDataPreparer();
}
internal void HandleData(DataHolder incomingDataHolder)
{
theDataHolder = theIncomingDataPreparer.HandleReceivedData
(incomingDataHolder, this.saeaObject);
}
internal void PrepareOutgoingData()
{
theOutgoingDataPreparer.PrepareOutgoingData(saeaObject, theDataHolder);
}
internal SocketAsyncEventArgs GiveBack()
{
return saeaObject;
}
}
class IncomingDataPreparer
{
private DataHolder theDataHolder;
private SocketAsyncEventArgs theSaeaObject;
public IncomingDataPreparer(SocketAsyncEventArgs e)
{
this.theSaeaObject = e;
}
private Int32 ReceivedTransMissionIdGetter()
{
Int32 receivedTransMissionId =
Interlocked.Increment(ref Program.mainTransMissionId);
return receivedTransMissionId;
}
private EndPoint GetRemoteEndpoint()
{
return this.theSaeaObject.AcceptSocket.RemoteEndPoint;
}
internal DataHolder HandleReceivedData(DataHolder incomingDataHolder,
SocketAsyncEventArgs theSaeaObject)
{
DataHoldingUserToken receiveToken =
(DataHoldingUserToken)theSaeaObject.UserToken;
theDataHolder = incomingDataHolder;
theDataHolder.sessionId = receiveToken.SessionId;
theDataHolder.receivedTransMissionId =
this.ReceivedTransMissionIdGetter();
theDataHolder.remoteEndpoint = this.GetRemoteEndpoint();
this.AddDataHolder();
return theDataHolder;
}
private void AddDataHolder()
{
lock (Program.lockerForList)
{
Program.listOfDataHolders.Add(theDataHolder);
}
}
}
class OutgoingDataPreparer
{
private DataHolder theDataHolder;
internal void PrepareOutgoingData(SocketAsyncEventArgs e,
DataHolder handledDataHolder)
{
DataHoldingUserToken theUserToken = (DataHoldingUserToken)e.UserToken;
theDataHolder = handledDataHolder;
Byte[] idByteArray = BitConverter.GetBytes
(theDataHolder.receivedTransMissionId);
Int32 lengthOfCurrentOutgoingMessage = idByteArray.Length
+ theDataHolder.dataMessageReceived.Length;
Byte[] arrayOfBytesInPrefix = BitConverter.GetBytes
(lengthOfCurrentOutgoingMessage);
theUserToken.dataToSend = new Byte[theUserToken.sendPrefixLength
+ lengthOfCurrentOutgoingMessage];
Buffer.BlockCopy(arrayOfBytesInPrefix, 0, theUserToken.dataToSend,
0, theUserToken.sendPrefixLength);
Buffer.BlockCopy(idByteArray, 0, theUserToken.dataToSend,
theUserToken.sendPrefixLength, idByteArray.Length);
Buffer.BlockCopy(theDataHolder.dataMessageReceived, 0,
theUserToken.dataToSend, theUserToken.sendPrefixLength
+ idByteArray.Length, theDataHolder.dataMessageReceived.Length);
theUserToken.sendBytesRemainingCount =
theUserToken.sendPrefixLength + lengthOfCurrentOutgoingMessage;
theUserToken.bytesSentAlreadyCount = 0;
}
}
class DataHolder
{
internal Byte[] dataMessageReceived;
internal Int32 receivedTransMissionId;
internal Int32 sessionId;
internal EndPoint remoteEndpoint;
}
internal sealed class SocketAsyncEventArgsPool
{
private Int32 nextTokenId = 0;
Stack<socketasynceventargs /> pool;
internal SocketAsyncEventArgsPool(Int32 capacity)
{
this.pool = new Stack<socketasynceventargs />(capacity);
}
internal Int32 Count
{
get { return this.pool.Count; }
}
internal Int32 AssignTokenId()
{
Int32 tokenId = Interlocked.Increment(ref nextTokenId);
return tokenId;
}
internal SocketAsyncEventArgs Pop()
{
lock (this.pool)
{
return this.pool.Pop();
}
}
internal void Push(SocketAsyncEventArgs item)
{
if (item == null)
{
throw new ArgumentNullException("Items added to a
SocketAsyncEventArgsPool cannot be null");
}
lock (this.pool)
{
this.pool.Push(item);
}
}
}
class SocketListenerSettings
{
private Int32 maxConnections;
private Int32 numberOfSaeaForRecSend;
private Int32 backlog;
private Int32 maxSimultaneousAcceptOps;
private Int32 receiveBufferSize;
private Int32 receivePrefixLength;
private Int32 sendPrefixLength;
private Int32 opsToPreAllocate;
private IPEndPoint localEndPoint;
public SocketListenerSettings(Int32 maxConnections,
Int32 excessSaeaObjectsInPool, Int32 backlog, Int32 maxSimultaneousAcceptOps,
Int32 receivePrefixLength, Int32 receiveBufferSize, Int32 sendPrefixLength,
Int32 opsToPreAlloc, IPEndPoint theLocalEndPoint)
{
this.maxConnections = maxConnections;
this.numberOfSaeaForRecSend = maxConnections + excessSaeaObjectsInPool;
this.backlog = backlog;
this.maxSimultaneousAcceptOps = maxSimultaneousAcceptOps;
this.receivePrefixLength = receivePrefixLength;
this.receiveBufferSize = receiveBufferSize;
this.sendPrefixLength = sendPrefixLength;
this.opsToPreAllocate = opsToPreAlloc;
this.localEndPoint = theLocalEndPoint;
}
public Int32 MaxConnections
{
get
{
return this.maxConnections;
}
}
public Int32 NumberOfSaeaForRecSend
{
get
{
return this.numberOfSaeaForRecSend;
}
}
public Int32 Backlog
{
get
{
return this.backlog;
}
}
public Int32 MaxAcceptOps
{
get
{
return this.maxSimultaneousAcceptOps;
}
}
public Int32 ReceivePrefixLength
{
get
{
return this.receivePrefixLength;
}
}
public Int32 BufferSize
{
get
{
return this.receiveBufferSize;
}
}
public Int32 SendPrefixLength
{
get
{
return this.sendPrefixLength;
}
}
public Int32 OpsToPreAllocate
{
get
{
return this.opsToPreAllocate;
}
}
public IPEndPoint LocalEndPoint
{
get
{
return this.localEndPoint;
}
}
}
The Server App
After downloading the zip file that contains the code, save it to disk. In order not to have problems using it in Visual Studio, before extracting it, right-click on the saved zip file and choose Properties, and then Unblock, then OK. Then extract it. If you do not do that, you may get security warning errors from Visual Studio.
Before running the server code the first time, you may want to change the folder where the logs are written. The default path is c:\LogForSaeaTest\, which will be created at server startup, if it does not exist. If you do not want to use the default folder, change the path in the TestFileWriter
class before running the app the first time. (The TestFileWriter
code is not included in the article above, but is in the source code.) For the most part, I have not set the server application up so that the SocketListenerSettings
and other variables can be controlled from the Console. You'll need to change the source code and rebuild to make most changes during testing.
It's much better to run the client on one machine and server on another. If you try to run the client and server on the same machine, try to use the computer name first as the value of the "host" variable on the client. If that does not work, try "localhost" as the value of the "host" variable on the client.
When trying to connect the client to the server, if you get a "connection actively refused" message, check to see if you have a firewall that is blocking the transmissions on the incoming port of the server. You might have to allow incoming transmissions on that port on your local network. And if you have a firewall that blocks outgoing transmission on the client, then you would need to change settings for that too.
The Client App
A lot of the code in the client app is very similar to the server app. The server code is fully commented. So I did not always fully comment the client code. If in the client you find code that you do not understand and it is not commented on, then check similar portions of the server app for code comments.
The client app is not a normal client, but an app designed to test the server. It is set up to deliver as many connections as you want to throw at the server. The client app is set up to build all the messages for all of those connections before the test is started. It sends a different message each time for each client. And all the messages are put in memory before the test starts, so message creation won't be a drag on the client app during the test. If you choose to have 3000 connections sending 50000 messages per connection, then that is 150 million messages. That is too many for memory probably. If you want to do a long test like that, then in the client app, change runLongTest
to true
. In that case, instead of sending a separate message for each message from each client, it will send the same array of messages over and over for each client. That way, the messages can fit in memory. (If you are doing a long test like that, also set runLongTest
to true
on the server. That will keep the server app from writing the received data to a dictionary, which displays data at the end of the test. Otherwise, you'll run the server out of memory probably.)
The client app is set up so that you can do the following from the Console:
- Put in the method for finding the network address of the host machine, either machine name or IP address,
- Put in the correct string for the host machine name or IP address, depending on what method you chose for getting the network address of the host,
- Type in the port number of the host app, or accept the default,
- Specify a folder name for the log file to be written to, or accept the default,
- Specify the buffer size, or accept the previous value,
- Specify the number of client connections to attempt, or accept the previous value,
- Indicate the number of TCP messages to send per connection, or accept the previous value.
In the downloadable source code, there is plenty of capability to visualize what is happening by writing to a log and the Console. The things that you can visualize are:
- Program flow, as it moves from method to method,
- Connects and disconnects,
- The data which was sent from client to server and server to client,
- Threads, only in the server app (Save the thread watching for last)
Simple Process to Understand the Code Well
First, if you have software firewalls on client and/or server, open those firewalls to allow the apps to transmit to port 4444 on the server, or whatever port you use. 4444 is the default port.
Start the server app, and make sure you see "Server is listening" in the console. Then start the client app. You'll be asked to specify the host in the console. Use the machine name, unless the server has a fixed IP address, in which case you can just use the IP address. For the other items it will ask you, hopefully you can just accept the defaults. When it displays "Press Enter to begin socket test", press Enter. It should finish quickly. Then close both client and server, to make the logs finish writing. You just sent one message from one client connection to the server, and a response message from the server back to that client connection. (If you had a problem, see the stuff about firewalls above.)
Now look at the log files from both the server and client. (It's easier if you print them.) Compare them to the code, and think about it a long time. You are looking at one message sent from one connection, and the response to that message. You should be able to see and understand the program flow very well from it.
(Tools that you may find helpful. When testing any network application, you will learn much more if you use the great free program Wireshark, or something like it, to see everything that is happening on the network. For reading very large text files which can be generated by the logger when running long tests, try Cream for Vim.)
Now go through that same process for each of the tests below. Start the server, and then the client. The client will ask you about buffer size, number of connections, and number of messages. You'll make some selections for those things on the client. When you change the number of connections, you are changing the number of simulated users. One connection is like one client user running on one machine on the Internet, or a LAN. Run the test. And then after each test, close the client and server apps to write the logs. Look at the logs and make sure you understand what is happening after each test. Then go to the next test in the list.
- Test 1. buffer size stays same (25), number of connections = 2, number of messages = 2.
- Test 2. buffer size stays same (25), number of connections = 3, number of messages = 5.
- Test 3. buffer size = 5 on client, number of connections = 3, number of messages = 5. (In this test, the buffer is smaller than the message with its prefix. So the client will require multiple send ops to send one message. And the client will require multiple receive ops to receive one message. You can see that in the client log. What happens in the server logs can vary, since one send op from client does not necessarily correlate with one receive op on the server, due to the way that TCP works.)
- Test 4. buffer size = 3 on client, number of connections = 3, number of messages = 5. (In this test, the buffer is smaller than even the prefix by itself. Multiple send ops will be required.)
- Test 5. Put buffer size back to 25 on client, number of connections = 3, number of messages = 5. Now on the server, in the Program.cs code, change
testBufferSize
from 25 to 5. Build the server app. Run the test. You'll see that multiple receive ops are required to receive a message from the client. And multiple send ops are required to send from the server back to the client. - Test 6. On the server, put
testBufferSize
back to 25, change watchThreads
from false
to true
, and build. Leave settings the same as they were on the client, and run the test again. Now, after you close the server app, the server log file will show info on threads. In Notepad, or whatever program you use to look at the logs, search for the phrase "New managed thread". You'll see that phrase every time a new thread starts. Usually, there are only 2-4 managed threads running for the app. The managed thread numbers and socket handle identifiers are displayed throughout the log file now. - Test 7. On the server, change
watchThreads
to false
, watchProgramFlow
to false
, maxNumberOfConnections
to 10,000, and build. On the client, change watchProgramFlow
to false
, and build. In the client console, at startup, make number of connections = 1000, number of messages = 500. Run the test. (If this crashes your client app, change runLongTest
to true
on the client, and build it again. If you change runLongTest
to true
on the client, there will not be a huge array of messages created on the client before starting the test. Instead, just one array of messages will be created, and it will be sent over and over, the same array being used for all of the connections. When runLongTest
is false
on the client, every connection has its own unique array of messages.) - Test 8. To run bigger tests, I suggest you change
runLongTest
to true
on the server also. Try setting number of connections = 8000, number of messages = 500000 on the client, and see if it crashes your server app. On the client, there is a tickDelayBeforeNextConn
variable which is set to delay the next connection by 50000 ticks (5 milliseconds). You can play around with that some. If you send too many new connection requests at once, you'll overwhelm the server. It's kind of fun to do once or twice. How many connections can the server handle? It depends on your hardware, configuration and version of Windows. In some testing I did, when running the server on an older single processor Dell desktop with Windows XP Pro 32 bit on a wired local area network with 100 MB NICs, it could handle 2000 connections sending/receiving messages continually with no problems. It usually does well with 3000 connections. And 4000 connections may cause problems sometimes. Keep it mind that it is possible to overwhelm a machine that is running the client app, if you open thousands of connections, because you are opening a port for every connection. If the server is under heavy load, it might reject some new connections. But the client app is able to retry a rejected connection until the connection is successful.
History
- December 13, 2010: Version 1.3 of code. Changed client to use a blocking
Stack<T>
to handle the initiation of new connections in a more controlled way. Previously, when the server was overloaded and rejecting a lot of connections, the connection retries were not handled very well by the client app. Added Shutdown()
before DisconnectAsync()
in client. Changed client so that it is closing a socket properly after finishing with it. In server app in v1.2, the code to handle bad accept socket was in CloseClientSocket
. Now it is handled in HandleBadAccept
. That change eliminates the checking for null
after the cast of the user token in the CloseClientSocket
method that was in v1.2. - October 12, 2010: Version 1.2 of code. Fixed bug in
ProcessAccept
where, in the case of socket error, we needed to LoopToStartAccept
and get rid of the bad socket. Fixed bug in CloseClientSocket
when we tried to destroy the socket in an SAEA for accept ops, because the SAEA's user token could not be cast to DataHoldingUserToken
type. Fixed bug in client code -- when the runlongtest
variable is true
the client was still trying to save received data in a List<T>, which would eventually cause it to run out of memory and crash in a test with a huge number of messages. Improved some log writing code. Cleaned up the client test app code, which I had just sort of thrown together without worrying about readability. Moved a couple of lines from SocketListener.ProcessReceive
to PrefixHandler
. Improved error handling in the client app. In the server, some code was moved from the Main
method into other methods, just for readability. - September 8, 2010: Version 1.1 of code. The
ProcessReceive
method was too big previously. I moved part of it into the PrefixHandler
and MessageHandler
classes. Moved receive buffer setting from ProcessSend()
to StartReceive()
. Fixed two bugs. One was when the number of bytes received exactly equalled the prefix length. The other was noticed by user mixal11. I had failed to reset some of the Token members in the case of abnormal disconnect. Thanks to mixal11. Made some minor changes to the article. - August 14, 2010: Fixed a few typographical errors in the article. Changed some formatting in the article, and moved one paragraph for clarity. Reworded a few things.