2009-04-30

Sample code: Length-prefix message framing for streams

(This post is part of the TCP/IP .NET Sockets FAQ)

The necessity of message framing is discussed at http://nitoprograms.blogspot.com/2009/04/message-framing.html.

The class below is a modified version of Nito.Async.Sockets.SocketPacketProtocol from the Nito.Async library. The main difference is that the Nito.Async.Sockets.SocketPacketProtocol class communicates directly with the asynchronous Nito socket classes, allowing a more efficient implementation. The PacketProtocol class below is slightly less efficient, but can be used with any socket classes, including synchronous sockets or even non-socket streams such as files.

Note that PacketProtocol is not thread-safe, so access to the instance members of this class must be synchronized if an instance is shared between threads.

// Original source: http://nitoprograms.blogspot.com/2009/04/sample-code-length-prefix-message.html
/// <summary>
/// Maintains the necessary buffers for applying a length-prefix message framing protocol over a stream.
/// </summary>
/// <remarks>
/// <para>Create one instance of this class for each incoming stream, and assign a handler to <see cref="MessageArrived"/>. As bytes arrive at the stream, pass them to <see cref="DataReceived"/>, which will invoke <see cref="MessageArrived"/> as necessary.</para>
/// <para>If <see cref="DataReceived"/> raises <see cref="System.Net.ProtocolViolationException"/>, then the stream data should be considered invalid. After that point, no methods should be called on that <see cref="PacketProtocol"/> instance.</para>
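/// <para>This class uses a 4-byte signed integer length prefix, which allows for message sizes up to 2 GB. The prefix is produced and parsed with <see cref="BitConverter"/>, so it uses the byte order of the local machine (little-endian on x86 and x64); both ends of the stream must agree on that byte order. Keepalive messages are supported as messages with a length prefix of 0 and no message data.</para>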
/// <para>This is EXAMPLE CODE! It is not particularly efficient; in particular, if this class is rewritten so that a particular interface is used (e.g., Socket's IAsyncResult methods), some buffer copies become unnecessary and may be removed.</para>
/// </remarks>
public class PacketProtocol
{
    /// <summary>
    /// Wraps a message. The wrapped message is ready to send to a stream.
    /// </summary>
    /// <remarks>
    /// <para>Generates a length prefix for the message and returns the combined length prefix and message.</para>
    /// </remarks>
    /// <param name="message">The message to send.</param>
    public static byte[] WrapMessage(byte[] message)
    {
        // Get the length prefix for the message
        byte[] lengthPrefix = BitConverter.GetBytes(message.Length);
 
        // Concatenate the length prefix and the message
        byte[] ret = new byte[lengthPrefix.Length + message.Length];
        lengthPrefix.CopyTo(ret, 0);
        message.CopyTo(ret, lengthPrefix.Length);
 
        return ret;
    }
 
    /// <summary>
    /// Wraps a keepalive (0-length) message. The wrapped message is ready to send to a stream.
    /// </summary>
    public static byte[] WrapKeepaliveMessage()
    {
        return BitConverter.GetBytes((int)0);
    }
 
    /// <summary>
    /// Initializes a new <see cref="PacketProtocol"/>, limiting message sizes to the given maximum size.
    /// </summary>
    /// <param name="maxMessageSize">The maximum message size supported by this protocol. This may be less than or equal to zero to indicate no maximum message size.</param>
    public PacketProtocol(int maxMessageSize)
    {
        // We allocate the buffer for receiving message lengths immediately
        this.lengthBuffer = new byte[sizeof(int)];
        this.maxMessageSize = maxMessageSize;
    }
 
    /// <summary>
    /// The buffer for the length prefix; this is always 4 bytes long.
    /// </summary>
    private byte[] lengthBuffer;
 
    /// <summary>
    /// The buffer for the data; this is null if we are receiving the length prefix buffer.
    /// </summary>
    private byte[] dataBuffer;
 
    /// <summary>
    /// The number of bytes already read into the buffer (the length buffer if <see cref="dataBuffer"/> is null, otherwise the data buffer).
    /// </summary>
    private int bytesReceived;
 
    /// <summary>
    /// The maximum size of messages allowed.
    /// </summary>
    private int maxMessageSize;
 
    /// <summary>
    /// Indicates the completion of a message read from the stream.
    /// </summary>
    /// <remarks>
    /// <para>This may be called with an empty message, indicating that the other end sent a keepalive message. This will never be called with a null message.</para>
    /// <para>This delegate is invoked from within a call to <see cref="DataReceived"/>. Handlers assigned to it should not call <see cref="DataReceived"/>.</para>
    /// </remarks>
    public Action<byte[]> MessageArrived { get; set; }
 
    /// <summary>
    /// Notifies the <see cref="PacketProtocol"/> instance that incoming data has been received from the stream. This method will invoke <see cref="MessageArrived"/> as necessary.
    /// </summary>
    /// <remarks>
    /// <para>This method may invoke <see cref="MessageArrived"/> zero or more times.</para>
    /// <para>Zero-length receives are ignored. Many streams use a 0-length read to indicate the end of a stream, but <see cref="PacketProtocol"/> takes no action in this case.</para>
    /// </remarks>
    /// <param name="data">The data received from the stream. Cannot be null.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    public void DataReceived(byte[] data)
    {
        // Process the incoming data in chunks, handing over only as many bytes as the current buffer requests.
        // Logically, we are satisfying read requests with the received data, rather than scanning the
        //  incoming buffer looking for messages.
 
        int i = 0;
        while (i != data.Length)
        {
            // Select the buffer being filled: the data buffer if one is allocated,
            //  otherwise the length prefix buffer
            byte[] buffer = this.dataBuffer ?? this.lengthBuffer;

            // Determine how many bytes we want to transfer to the buffer and transfer them
            int bytesAvailable = data.Length - i;
            int bytesRequested = buffer.Length - this.bytesReceived;
            int bytesTransferred = Math.Min(bytesRequested, bytesAvailable);
            Array.Copy(data, i, buffer, this.bytesReceived, bytesTransferred);
            i += bytesTransferred;

            // Notify "read completion"
            this.ReadCompleted(bytesTransferred);
        }
    }
 
    /// <summary>
    /// Called when a read completes. Parses the received data and calls <see cref="MessageArrived"/> if necessary.
    /// </summary>
    /// <param name="count">The number of bytes read.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    private void ReadCompleted(int count)
    {
        // Get the number of bytes read into the buffer
        this.bytesReceived += count;
 
        if (this.dataBuffer == null)
        {
            // We're currently receiving the length buffer
 
            if (this.bytesReceived != sizeof(int))
            {
                // We haven't gotten all the length buffer yet: just wait for more data to arrive
            }
            else
            {
                // We've gotten the length buffer
                int length = BitConverter.ToInt32(this.lengthBuffer, 0);
 
                // Sanity check for length < 0
                if (length < 0)
                    throw new System.Net.ProtocolViolationException("Message length is less than zero");
 
                // Another sanity check is needed here for very large packets, to prevent denial-of-service attacks
                if (this.maxMessageSize > 0 && length > this.maxMessageSize)
                    throw new System.Net.ProtocolViolationException("Message length " + length.ToString(System.Globalization.CultureInfo.InvariantCulture) + " is larger than maximum message size " + this.maxMessageSize.ToString(System.Globalization.CultureInfo.InvariantCulture));
 
                // Zero-length packets are allowed as keepalives
                if (length == 0)
                {
                    this.bytesReceived = 0;
                    if (this.MessageArrived != null)
                        this.MessageArrived(new byte[0]);
                }
                else
                {
                    // Create the data buffer and start reading into it
                    this.dataBuffer = new byte[length];
                    this.bytesReceived = 0;
                }
            }
        }
        else
        {
            if (this.bytesReceived != this.dataBuffer.Length)
            {
                // We haven't gotten all the data buffer yet: just wait for more data to arrive
            }
            else
            {
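                // We've gotten an entire packet. Go back to reading the length buffer before
                //  invoking the handler, so that the state stays consistent even if the handler throws
                byte[] message = this.dataBuffer;
                this.dataBuffer = null;
                this.bytesReceived = 0;

                if (this.MessageArrived != null)
                    this.MessageArrived(message);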
            }
        }
    }
}

Message Framing

(This post is part of the TCP/IP .NET Sockets FAQ)

The Problem

One of the most common beginner mistakes for people designing protocols for TCP/IP is that they assume that message boundaries are preserved. For example, they assume a single "Send" will result in a single "Receive".

Some TCP/IP documentation is partially to blame. Many people read about how TCP/IP preserves packets - splitting them up when necessary and re-ordering and re-assembling them on the receiving side. This is perfectly true; however, a single "Send" does not send a single packet.

Local machine (loopback) testing reinforces this misunderstanding, because when client and server are on the same machine they usually communicate quickly enough that single "sends" do in fact correspond to single "receives". Unfortunately, this is only a coincidence.

This problem usually manifests itself when attempting to deploy a solution to the Internet (increasing latency between client and server) or when trying to send larger amounts of data (requiring fragmentation). Unfortunately, at this point, the project is usually in its final stages, and sometimes the application protocol has even been published!

True story: I once worked for a company that developed custom client/server software. The original communications code had made this common mistake. However, they were all on dedicated networks with high-end hardware, so the underlying problem only happened very rarely. When it did, the operators would just chalk it up to "that buggy Windows OS" or "another network glitch" and reboot. One of my tasks at this company was to change the communication to include a lot more information; of course, this caused the problem to manifest regularly, and the entire application protocol had to be changed to fix it. The truly amazing thing is that this software had been used in countless 24x7 automation systems for 20 years; it was fundamentally broken and no one noticed.

The Solution, Part 1 - Understanding

First, one must understand the abstraction of TCP/IP. From the application's perspective, TCP operates on streams of data, never packets. Repeat this mantra three times: "TCP does not operate on packets of data. TCP operates on streams of data."

There is no way to send a packet of data over TCP; that function call does not exist. Rather, there are two streams in a TCP connection: an incoming stream and an outgoing stream. One may read from the incoming stream by calling a "receive" method, and one may write to the outgoing stream by calling a "send" method. If one side calls "send" to send 5 bytes, and then calls "send" to send 5 more bytes, then there are 10 bytes that are placed in the outgoing stream. The receiving side may decide to read them one at a time from its receiving stream if it so wishes (calling "receive" 10 times), or it may wait for all 10 bytes to arrive and then read them all at once with a single call to "receive".
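The "two sends of 5 bytes" scenario can be sketched without a network at all. The snippet below is only an illustration: it uses a MemoryStream as a stand-in for the TCP stream (an assumption; a real socket may hand back fewer bytes than are buffered), and the StreamSemanticsDemo name is made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class StreamSemanticsDemo
{
    // Performs two 5-byte "sends" into a stream, then drains the stream using the
    // given receive buffer size. Returns the number of bytes each "receive" got.
    public static int[] SendTwiceThenReceive(int receiveBufferSize)
    {
        var stream = new MemoryStream();                    // stand-in for the TCP stream
        stream.Write(new byte[] { 1, 2, 3, 4, 5 }, 0, 5);   // first "send"
        stream.Write(new byte[] { 6, 7, 8, 9, 10 }, 0, 5);  // second "send"
        stream.Position = 0;                                // switch to the reading side

        var sizes = new List<int>();
        var buffer = new byte[receiveBufferSize];
        int n;
        while ((n = stream.Read(buffer, 0, buffer.Length)) > 0)
            sizes.Add(n);
        return sizes.ToArray();
    }
}
```

With a 10-byte receive buffer the reader gets everything in one receive of 10 bytes; with a 1-byte buffer it gets ten receives of 1 byte; with a 4-byte buffer it gets 4, 4, and 2. In no case do the boundaries of the two original sends survive.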

Sending data to the TCP stream is rather easy; all one has to do is call "send", and the appropriate bytes are queued to the outgoing stream. Receiving data from the TCP stream is a bit more tricky, because the "receive N bytes" operation waits only until at least one byte is available on the incoming stream, and then returns anywhere from 1 to N bytes. In other words, the "receive N bytes" operation will complete even if it doesn't read all N bytes, giving the application a chance to act on partial data while the rest of the data bytes are in transit. In the real world, very few programs can process partial receives; almost all programs need a buffer to store partial receives until they have enough data to do meaningful work.
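One common way to cope with partial receives when reading synchronously is to loop until the buffer is full. The following is a minimal sketch, not code from the Nito.Async library: ReadFully and TrickleStream are hypothetical names, and TrickleStream exists only to imitate a socket that delivers a few bytes per receive.

```csharp
using System;
using System.IO;

// Hypothetical test stream: hands out at most maxChunk bytes per Read call,
// imitating the partial receives seen on a real socket.
public sealed class TrickleStream : Stream
{
    private readonly MemoryStream inner;
    private readonly int maxChunk;

    public TrickleStream(byte[] data, int maxChunk)
    {
        this.inner = new MemoryStream(data);
        this.maxChunk = maxChunk;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return this.inner.Read(buffer, offset, Math.Min(count, this.maxChunk));
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class PartialReceive
{
    // Keeps calling Read until exactly count bytes have been stored at the start of buffer.
    // Throws EndOfStreamException if the stream ends first.
    public static void ReadFully(Stream stream, byte[] buffer, int count)
    {
        int received = 0;
        while (received < count)
        {
            int n = stream.Read(buffer, received, count - received);
            if (n == 0)
                throw new EndOfStreamException("Stream ended after " + received + " of " + count + " bytes");
            received += n;
        }
    }
}
```

A single Read on a TrickleStream created with maxChunk 3 returns 3 bytes no matter how many were requested, which is the situation the loop in ReadFully is there to handle.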

To repeat: TCP operates on streams, not on packets. However, most application protocols are based on the idea of "messages"; for example, a client may send a "Lookup X" message to the server, and the server will respond with an "X Data" or "X Not Found" message. Since TCP operates on streams, one must design a "message framing" protocol that will wrap the messages sent back and forth.

The Solution, Part 2 - Design

There are two approaches commonly used for message framing: length prefixing and delimiters.

Length prefixing prefixes each message with the length of that message. The format (and length) of the length prefix must be explicitly stated; "4-byte signed little-endian" (i.e., "int" in C# on little-endian hardware) is a common choice. To send a message, the sending side first converts the message to a byte array and then sends the length of the byte array followed by the byte array itself.

Receiving a length-prefixed message is harder, because of the possibility of partial receives. First, one must read the length of the message into a buffer until the buffer is full (e.g., if using "4-byte signed little-endian", this buffer is 4 bytes). Then one allocates a second buffer and reads the data into that buffer. When the second buffer is full, then a single message has arrived, and one goes back to reading the length of the next message.
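For a blocking stream, the send and receive steps described above might look roughly like this. It is a sketch under stated assumptions, not a definitive implementation: the LengthPrefixFraming class and its method names are invented for this example, the prefix is encoded explicitly as "4-byte signed little-endian", and a maximum message size is enforced as a sanity check.

```csharp
using System;
using System.IO;

public static class LengthPrefixFraming
{
    // Writes a 4-byte signed little-endian length prefix followed by the message bytes.
    public static void WriteMessage(Stream stream, byte[] message)
    {
        int length = message.Length;
        byte[] prefix =
        {
            (byte)(length & 0xFF),
            (byte)((length >> 8) & 0xFF),
            (byte)((length >> 16) & 0xFF),
            (byte)((length >> 24) & 0xFF)
        };
        stream.Write(prefix, 0, prefix.Length);
        stream.Write(message, 0, message.Length);
    }

    // Reads one message. Returns null if the stream ended cleanly before a new prefix began.
    public static byte[] ReadMessage(Stream stream, int maxMessageSize)
    {
        // First buffer: the length prefix
        byte[] prefix = new byte[4];
        if (!Fill(stream, prefix, true))
            return null;

        int length = prefix[0] | (prefix[1] << 8) | (prefix[2] << 16) | (prefix[3] << 24);
        if (length < 0 || length > maxMessageSize)
            throw new InvalidDataException("Invalid message length " + length);

        // Second buffer: the message data
        byte[] message = new byte[length];
        Fill(stream, message, false);
        return message;
    }

    // Loops over Read until buffer is full. Returns false only when allowCleanEnd is true
    // and the stream ended before any byte was read.
    private static bool Fill(Stream stream, byte[] buffer, bool allowCleanEnd)
    {
        int received = 0;
        while (received < buffer.Length)
        {
            int n = stream.Read(buffer, received, buffer.Length - received);
            if (n == 0)
            {
                if (received == 0 && allowCleanEnd)
                    return false;
                throw new EndOfStreamException("Stream ended in the middle of a message");
            }
            received += n;
        }
        return true;
    }
}
```

Calling ReadMessage in a loop until it returns null yields the messages one at a time, in the order they were written, regardless of how the bytes were chunked in transit.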

Delimiters are more complex to get right. When sending, any delimiter characters in the data must be replaced, usually with an escaping function. The receiving code cannot predict the incoming message size, so it must append all received data onto the end of a receiving buffer, growing the buffer as necessary. When a delimiter is found, the receiving side can apply an unescaping function to the receiving buffer to get the message. If the messages will never contain delimiters, then one may skip the escaping/unescaping functions.
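A delimiter-based framer could be sketched as follows. Everything specific here is an assumption chosen for illustration: the DelimitedFramer name, the newline byte as the delimiter, the backslash byte as the escape byte, and the rule that an escaped newline is written as backslash followed by 'n'. The maximum size check applies to the escaped bytes buffered so far.

```csharp
using System;
using System.Collections.Generic;

public sealed class DelimitedFramer
{
    private const byte Delimiter = (byte)'\n';
    private const byte Escape = (byte)'\\';
    private const byte EscapedDelimiter = (byte)'n';

    // Escaped bytes of the message currently being received
    private readonly List<byte> pending = new List<byte>();
    private readonly int maxMessageSize;

    public DelimitedFramer(int maxMessageSize)
    {
        this.maxMessageSize = maxMessageSize;
    }

    // Escapes the message and appends the delimiter; the result is ready to send.
    public static byte[] Wrap(byte[] message)
    {
        var result = new List<byte>(message.Length + 1);
        foreach (byte b in message)
        {
            if (b == Delimiter) { result.Add(Escape); result.Add(EscapedDelimiter); }
            else if (b == Escape) { result.Add(Escape); result.Add(Escape); }
            else result.Add(b);
        }
        result.Add(Delimiter);
        return result.ToArray();
    }

    // Appends received bytes to the buffer; returns every message completed by this chunk.
    public List<byte[]> DataReceived(byte[] data)
    {
        var messages = new List<byte[]>();
        foreach (byte b in data)
        {
            if (b == Delimiter)
            {
                messages.Add(Unescape(this.pending));
                this.pending.Clear();
            }
            else
            {
                // Sanity check: refuse to buffer an unbounded amount of undelimited data
                if (this.pending.Count >= this.maxMessageSize)
                    throw new InvalidOperationException("Message exceeds maximum size");
                this.pending.Add(b);
            }
        }
        return messages;
    }

    // Reverses the escaping applied by Wrap.
    private static byte[] Unescape(List<byte> escaped)
    {
        var result = new List<byte>(escaped.Count);
        for (int i = 0; i < escaped.Count; i++)
        {
            byte b = escaped[i];
            if (b != Escape) { result.Add(b); continue; }
            if (++i == escaped.Count)
                throw new FormatException("Dangling escape byte");
            byte next = escaped[i];
            if (next == EscapedDelimiter) result.Add(Delimiter);
            else if (next == Escape) result.Add(Escape);
            else throw new FormatException("Unknown escape sequence");
        }
        return result.ToArray();
    }
}
```

Because the escaping guarantees that the delimiter byte never appears inside a wrapped message, the receiver can split on it safely even when the data arrives one byte at a time.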

A Brief Security Note

Whether using length-prefixing or delimiters, one must include code to prevent denial of service attacks. Length-prefixed readers can be given a huge message size; delimiting readers can be given a huge amount of data without delimiters. Either of these may result in an OutOfMemoryException, so one must include a maximum message size "sanity check" in the socket reading code.

The Solution, Part 3 - Code

A code sample for using length-prefixing is in its own blog post at http://nitoprograms.blogspot.com/2009/04/sample-code-length-prefix-message.html.

Another decent code example of length prefixing is on Jon Cole's blog, although he assumes all the messages are just ASCII strings.

Yet another example of length prefixing is in the Nito.Async library: the Nito.Async.Sockets.SocketPacketProtocol class can be used to send or receive length-prefixed binary messages. It is written to use the Nito.Async socket classes, but the same code concepts translate well to the .NET Socket class.

TCP/IP .NET Sockets FAQ

This is an attempt to address some TCP/IP frequently asked questions and present best practices. While the WinSock Programmer's FAQ will remain the ultimate FAQ for native code, there is a growing need for a simplified version that addresses the managed interface to TCP/IP sockets.

Section 1 - Application Protocol Design
1.1 - Message framing, also known as:
  "One side sent X bytes, but the other side only got Y bytes."
  "One side sent several packets, but the other side only got one packet, which was all the sent packets appended together."
  "I need the function that will send exactly one packet of data."
1.2 - Detection of half-open (dropped) connections, also known as:
  "My socket doesn't detect a lost connection; it just sits there forever waiting for more data to arrive."
1.3 - Application Protocol Specifications
1.4 - XML over TCP/IP

Section 2 - Socket Class
2.1 - Socket operations
2.2 - Error handling
2.3 - Using Socket as a client socket
2.4 - Using Socket as a server (listening) socket
2.5 - Using Socket as a connected socket

Section 3 - Miscellaneous
3.1 - Resources
3.2 - Getting the local IP address

Section C - Code
C.1 - Length-prefix message framing for streams
C.2 - Getting the local IP addresses

2009-04-24

Asynchronous Callback Contexts

One major - and often overlooked - issue when designing asynchronous components is the difficulty of cancellation, particularly during object disposal.

The Problem

End-users do not expect components to raise events after they have been disposed. It is natural to assume that after an object has been disposed of, it will not raise an event some time in the future. Likewise, if the component has repeating events and supports cancellation, it is reasonable to assume that the events will stop firing after the component has been "cancelled". However, implementing this expected behavior takes some forethought.

When asynchronous components raise events, these events are generally either queued to an "originating" thread or queued to be executed by the ThreadPool. Usually, it is a simple matter to cancel an upcoming event as long as it is not yet queued; the trick comes in how to handle cancelled events that have already been queued.

The Solution

The answer is to define some sort of "context". When the component queues an event, it copies the current value of the context, and when the event is actually processed, it first checks its value of the context against the component's current context value. If they match, then the event knows it is safe to continue; if they don't match, then the event knows it has been cancelled. The component then just changes its context value whenever it is cancelled or disposed.

The .NET framework provides an excellent choice for contexts: object. Objects are compared using fast reference equality and they are fast to allocate and deallocate. A component includes an object reference as its context, and allocates a new one when it is cancelled or disposed. Earlier versions of Nito.Async asynchronous components used exactly this method.
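The idea of using a plain object as the context can be sketched as below. This is an illustrative, single-threaded example rather than code from Nito.Async or the .NET framework: the TickSource class is hypothetical, and a Queue of delegates stands in for the originating thread's queue or the ThreadPool.

```csharp
using System;
using System.Collections.Generic;

public sealed class TickSource
{
    // The current context; replaced whenever the component is cancelled
    private object context = new object();

    // Stand-in for the queue that events are posted to
    private readonly Queue<Action> queue;
    private readonly Action onTick;

    public TickSource(Queue<Action> queue, Action onTick)
    {
        this.queue = queue;
        this.onTick = onTick;
    }

    // Queues one event, copying the context value that is current at queue time.
    public void QueueTick()
    {
        object captured = this.context;
        this.queue.Enqueue(() =>
        {
            // Raise the event only if the component has not been cancelled
            // since this event was queued
            if (object.ReferenceEquals(captured, this.context))
                this.onTick();
        });
    }

    // Cancels every event queued so far by switching to a new context object.
    public void Cancel()
    {
        this.context = new object();
    }
}
```

An event that was queued before Cancel is still dequeued and executed, but it finds that its captured context no longer matches and silently does nothing; events queued after Cancel capture the new context and run normally.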

Callback Contexts in the Real World

Fire up Reflector and take a look at System.Timers.Timer in System.dll (2.0.0.0). It has a private field of type object named "cookie". When the timer is enabled, it allocates a new object, saves it into "cookie", and passes it as the state object to the underlying System.Threading.Timer callback. The underlying timer callback (in MyTimerCallback) compares the state object to the current value of cookie, and doesn't proceed with the event if they don't match.

System.Timers.Timer does not change "cookie" when it is disposed because the underlying System.Threading.Timer will not invoke its TimerCallback after it has been disposed. At first glance this appears correct, but this is actually a race condition bug because there is a possibility of System.Timers.Timer.Elapsed being invoked while another thread is executing System.Timers.Timer.Close. If SynchronizingObject is non-null, MyTimerCallback may queue the callback to the thread executing Dispose, resulting in a situation where an event (Elapsed) is fired after the object has been disposed.

Reusable CallbackContext Type

One of the new classes in version 1.2 of the Nito Async library is a reusable CallbackContext type. This class encapsulates all the semantics necessary, and introduces a few new terms:

  • A delegate may be bound to a CallbackContext. Binding a delegate results in a new delegate (the bound delegate) - which wraps the original delegate.
  • Every bound delegate is either valid or invalid. When a valid delegate executes, it will execute its wrapped delegate; when an invalid delegate executes, it will do nothing.

Delegates are bound to the CallbackContext by calling CallbackContext.Bind; delegates are valid when they are bound. A CallbackContext will invalidate all of its bound delegates when CallbackContext.Reset is called.

To use CallbackContext from an asynchronous component, bind each delegate that needs to check the context. Then call CallbackContext.Reset when the operation is cancelled. CallbackContext also implements IDisposable, with Dispose as a synonym for Reset, to remind users to call CallbackContext.Dispose when the asynchronous component is disposed.
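To make the bind/reset semantics concrete, here is a minimal sketch of a type with the same shape. It is not the Nito.Async implementation, and its signatures are assumptions: the SketchCallbackContext name is made up, and only parameterless Action delegates are handled.

```csharp
using System;

// Illustrative only: a simplified stand-in, NOT the Nito.Async CallbackContext.
public sealed class SketchCallbackContext : IDisposable
{
    // Replaced on every Reset; bound delegates compare against it by reference
    private object context = new object();

    // Returns a bound delegate that wraps the given delegate. The bound delegate
    // is valid until the next call to Reset.
    public Action Bind(Action action)
    {
        object boundContext = this.context;
        return () =>
        {
            // A valid delegate runs its wrapped delegate; an invalid one does nothing
            if (object.ReferenceEquals(boundContext, this.context))
                action();
        };
    }

    // Invalidates every delegate bound so far.
    public void Reset()
    {
        this.context = new object();
    }

    // Dispose is a synonym for Reset.
    public void Dispose()
    {
        this.Reset();
    }
}
```

A component would keep one such context, pass the results of Bind (rather than the raw delegates) to whatever queues its callbacks, and call Reset on cancellation and Dispose on disposal. This sketch does no synchronization of its own, so the bound delegate is assumed to be invoked on the same thread that calls Reset.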

The only other note regarding CallbackContext is that the delegates should be synchronized (using SynchronizingObject or SynchronizationContext) before the bound delegate is invoked. We are considering adding overloads to CallbackContext to allow for synchronization and binding in a single step, ensuring the correct order; if they are added, they will be included in Nito.Async version 1.3.