Dealing with Deadlocks and Other Unfortunate Events on the Application Level: Part 4 (Or: A Simple Message Queue Listener)

6 minutes read

See Part 3

In Part 3, I illustrated a simple strategy to re-run a “repeatable” transaction using a simple “while” loop. A better and more fault resilient way of handling such cases (we are talking about transactions, after all) is to employ a transactional resource manager such as MSMQ from which to retrieve the transaction input. If the transaction fails due to a catastrophic failure (power outage or worse), an in-memory recovery strategy like the one shown in “RunTx()” will not get us very far. Instead, we will put the transaction input into a transactional queue, read it from the queue using a specialized message queue listener and equip that listener with some handling logic that is similar to what is shown in the “RunTx()” example.

Before I will show that, however, I will give you a generic, multi-threaded message queue listener that is not transaction aware (in the desired way) and which I will specialize in Part 5 (the last part) of this series. The reasons that I post the message listener logic in two steps are that (a) you might find a generic listener useful, (b) it is quite a bit of code and I don’t want the “essentials” of the recovery code to be buried amongst the basic queue listener framework, and (c) I am simply too lazy today to explain it all ;-)

Using this queue listener is pretty straightforward and should be rather self-explanatory. To listen, create an instance of the listener using one of the constructors and supply a “queueName” argument that is compatible with the System.Messaging.MessageQueue class. The queue must exist. The listener will bubble messages to the MessageReceived delegate (“unicast event”), which you must implement in your class and assign to the listener in order to get at the messages.

MessageQueueListener queueListener = new MessageQueueListener(sourceQueuePath, 4 );
queueListener.MessageReceived = new MessageReceivedEventHandler(queueListener MessageReceived);
queueListener.ProcessingError += new ProcessingErrorEventHandler(queueListener ProcessingError);
queueListener.Start();

And here are 344 lines of pure developer joy:

using System;
using System.Collections;
using System.Messaging;
using System.Threading;

namespace newtelligence.EnterpriseTools.Msmq
{
    /// <summary>
    /// Event argument class for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public class MessageReceivedEventArgs : EventArgs
    {
        private Message receivedMessage;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="receivedMessage">A message</param>
        public MessageReceivedEventArgs( Message receivedMessage )
        {
           this.receivedMessage = receivedMessage;
        }

        /// <summary>
        /// Received message
        /// </summary>
        public Message ReceivedMessage
        {
            get
            {
                return receivedMessage;
            }
        }
    }

    /// <summary>
    /// Event argument class for the MessageQueueListener.ProcessingError event
    /// </summary>
    public class ProcessingErrorEventArgs : EventArgs
    {
        /// <summary>
        ///
        /// </summary>
        private Exception exception;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="exception">An exception</param>
        public ProcessingErrorEventArgs( Exception exception )
        {
            this.exception = exception;
        }

        /// <summary>
        /// Intercepted exception
        /// </summary>
        public Exception Exception
        {
            get
            {
                return exception;
            }
        }
    }

    /// <summary>
    /// Delegate for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public delegate void MessageReceivedEventHandler( object sender, MessageReceivedEventArgs ea );
    /// <summary>
    /// Delegate for the MessageQueueListener.ProcessingError event
    /// </summary>
    public delegate void ProcessingErrorEventHandler( object sender, ProcessingErrorEventArgs ea );
   
    /// <summary>
    /// This class implements a multi-threaded message queue listener
    /// </summary>
    public class MessageQueueListener
    {
        protected Guid instanceId = Guid.NewGuid();
        protected Thread[] listenerThreads;
        protected string queueName=null;
        protected int numThreads=1;
        protected ThreadPriority priority=ThreadPriority.Normal;
        protected ManualResetEvent continueInterrupt = new ManualResetEvent(true);
        protected ManualResetEvent pauseInterrupt = new ManualResetEvent(false);
        protected ManualResetEvent stopInterrupt = new ManualResetEvent(false);
        protected int peekTimeout = 5;
        protected MessageReceivedEventHandler messageReceived;
        protected int refCount = 0;
       
        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Message name where listener waits messages</param>
        public MessageQueueListener(string queueName)
        {
            queueName = queueName;
            Initialize();
        }

        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads to run the listener on</param>
        public MessageQueueListener(string queueName, int numThreads)
        {
            queueName = queueName;
            numThreads = numThreads;
            Initialize();
        }


        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads run the listener on</param>
        /// <param name="priority">Thread priority</param>
        public MessageQueueListener(string queueName, int numThreads, ThreadPriority priority)
        {
            queueName = queueName;
            numThreads = numThreads;
            priority = priority;
            Initialize();
        }


        /// <summary>
        /// Adds a reference to the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        public int AddReference()
        {
            return ++ refCount;
        }

        /// <summary>
        /// Releases a reference from the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        /// <remarks>If the reference counter drops to or below zero, the listener is stopped.</remarks>
        public int ReleaseReference()
        {
            if ( -- refCount <= 0 )
            {
                Stop();
            }
            return refCount;
        }

