Hi all, I have a client/server app. Both sides use asynchronous calls when receiving data. It is built on TCP and is meant primarily for sending files.

A command is sent along a socket and is then 'converted' into an action with a simple switch case. If the client sends the command "SENDFILE", I want the server to enter a case that calls a function which then handles any further data on that socket and combines it into a file.

This is the OnDataReceived callback function and switch case on the server side:

public void OnDataReceived(IAsyncResult asyn)
        {
            SocketData socketData = (SocketData)asyn.AsyncState;
            try
            {

                // Complete the BeginReceive() asynchronous call with EndReceive(),
                // which returns the number of bytes received from the client
                socketData.m_currentSocket.EndReceive(asyn);

                // Get packet data
                Packet returnPacket = Helper.ByteArrayToPacket(socketData.dataBuffer);

                switch (returnPacket.command)
                {
                    case Command.SENDREQUEST: //Get ready for an incoming file      


Here is the class that reads from a network stream asynchronously and writes to a file stream synchronously.
Is that right?


public static void NetToFile(NetworkStream net, FileStream file)
        {
            var copier = new AsyncStreamCopier(net, file);
            copier.Start();
        }

public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[Settings.BufferSize];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }


The problem I'm having is doing the opposite: reading from a file stream and writing to a network stream. Should I read from the file stream asynchronously and write synchronously to the network stream?

Also, with the above example, as soon as the first call to Start() returns, control goes back to the server's switch case, and any further (file) data then hits
Packet returnPacket = Helper.ByteArrayToPacket(socketData.dataBuffer);


and errors, because that data is raw file content rather than a packet :(

I previously had a synchronous method that used while loops, but that takes up a lot of CPU and I'm trying to avoid that.

Could someone point me in the right direction? There isn't much on the net from what I can see.

1 solution

I would suggest creating your own SocketPacket class.
Here is an example from an old project of mine:
public class SocketPacket
    {
        internal byte[]         DataBuffer;
        public PackHeader       Header;
        public Guid             SocketID        { get; set; }
        public byte[]           Buffer          { get; set; }
        public Socket           ThisSocket      { get; set; }
        public bool             Authorized      { get; set; }
        public DateTime         KeepAliveTime   { get; set; }

        private const int SIZE = 1024;

        public void ResetBuffer()
        {
            Buffer = new byte[SIZE];
            DataBuffer = new byte[0];
        }
        public SocketPacket()
        {
            Buffer          = new byte[SIZE];
            DataBuffer      = new byte[0];
            SocketID        = Guid.NewGuid();   // new Guid() would yield Guid.Empty for every packet
            
            ThisSocket      = new Socket(   AddressFamily.InterNetwork, 
                                            SocketType.Stream, 
                                            ProtocolType.Tcp    );

            KeepAliveTime   = new DateTime();
        }
        public void AddToBuffer(byte[] array, int count)
        {
            byte[] newarray = new byte[DataBuffer.Length + count];
            System.Buffer.BlockCopy(DataBuffer, 0, newarray, 0, DataBuffer.Length);
            System.Buffer.BlockCopy(array, 0, newarray, DataBuffer.Length, count);

            DataBuffer = newarray;
        }
    }



Then you might want to create classes that represent the packages you want to send through the sockets. For instance:
[Serializable]
public class PackageBase
{
    private byte[]  _packageBinaryData;
    public byte[]   BinaryData
    {
        get
        {
            // Clear the cached bytes first so the previous snapshot is not serialized along with the object
            _packageBinaryData = null;
            var _formatter = new BinaryFormatter();
            using (var _serializationStream = new MemoryStream())
            {
                _formatter.Serialize(_serializationStream, this);
                _packageBinaryData = _serializationStream.ToArray();
            }
            return _packageBinaryData;
        }
    }
}

    [Serializable]
    public class MessagePackage : PackageBase
    {
        public string Message;
    }


When you get the first bytes of a package in your OnDataReceived method, read only the package header first. It tells you about the incoming package: its length and whatever else you put in your header. Then read the rest from the socket. While you are iterating within a package (usually only two iterations: the header, then the package body), collect the package data, for example in a SocketPacket.
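
To make the header-then-body idea concrete, here is a minimal synchronous sketch. It is not the code from my project: the names FramedReader, ReadExactly and ReadFrame are made up for illustration, and the frame layout (4-byte command code, 8-byte body length, then the body) is an assumption. It also assumes both ends use the same byte order, since BitConverter follows the machine's endianness. The same bookkeeping applies inside a BeginReceive callback.

```csharp
using System;
using System.IO;

public static class FramedReader
{
    // Reads exactly 'count' bytes. A loop is needed because Stream.Read
    // may return fewer bytes than requested, especially on a NetworkStream.
    public static byte[] ReadExactly(Stream stream, int count)
    {
        var result = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = stream.Read(result, offset, count - offset);
            if (read == 0)
                throw new EndOfStreamException("Stream ended before the full block arrived.");
            offset += read;
        }
        return result;
    }

    // Hypothetical frame layout: int command code, long body length, body bytes.
    public static byte[] ReadFrame(Stream stream, out int code)
    {
        byte[] header = ReadExactly(stream, sizeof(int) + sizeof(long));
        code = BitConverter.ToInt32(header, 0);
        long length = BitConverter.ToInt64(header, sizeof(int));
        // This sketch only handles bodies that fit in a single byte array.
        return ReadExactly(stream, checked((int)length));
    }
}
```

Because the reader consumes exactly the announced number of body bytes, whatever follows on the socket is the next header, so file data never ends up in the command parser.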


Here is an example of such an 'engine', as a class:

public abstract class Runner
    {
        private AsyncCallback       _pfnCallBack;
        protected const int         DEFAULT_PACKSIZE    = 1024;
        protected readonly int      HEADER_SIZE         = sizeof(int) + sizeof(long) + Marshal.SizeOf(Guid.NewGuid());
        public delegate void PackageHandler<T>(T package, PackCodes code, Guid clientID);
        public abstract event PackageHandler<PackageBase> OnPackageRecieved;
        public bool IsRunning { get; internal set; }
        protected void OnDataReceived(IAsyncResult asyn)
        {
            var Header  = new PackHeader();
            var packet  = (SocketPacket)asyn.AsyncState;
            int iRx;
            try
            {
                iRx = packet.ThisSocket.EndReceive(asyn);
            }
            catch (Exception)
            {
                return;
            }
            

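            if (iRx == 0)
                return; /* the remote side closed the connection */

            packet.AddToBuffer(packet.Buffer, iRx);

            /* header still incomplete: ask for the remaining header bytes */
            if (packet.DataBuffer.Length < HEADER_SIZE)
            {
                packet.ThisSocket.BeginReceive(packet.Buffer, 0, HEADER_SIZE - packet.DataBuffer.Length,
                                               SocketFlags.None, OnDataReceived, packet);
                return;
            }

            /* the header has just been completed (no body bytes accepted yet) */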
            if (packet.DataBuffer.Length == HEADER_SIZE)
            {
                var code        = new byte[sizeof(int)];
                var length      = new byte[sizeof(long)];
                var clientId    = new byte[Marshal.SizeOf(Guid.NewGuid())];

                /* recreate the package header from the received bytes */
                Buffer.BlockCopy(packet.DataBuffer, 0, code, 0, code.Length);
                Buffer.BlockCopy(packet.DataBuffer, code.Length, length, 0, length.Length);
                Buffer.BlockCopy(packet.DataBuffer, length.Length + code.Length, clientId, 0, clientId.Length);
                Header.Code     = (PackCodes)BitConverter.ToInt32(code, 0);
                Header.Length   = BitConverter.ToInt64(length, 0);
                Header.ClientId = new Guid(clientId);

                packet.Header = Header;
            }
            if (_pfnCallBack == null)
                _pfnCallBack = OnDataReceived;

            if (packet.Header.Length > packet.DataBuffer.Length - HEADER_SIZE)
            {

                int bytesToReceive;
                if (packet.Header.Length - packet.DataBuffer.Length + HEADER_SIZE < DEFAULT_PACKSIZE)
                    bytesToReceive = (int)(packet.Header.Length - packet.DataBuffer.Length + HEADER_SIZE);
                else
                    bytesToReceive = DEFAULT_PACKSIZE;

                packet.ThisSocket.BeginReceive(packet.Buffer, 0, bytesToReceive, SocketFlags.None, _pfnCallBack, packet);
            }
            else
            {
                ProcessIncomingPackage(packet);
            }
        }

        protected T GetPackageFromSocketPacket<T>(SocketPacket packet, ref PackCodes code) where T : class
        {
            var body = new byte[packet.DataBuffer.Length - HEADER_SIZE];
            var formatter = new BinaryFormatter();

            Buffer.BlockCopy(packet.DataBuffer, HEADER_SIZE, body, 0, body.Length);

            var stream = new MemoryStream(body);

            PackageBase pack = null;
            code = PackCodes.Unknown;

            switch (packet.Header.Code)
            {

                case PackCodes.Login:
                    pack = (LoginPackage)formatter.Deserialize(stream);
                    code = PackCodes.Login;
                    break;
                case PackCodes.Register:
                    pack = (RegisterPackage)formatter.Deserialize(stream);
                    code = PackCodes.Register;
                    break;
                case PackCodes.ErrorMessage:
                    pack = (ErrorMessagePackage)formatter.Deserialize(stream);
                    code = PackCodes.ErrorMessage;
                    break;
                case PackCodes.Message:
                    pack = (MessagePackage)formatter.Deserialize(stream);
                    code = PackCodes.Message;
                    break;
                case PackCodes.ClientId:
                    pack = (ClientIdentificationPackage)formatter.Deserialize(stream);
                    code = PackCodes.ClientId;
                    break;
                case PackCodes.SymbolsRequest:
                    pack = (SymbolsRequestPackage) formatter.Deserialize(stream);
                    code = PackCodes.SymbolsRequest;
                    break;
                case PackCodes.SymbolsResponse:
                    pack = (SymbolsResponsePackage)formatter.Deserialize(stream);
                    code = PackCodes.SymbolsResponse;
                    break;
                case PackCodes.SymbolDataRequest:
                    pack = (SymbolDataRequestPackage)formatter.Deserialize(stream);
                    code = PackCodes.SymbolDataRequest;
                    break;
                case PackCodes.SymbolsDataResponse:
                    pack = (SymbolDataResponsePackage)formatter.Deserialize(stream);
                    code = PackCodes.SymbolsDataResponse;
                    break;
                case PackCodes.SymbolDataStop:
                    pack = (StopSymbolDataPackage)formatter.Deserialize(stream);
                    code = PackCodes.SymbolDataStop;
                    break;
                case PackCodes.NewOrder:
                    pack = (OrderPackage)formatter.Deserialize(stream);
                    code = PackCodes.NewOrder;
                    break;
                case PackCodes.OrdersRequest:
                    pack = (CustomerOrdersRequestPackage)formatter.Deserialize(stream);
                    code = PackCodes.OrdersRequest;
                    break;
                case PackCodes.OrdersResponse:
                    pack = (CustomerOrdersResponsePackage)formatter.Deserialize(stream);
                    code = PackCodes.OrdersResponse;
                    break;
                case PackCodes.CloseOrderRequest:
                    pack = (CloseOrderPackage) formatter.Deserialize(stream);
                    code = PackCodes.CloseOrderRequest;
                    break;
                case PackCodes.RefreshOrdersCommand:
                    pack = (RefreshYourOrdersCommandPackage)formatter.Deserialize(stream);
                    code = PackCodes.RefreshOrdersCommand;
                    break;
            }
            return pack as T;
        }

        protected abstract void ProcessIncomingPackage(SocketPacket packet);
    }
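
For the sending side, the header has to be laid out in the same order that OnDataReceived parses it: an int code, a long body length, then the 16 bytes of the client Guid. The following is only a sketch of how that could look. HeaderWriter and BuildPacket are hypothetical names, not part of the class above, and the code is passed as a plain int rather than as PackCodes to keep the example self-contained.

```csharp
using System;

public static class HeaderWriter
{
    // int code + long length + Guid (a Guid is always 16 bytes)
    public const int HeaderSize = sizeof(int) + sizeof(long) + 16;

    // Builds one packet: header followed by the already-serialized body.
    public static byte[] BuildPacket(int code, Guid clientId, byte[] body)
    {
        var packet = new byte[HeaderSize + body.Length];

        // Same field order and offsets as the receiving side reads them.
        Buffer.BlockCopy(BitConverter.GetBytes(code), 0, packet, 0, sizeof(int));
        Buffer.BlockCopy(BitConverter.GetBytes((long)body.Length), 0, packet, sizeof(int), sizeof(long));
        Buffer.BlockCopy(clientId.ToByteArray(), 0, packet, sizeof(int) + sizeof(long), 16);

        // The length field counts body bytes only, not the header.
        Buffer.BlockCopy(body, 0, packet, HeaderSize, body.Length);
        return packet;
    }
}
```

The body argument would typically be the BinaryData of one of the package classes, and the returned array can be handed to Socket.Send or BeginSend in one call.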



I hope this helps.
 
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


