using System; using System.Collections; using System.Messaging; using System.Threading;
namespace newtelligence.EnterpriseTools.Msmq { /// <summary> /// Event argument class for the MessageQueueListener.MessageReceived unicast event /// </summary> public class MessageReceivedEventArgs : EventArgs { private Message receivedMessage;
/// <summary> /// Constructor /// </summary> /// <param name="receivedMessage">A message</param> public MessageReceivedEventArgs( Message receivedMessage ) { this.receivedMessage = receivedMessage; }
/// <summary> /// Received message /// </summary> public Message ReceivedMessage { get { return receivedMessage; } } }
/// <summary> /// Event argument class for the MessageQueueListener.ProcessingError event /// </summary> public class ProcessingErrorEventArgs : EventArgs { /// <summary> /// /// </summary> private Exception exception;
/// <summary> /// Constructor /// </summary> /// <param name="exception">An exception</param> public ProcessingErrorEventArgs( Exception exception ) { this.exception = exception; }
/// <summary> /// Intercepted exception /// </summary> public Exception Exception { get { return exception; } } }
/// <summary> /// Delegate for the MessageQueueListener.MessageReceived unicast event /// </summary> public delegate void MessageReceivedEventHandler( object sender, MessageReceivedEventArgs ea ); /// <summary> /// Delegate for the MessageQueueListener.ProcessingError event /// </summary> public delegate void ProcessingErrorEventHandler( object sender, ProcessingErrorEventArgs ea ); /// <summary> /// This class implements a multi-threaded message queue listener /// </summary> public class MessageQueueListener { protected Guid instanceId = Guid.NewGuid(); protected Thread[] listenerThreads; protected string queueName=null; protected int numThreads=1; protected ThreadPriority priority=ThreadPriority.Normal; protected ManualResetEvent continueInterrupt = new ManualResetEvent(true); protected ManualResetEvent pauseInterrupt = new ManualResetEvent(false); protected ManualResetEvent stopInterrupt = new ManualResetEvent(false); protected int peekTimeout = 5; protected MessageReceivedEventHandler messageReceived; protected int refCount = 0; /// <summary> /// Initializes a new instance of the MessageQueueListener class /// </summary> /// <param name="queueName">Message name where listener waits messages</param> public MessageQueueListener(string queueName) { queueName = queueName; Initialize(); }
/// <summary> /// Initializes a new instance of the MessageQueueListener class /// </summary> /// <param name="queueName">Name of the queue to listen on</param> /// <param name="numThreads">Number of threads to run the listener on</param> public MessageQueueListener(string queueName, int numThreads) { queueName = queueName; numThreads = numThreads; Initialize(); }
/// <summary> /// Initializes a new instance of the MessageQueueListener class /// </summary> /// <param name="queueName">Name of the queue to listen on</param> /// <param name="numThreads">Number of threads run the listener on</param> /// <param name="priority">Thread priority</param> public MessageQueueListener(string queueName, int numThreads, ThreadPriority priority) { queueName = queueName; numThreads = numThreads; priority = priority; Initialize(); }
/// <summary> /// Adds a reference to the reference counter. /// </summary> /// <returns>Current reference count</returns> public int AddReference() { return ++ refCount; }
/// <summary> /// Releases a reference from the reference counter. /// </summary> /// <returns>Current reference count</returns> /// <remarks>If the reference counter drops to or below zero, the listener is stopped.</remarks> public int ReleaseReference() { if ( -- refCount <= 0 ) { Stop(); } return refCount; }
/// <summary> /// Initializes the MessageQueueListener /// </summary> protected virtual void Initialize() { listenerThreads = new Thread[ numThreads]; }
/// <summary> /// Unicast event that is raised when a message has been received /// </summary> public MessageReceivedEventHandler MessageReceived { get { return messageReceived; } set { if ( messageReceived == value) return; messageReceived = value; } }
/// <summary> /// Multicast event that is raised when an exception occurs during processing. /// </summary> public event ProcessingErrorEventHandler ProcessingError;
/// <summary> /// Raises the ProcessingError event /// </summary> /// <param name="pea"></param> protected void OnProcessingError( ProcessingErrorEventArgs pea ) { if (ProcessingError != null ) { ProcessingError( this, pea ); } }
/// <summary> /// Starts the message queue listener. If the threads are already running, /// Start() does nothing. /// </summary> public void Start() { for(int i=0;i< numThreads;i++) { if ( listenerThreads[i] == null ) { Thread thread = new Thread(new ThreadStart(this.ReceiveLoop)); thread.Name = String.Format("{0}:{1}:{2}",i, instanceId,GetType().Name); thread.IsBackground = true; thread.Priority = priority; listenerThreads[i] = thread; thread.Start(); } } }
/// <summary> /// Set listener state in stop /// </summary> public void Stop() { Resume(); stopInterrupt.Set(); for(int i=0;i< numThreads;i++) { listenerThreads[i].Join(); listenerThreads[i] = null; } }
/// <summary> /// Set listener state to "paused". /// </summary> /// <remarks> /// Setting the listener ito the "paused" state will not suspend processing /// immediately. All messages that are currently being processed will be /// processed, but no new messages are being dequeued. /// </remarks> public void Pause() { continueInterrupt.Reset(); pauseInterrupt.Set(); }
/// <summary> /// Set listener state to "resume". This will resume the listener from /// the "paused" state. /// </summary> public void Resume() { pauseInterrupt.Reset(); continueInterrupt.Set(); }
/// <summary> /// Gets a value indicating whether the listener is paused. /// </summary> public bool IsPaused { get { return pauseInterrupt.WaitOne(TimeSpan.Zero,false); } }
/// <summary> /// Gets a value indicating whether the listener is paused. /// </summary> public bool IsStopping { get { return stopInterrupt.WaitOne(TimeSpan.Zero,false); } }
/// <summary> /// This is the thread entry point for the main receive loop. /// </summary> protected virtual void ReceiveLoop() { using (MessageQueue sourceQueue = new MessageQueue( queueName, false )) { bool isTransactional = sourceQueue.Transactional; sourceQueue.MessageReadPropertyFilter.SetAll();
while ( !IsStopping ) { // if the continue interrupt is not set, we will hang here // until it is set again (resuming) WaitHandle.WaitAny(new WaitHandle[]{ continueInterrupt, stopInterrupt});
try { //peek on the queue and wait for a message becoming available IAsyncResult asynchResult = sourceQueue.BeginPeek(TimeSpan.FromSeconds( peekTimeout),null,null); int firedWaitHandle = WaitHandle.WaitAny(new WaitHandle[]{asynchResult.AsyncWaitHandle, pauseInterrupt, stopInterrupt}); if ( firedWaitHandle == 0 ) { sourceQueue.EndPeek(asynchResult); Message receivedMessage = sourceQueue.Receive( TimeSpan.FromSeconds(0), isTransactional?MessageQueueTransactionType.Single:MessageQueueTransactionType.None ); try { messageReceived(this,new MessageReceivedEventArgs(receivedMessage)); } finally { receivedMessage.Dispose(); } } else if ( firedWaitHandle == 1 ) { // Pausing. Clean up the asynchronous Peek operation // (we'll assume that we have time for that) and then go // back to the top of the loop where we'll hang on the // continueInterrupt. sourceQueue.EndPeek(asynchResult); } else { // stopping. Skip the EndPeek() and exit the thread immediately. return; } } catch(MessageQueueException ex) { if ( ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout ) { break; } } catch( Exception ex ) { // absorb all other exceptions and surface them from he listener // using the ProcessingError event. OnProcessingError( new ProcessingErrorEventArgs(ex)); } } } } } } |