        /// <summary>
        /// Initializes the MessageQueueListener
        /// </summary>
        protected virtual void Initialize()
        {
            listenerThreads = new Thread[ numThreads];
        }

        /// <summary>
        /// Unicast event that is raised when a message has been received
        /// </summary>
        public MessageReceivedEventHandler MessageReceived
        {
            get
            {
                return messageReceived;
            }
            set
            {
                if ( messageReceived == value)
                    return;
                 messageReceived = value;
            }
        }

        /// <summary>
        /// Multicast event that is raised when an exception occurs during processing.
        /// </summary>
        public event ProcessingErrorEventHandler ProcessingError;

        /// <summary>
        /// Raises the ProcessingError event
        /// </summary>
        /// <param name="pea"></param>
        protected void OnProcessingError( ProcessingErrorEventArgs pea )
        {
            if (ProcessingError != null )
            {
                ProcessingError( this, pea );
            }
        }

        /// <summary>
        /// Starts the message queue listener. If the threads are already running,
        /// Start() does nothing.
        /// </summary>
        public void Start()
        {
            for(int i=0;i< numThreads;i++)
            {
                if ( listenerThreads[i] == null )
                {
                    Thread thread = new Thread(new ThreadStart(this.ReceiveLoop));
                    thread.Name = String.Format("{0}:{1}:{2}",i, instanceId,GetType().Name);
                    thread.IsBackground = true;
                    thread.Priority = priority;
                    listenerThreads[i] = thread;
                    thread.Start();
                }
            }
        }

        /// <summary>
        /// Set listener state in stop
        /// </summary>
        public void Stop()
        {
            Resume();
            stopInterrupt.Set();
            for(int i=0;i< numThreads;i++)
            {
                listenerThreads[i].Join();
                listenerThreads[i] = null;
            }
        }

        /// <summary>
        /// Set listener state to "paused".
        /// </summary>
        /// <remarks>
        /// Setting the listener ito the "paused" state will not suspend processing
        /// immediately. All messages that are currently being processed will be
        /// processed, but no new messages are being dequeued.
        /// </remarks>
        public void Pause()
        {
            continueInterrupt.Reset();
            pauseInterrupt.Set();                            
        }

        /// <summary>
        /// Set listener state to "resume". This will resume the listener from
        /// the "paused" state.
        /// </summary>
        public void Resume()
        {
            pauseInterrupt.Reset();
            continueInterrupt.Set();           
        }

        /// <summary>
        /// Gets a value indicating whether the listener is paused.
        /// </summary>
        public bool IsPaused
        {
            get
            {
                return pauseInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }

        /// <summary>
        /// Gets a value indicating whether the listener is paused.
        /// </summary>
        public bool IsStopping
        {
            get
            {
                return stopInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }


        /// <summary>
        /// This is the thread entry point for the main receive loop.
        /// </summary>
        protected virtual void ReceiveLoop()
        {
            using (MessageQueue sourceQueue = new MessageQueue( queueName, false ))
            {
                bool isTransactional = sourceQueue.Transactional;
                sourceQueue.MessageReadPropertyFilter.SetAll();

                while ( !IsStopping )
                {
                    // if the continue interrupt is not set, we will hang here
                    // until it is set again (resuming)
                    WaitHandle.WaitAny(new WaitHandle[]{ continueInterrupt, stopInterrupt});

                    try
                    {
                        //peek on the queue and wait for a message becoming available
                        IAsyncResult asynchResult = sourceQueue.BeginPeek(TimeSpan.FromSeconds( peekTimeout),null,null);
                        int firedWaitHandle = WaitHandle.WaitAny(new WaitHandle[]{asynchResult.AsyncWaitHandle, pauseInterrupt, stopInterrupt});
                        if ( firedWaitHandle == 0 )
                        {
                            sourceQueue.EndPeek(asynchResult);
                            Message receivedMessage = sourceQueue.Receive( TimeSpan.FromSeconds(0),
                                isTransactional?MessageQueueTransactionType.Single:MessageQueueTransactionType.None );
                          
                            try
                            {
                                messageReceived(this,new MessageReceivedEventArgs(receivedMessage));                       
                            }
                            finally
                            {
                                receivedMessage.Dispose();
                            }
                        }
                        else if ( firedWaitHandle == 1 )
                        {
                            // Pausing. Clean up the asynchronous Peek operation
                            // (we'll assume that we have time for that) and then go
                            // back to the top of the loop where we'll hang on the
                            // continueInterrupt.
                            sourceQueue.EndPeek(asynchResult);
                        }
                        else
                        {
                            // stopping. Skip the EndPeek() and exit the thread immediately.
                            return;
                        }
                    }
                    catch(MessageQueueException ex)
                    {
                        if ( ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout  )
                        {
                            break;
                        }
                    }
                    catch( Exception ex )
                    {
                        // absorb all other exceptions and surface them from he listener
                        // using the ProcessingError event.
                        OnProcessingError( new ProcessingErrorEventArgs(ex));
                    }
                }
            }
        }
    }
}

 

Updated:

Leave a Comment