This session is a follow-up to the Service Bus session I did at the BUILD conference and explains advanced usage patterns.

Categories: .NET Services | AppFabric | Azure | Talks | MSMQ | Web Services

For programmers who are writing distributed systems and are not using queues in them just yet. If you are a message-oriented middleware veteran, move along ;-)

Categories: Technology | MSMQ

March 29, 2007
@ 08:02 AM

A bad sign of how little I’m coding these days is that I had an HDD crash three weeks ago and only today restored Visual Studio to fully working condition with all my tools and stuff. I’ve decided that has to change, otherwise I’ll get really rusty.

Picking up the thread from “Professor Indigo” Nicholas Allen, I’ve built a little program that illustrates an alternate handling strategy for poison messages, which WCF moves into the poison queue on Vista and Longhorn Server if you ask it to (ReceiveErrorHandling.Move). The one we show in the docs implements a local resolution strategy that fires within the service when the service ends up faulting; that’s the strategy for ReceiveErrorHandling.Fault, and it works with MSMQ 3.0. The strategy I’m showing here requires our latest OS wave.

When a message arrives at a WCF endpoint through a queue, WCF will (if the queue is transactional) open a transaction and dequeue the message. It will then try to dispatch it to the target service and operation. Assuming the dispatch works, the operation gets invoked and might tank. If it does, an exception is raised and thrown back into the WCF stack, and the transaction aborts. Happily, WCF grabs the next message from the queue, which, thanks to the rollback, is the very one that just caused the failure, and the operation might tank again.

Now, the reasons why the operation might fail are as numerous as the possible combinations of program statements you could put there. Anything could happen. The program is completely broken, the input data sends the app down the branch that nobody ever cared to test (or apparently not enough), the backend database is permanently offline, the machine is having an extremely bad hardware day, power fails, you name it.

So what if the application just keeps choking and throwing on that particular message? With either of the aforementioned error handling modes, WCF takes the message out of the loop once its patience with the patient is exhausted. With the ReceiveErrorHandling.Fault option, WCF raises an error event that can be caught and processed with a handler. With ReceiveErrorHandling.Move, things are a bit more flexible, because the message causing all that trouble now sits in a queue again.
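To make the loop of recurring death concrete, here is a toy, in-memory model of a transactional receive loop with a retry limit. It is a hypothetical sketch, not WCF’s implementation: real WCF differs in detail (it works in retry cycles and, on the newer OS, uses a retry sub-queue while other messages keep flowing). Here, a failed attempt simply leaves the message at the head of the queue, as a rollback would, and after maxAttempts failures the message is moved to a separate poison queue, which is roughly the effect of ReceiveErrorHandling.Move. Each message is a unique string standing in for a message ID.

```csharp
using System;
using System.Collections.Generic;

// Toy model only, not WCF's implementation. A failed attempt leaves the
// message at the head of the queue (the "rollback"); after maxAttempts
// failures the message is moved to a separate poison queue.
public static class PoisonLoopModel
{
    public static List<string> Run(
        Queue<string> queue,
        Queue<string> poisonQueue,
        Func<string, bool> process,   // returns false to simulate the operation tanking
        int maxAttempts)
    {
        var processed = new List<string>();
        var attempts = new Dictionary<string, int>();   // keyed by (unique) message

        while (queue.Count > 0)
        {
            // "Receive under a transaction": look at the head, decide later.
            string message = queue.Peek();
            attempts[message] = attempts.TryGetValue(message, out int n) ? n + 1 : 1;

            if (process(message))
            {
                queue.Dequeue();                 // commit: the message is gone
                processed.Add(message);
            }
            else if (attempts[message] >= maxAttempts)
            {
                queue.Dequeue();                 // patience exhausted
                poisonQueue.Enqueue(message);    // roughly what Move does
            }
            // otherwise: rollback, and the same message is first in line again
        }
        return processed;
    }
}
```

Drop the maxAttempts check and the loop never terminates for a message that always fails; that is exactly the behavior the retry settings exist to prevent.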

The headache-causing problem with poison messages is that you really, really need to do something about them. From the sender’s perspective, the message has been delivered, and it puts its trust in the receiver to do the right thing. “Here’s that $1,000,000 purchase order! I’m done, go party!” If the receiving service goes into the bug-induced loop of recurring death, you’ve got two problems: you have a nasty bug that’s probably difficult to repro since it happens under stress, and you’ve got a $1,000,000 purchase order unhappily sitting in a dark hole. Guess what your great-grand-boss’ boss cares more about.

The second, technically slightly more headache-causing problem with poison messages (if that’s possible to imagine) is that they just sit there with all the gold and diamonds they might represent, but they are effectively just a bunch of (if you’re lucky) XML goo. Telling a system operator to go and check the poison message queues, or surfacing their contents to him/her to figure out what’s going on there, is probably not a winning strategy.

So what to do? Your high-throughput, automated-processing solution that does the regular business behind the queue has left the building for lunch. That much is clear. How do you hook in an alternate processing path that at least surfaces the problem to an operator or “information worker” (or even a call center agent pool) in a legible and intelligible fashion, so that a human can look at the problem and try to find a fix? In the end, we’ve got the best processing unit for non-deterministic and unexpected events sitting between our shoulders, one would hope. How about writing a slightly less automated alternative service that’s easy to adjust and that tries to surface the issue to someone, or just tries multiple things [Did someone just say “Workflow”?], and hooking it straight up to where all the bad stuff lands: the poison queue?

Here’s the code. I just coded that up for illustrative purposes and hence there’s absolutely room for improvement. I’m going to put the project files up on wcf.netfx3.com and will update this post with the link. We’ll start with the boilerplate stuff and the “regular” service:

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel.Channels;
using System.ServiceModel;
using System.Runtime.Serialization;
using System.ServiceModel.Description;
using System.Workflow.Runtime;
using ServerErrorHandlingWorkflow;
using ServerData;

namespace Server
{
    [ServiceContract(Namespace=Program.ServiceNamespaceURI)]
    interface IApplicationContract
    {
        [OperationContract(IsOneWay=true)]
        void SubmitData(ApplicationData data);
    }


    [ServiceBehavior(TransactionAutoCompleteOnSessionClose=true,
                     ReleaseServiceInstanceOnTransactionComplete=true)]
    class ApplicationService : IApplicationContract
    {
        [OperationBehavior(TransactionAutoComplete=true,TransactionScopeRequired=true),
         System.Diagnostics.DebuggerStepThrough]
        public void SubmitData(ApplicationData data)
        {
            // Always fails: stands in for the deep code path that chokes on a poison message
            throw new Exception("The method or operation is not implemented.");
        }
    }

Not much excitement here, except that the throw statement in SubmitData will always cause the service to tank. In real life, the path to the place where the service consistently runs into trouble is more convoluted and may involve a few thousand lines of code, but this is a good approximation of what happens when you hit a poison message: stuff keeps failing.

The next snippet is our alternate service. Instead of boldly trying to do complex processing, it simply punts the message data to a Workflow. That’s assuming that the message isn’t completely messed up to begin with and can indeed be de-serialized. To mitigate that scenario we could also use a one-way universal contract and be even more careful. The key difference between this and the “regular” service is that the alternate service turns off the WCF address filter check. We’ll get back to that. 


    [ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)]
    class ApplicationErrorService : IApplicationContract
    {
        public void SubmitData(ApplicationData data)
        {
            Dictionary<string,object> workflowArgs = new Dictionary<string,object>();
            workflowArgs.Add("ApplicationData",data);
            WorkflowInstance workflowInstance =
                Program.WorkflowRuntime.CreateWorkflow(
                         typeof(ErrorHandlingWorkflow),
                         workflowArgs);
            workflowInstance.Start();
        }
    }

So now we’ve got the fully automated middle-of-the-road default service and our “what do we do next” alternate service. Let’s hook them up.

    class Program
    {
        public const string ServiceNamespaceURI =
            "http://samples.microsoft.com/2007/03/WCF/PoisonHandling/Service";
        public static WorkflowRuntime WorkflowRuntime = new WorkflowRuntime();

        static void Main(string[] args)
        {
            string msmqQueueName = Properties.Settings.Default.QueueName;
            string msmqPoisonQueueName = msmqQueueName+";poison";
            string netMsmqQueueName =
                "net.msmq://" + msmqQueueName.Replace('\\', '/').Replace("$","");
            string netMsmqPoisonQueueName = netMsmqQueueName+";poison";

            if (!System.Messaging.MessageQueue.Exists(msmqQueueName))
            {
                System.Messaging.MessageQueue.Create(msmqQueueName, true);
            }

First (and for this little demo only), we set up a local queue and do a little string-smithing to turn the MSMQ-format queue name stored in app.config into the net.msmq URI format. Next …

            ServiceHost applicationServiceHost = new ServiceHost(typeof(ApplicationService));
            NetMsmqBinding queueBinding = new NetMsmqBinding(NetMsmqSecurityMode.None);
            queueBinding.ReceiveErrorHandling = ReceiveErrorHandling.Move;
            queueBinding.ReceiveRetryCount = 1;
            queueBinding.RetryCycleDelay = TimeSpan.FromSeconds(1);
            applicationServiceHost.AddServiceEndpoint(typeof(IApplicationContract),
                                                      queueBinding,
                                                      netMsmqQueueName);

Now we’ve bound the “regular” application service to the queue. I’m setting the binding parameters (look them up at your leisure) so that we fail very fast here. By default, the RetryCycleDelay is set to 30 minutes, which gives you a reasonable chance to fix temporary issues while stuff hangs out in the retry queue. Now for the poison handler service:

      
           
            ServiceHost poisonHandlerServiceHost = new ServiceHost(typeof(ApplicationErrorService));
            NetMsmqBinding poisonBinding = new NetMsmqBinding(NetMsmqSecurityMode.None);
            poisonBinding.ReceiveErrorHandling = ReceiveErrorHandling.Drop;
            poisonHandlerServiceHost.AddServiceEndpoint(typeof(IApplicationContract),
                                                        poisonBinding,
                                                        netMsmqPoisonQueueName);

Looks almost the same, hmm? The trick here is that we’re pointing this one at the poison queue, into which the regular service drops all the stuff it can’t deal with. Otherwise it’s (almost) just a normal service. The key difference between the ApplicationErrorService and its sibling is that the poison-message handler service implementation is decorated with [ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)]. Since the original message was sent to a different queue (the original one) and we’re now looking at a sub-queue that has a different name, and therefore a different WS-Addressing:To identity, WCF would normally refuse to process that message. With this behavior setting we tell WCF to ignore that and have the service treat the message as if it landed at the right place, which is what we want.
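Why the address check fails can be shown with plain strings. The sketch below is an illustrative model only: it repeats the queue-name string-smithing from Main() and compares the message’s To header against the endpoint address, once with an exact match (AddressFilterMode.Exact is WCF’s default) and once accepting anything (AddressFilterMode.Any). The FilterMode enum and PoisonAddressing class are hypothetical names; WCF’s real check lives in its dispatcher and compares addresses rather than raw strings.

```csharp
using System;

// Hypothetical stand-in for WCF's Exact (default) and Any address filter modes.
public enum FilterMode { Exact, Any }

public static class PoisonAddressing
{
    // Same conversion as in Main(): backslashes become slashes, "$" is dropped,
    // e.g. @".\private$\orders" -> "net.msmq://./private/orders"
    public static string ToNetMsmqUri(string msmqPath) =>
        "net.msmq://" + msmqPath.Replace('\\', '/').Replace("$", "");

    // The poison sub-queue is addressed by appending ";poison".
    public static string ToPoisonUri(string netMsmqUri) => netMsmqUri + ";poison";

    // Would an endpoint listening at endpointAddress accept a message whose
    // WS-Addressing To header carries toHeader?
    public static bool Accepts(FilterMode mode, string endpointAddress, string toHeader) =>
        mode == FilterMode.Any ||
        string.Equals(endpointAddress, toHeader, StringComparison.OrdinalIgnoreCase);
}
```

A message moved into the poison sub-queue still carries the To header of the queue it was originally sent to, so Accepts(FilterMode.Exact, poisonUri, regularUri) is false while FilterMode.Any lets it through.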

And now for the unspectacular run-it and drop-a-message-into-queue finale:

            applicationServiceHost.Open();
            poisonHandlerServiceHost.Open();

            Console.WriteLine("Application running");

            ChannelFactory<IApplicationContract> client =
                new ChannelFactory<IApplicationContract>(queueBinding,
                                                         netMsmqQueueName);
            IApplicationContract channel = client.CreateChannel();
            ApplicationData data = new ApplicationData();
            data.FirstName = "Clemens";
            data.LastName = "Vasters";
            channel.SubmitData(data);
            ((IClientChannel)channel).Close();

            Console.WriteLine("Press ENTER to exit");
            Console.ReadLine();
        }
    }
}

The Workflow that’s hooked up to the poison handler in my particular sample project doesn’t do anything big. It has a property that is initialized with the data item and a single code activity that writes the message out to the console. It could send an email, page an operator through Messenger, etc. Whatever works.
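To get a feel for how quickly the sample gives up compared to the defaults, here is a back-of-the-envelope calculation. It rests on assumptions worth checking against the WCF documentation: that NetMsmqBinding’s MaxRetryCycles (left untouched in the sample) also applies, that each retry cycle makes ReceiveRetryCount + 1 delivery attempts, and that RetryCycleDelay elapses between consecutive cycles. Processing time is ignored, and RetryBudget is a hypothetical helper.

```csharp
using System;

// Back-of-the-envelope model under the assumptions stated above:
// (ReceiveRetryCount + 1) attempts per cycle, (MaxRetryCycles + 1) cycles,
// RetryCycleDelay between cycles, processing time ignored.
public static class RetryBudget
{
    // Total delivery attempts before the message is treated as poison.
    public static int TotalAttempts(int receiveRetryCount, int maxRetryCycles) =>
        (receiveRetryCount + 1) * (maxRetryCycles + 1);

    // Lower bound on the time spent waiting between retry cycles.
    public static TimeSpan MinimumTimeToPoison(int maxRetryCycles, TimeSpan retryCycleDelay) =>
        TimeSpan.FromTicks(retryCycleDelay.Ticks * maxRetryCycles);
}
```

With the sample’s settings (ReceiveRetryCount = 1, RetryCycleDelay = 1 second) and a MaxRetryCycles of 2, that comes to (1 + 1) * (2 + 1) = 6 attempts within a couple of seconds. With the documented defaults (ReceiveRetryCount 5, MaxRetryCycles 2, RetryCycleDelay 30 minutes) it is 18 attempts spread over at least an hour.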

Categories: MSMQ | WCF

I just got a comment from Oran about the lack of durable messaging in WCF and the need for a corresponding extensibility point. Well... the thing is: durable messaging is there; use the MSMQ bindings. One of the obvious "problems" with durable messaging that's based only on WS-ReliableMessaging is that the spec (intentionally) does not make any assertions about the behavior of the respective endpoints.

There is no rule saying: "the received message MUST be written to disk". WS-ReliableMessaging is exactly as reliable as TCP (and as unreliable, in the case of very long-lasting network failures or an endpoint outright crashing) and plays the same role. The mapping is pretty straightforward: WS-Addressing = IP, WS-ReliableMessaging = TCP.

So if you do durable messaging on one end and the other end doesn't, the overall reliability doesn't add up to anything more than it was before. MSMQ is fully in control of both ends of the wire and makes assertions about the endpoint behavior. That made it the logical choice for our durable messaging strategy in V1: it already ships with Windows, and there is (as of yet) no agreed, interoperable set of behavioral assertions for WS-RM about how endpoints must deal with received messages, other than ACKing them.

See Shy's comments.

Categories: Indigo | MSMQ

December 11, 2004
@ 02:35 PM

The stack trace below (a snapshot taken at a breakpoint in the [WebMethod] "HelloWorld") shows that I am having quite a bit of programming fun these days: server-side ASP.NET hooked up to an MSMQ listener.

simpleservicerequestinweb.dll!SimpleServiceRequestInWeb.Hello.HelloWorld() Line 53 C#
system.web.services.dll!System.Web.Services.Protocols.LogicalMethodInfo.Invoke(System.Object target, System.Object[] values) + 0x92 bytes 
system.web.services.dll!System.Web.Services.Protocols.WebServiceHandler.Invoke() + 0x9e bytes 
system.web.services.dll!System.Web.Services.Protocols.WebServiceHandler.CoreProcessRequest() + 0x142 bytes 
system.web.services.dll!System.Web.Services.Protocols.SyncSessionlessHandler.ProcessRequest(System.Web.HttpContext context) + 0x6 bytes 
system.web.dll!CallHandlerExecutionStep.System.Web.HttpApplication+IExecutionStep.Execute() + 0xb4 bytes 
system.web.dll!System.Web.HttpApplication.ExecuteStep(System.Web.HttpApplication.IExecutionStep step, bool completedSynchronously) + 0x58 bytes 
system.web.dll!System.Web.HttpApplication.ResumeSteps(System.Exception error) + 0xfa bytes 
system.web.dll!System.Web.HttpApplication.System.Web.IHttpAsyncHandler.BeginProcessRequest(System.Web.HttpContext context, System.AsyncCallback cb, System.Object extraData) + 0xe3 bytes 
system.web.dll!System.Web.HttpRuntime.ProcessRequestInternal(System.Web.HttpWorkerRequest wr) + 0x1e7 bytes 
system.web.dll!System.Web.HttpRuntime.ProcessRequest(System.Web.HttpWorkerRequest wr) + 0xb0 bytes 
newtelligence.enterprisetools.dll!newtelligence.EnterpriseTools.Msmq.MessageQueueAsmxDispatcher.MessageReceived(System.Object sender = {newtelligence.EnterpriseTools.Msmq.MessageQueueListener}, newtelligence.EnterpriseTools.Msmq.MessageReceivedEventArgs ea = {newtelligence.EnterpriseTools.Msmq.MessageReceivedEventArgs}) Line 33 C#
newtelligence.enterprisetools.dll!newtelligence.EnterpriseTools.Msmq.MessageQueueListener.ReceiveLoop() Line 305 + 0x2b bytes C#

Categories: ASP.NET | MSMQ | Web Services

See Part 3

In Part 3, I illustrated a simple strategy to re-run a “repeatable” transaction using a simple “while” loop. A better and more fault resilient way of handling such cases (we are talking about transactions, after all) is to employ a transactional resource manager such as MSMQ from which to retrieve the transaction input. If the transaction fails due to a catastrophic failure (power outage or worse), an in-memory recovery strategy like the one shown in “RunTx()” will not get us very far. Instead, we will put the transaction input into a transactional queue, read it from the queue using a specialized message queue listener and equip that listener with some handling logic that is similar to what is shown in the “RunTx()” example.

Before I show that, however, I will give you a generic, multi-threaded message queue listener that is not transaction-aware (in the desired way) and that I will specialize in Part 5 (the last part) of this series. I’m posting the message listener logic in two steps because (a) you might find a generic listener useful, (b) it is quite a bit of code and I don’t want the “essentials” of the recovery code to be buried in the basic queue listener framework, and (c) I am simply too lazy today to explain it all ;-)

Using this queue listener is pretty straightforward and should be rather self-explanatory. To listen, create an instance of the listener using one of the constructors and supply a “queueName” argument that is compatible with the System.Messaging.MessageQueue class. The queue must exist. The listener will bubble messages to the MessageReceived delegate (“unicast event”), which you must implement in your class and assign to the listener in order to get at the messages.

MessageQueueListener queueListener = new MessageQueueListener(sourceQueuePath, 4 );
queueListener.MessageReceived = new MessageReceivedEventHandler(queueListener_MessageReceived);
queueListener.ProcessingError += new ProcessingErrorEventHandler(queueListener_ProcessingError);
queueListener.Start();

And here are 344 lines of pure developer joy:

using System;
using System.Collections;
using System.Messaging;
using System.Threading;

namespace newtelligence.EnterpriseTools.Msmq
{
    /// <summary>
    /// Event argument class for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public class MessageReceivedEventArgs : EventArgs
    {
        private Message receivedMessage;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="receivedMessage">A message</param>
        public MessageReceivedEventArgs( Message receivedMessage )
        {
           this.receivedMessage = receivedMessage;
        }

        /// <summary>
        /// Received message
        /// </summary>
        public Message ReceivedMessage
        {
            get
            {
                return receivedMessage;
            }
        }
    }

    /// <summary>
    /// Event argument class for the MessageQueueListener.ProcessingError event
    /// </summary>
    public class ProcessingErrorEventArgs : EventArgs
    {
        /// <summary>
        /// The exception that was intercepted during processing
        /// </summary>
        private Exception exception;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="exception">An exception</param>
        public ProcessingErrorEventArgs( Exception exception )
        {
            this.exception = exception;
        }

        /// <summary>
        /// Intercepted exception
        /// </summary>
        public Exception Exception
        {
            get
            {
                return exception;
            }
        }
    }

    /// <summary>
    /// Delegate for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public delegate void MessageReceivedEventHandler( object sender, MessageReceivedEventArgs ea );
    /// <summary>
    /// Delegate for the MessageQueueListener.ProcessingError event
    /// </summary>
    public delegate void ProcessingErrorEventHandler( object sender, ProcessingErrorEventArgs ea );
   
    /// <summary>
    /// This class implements a multi-threaded message queue listener
    /// </summary>
    public class MessageQueueListener
    {
        protected Guid instanceId = Guid.NewGuid();
        protected Thread[] listenerThreads;
        protected string queueName=null;
        protected int numThreads=1;
        protected ThreadPriority priority=ThreadPriority.Normal;
        protected ManualResetEvent continueInterrupt = new ManualResetEvent(true);
        protected ManualResetEvent pauseInterrupt = new ManualResetEvent(false);
        protected ManualResetEvent stopInterrupt = new ManualResetEvent(false);
        protected int peekTimeout = 5;
        protected MessageReceivedEventHandler messageReceived;
        protected int refCount = 0;
       
        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        public MessageQueueListener(string queueName)
        {
            this.queueName = queueName;
            Initialize();
        }

        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads to run the listener on</param>
        public MessageQueueListener(string queueName, int numThreads)
        {
            this.queueName = queueName;
            this.numThreads = numThreads;
            Initialize();
        }


        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads to run the listener on</param>
        /// <param name="priority">Thread priority</param>
        public MessageQueueListener(string queueName, int numThreads, ThreadPriority priority)
        {
            this.queueName = queueName;
            this.numThreads = numThreads;
            this.priority = priority;
            Initialize();
        }


        /// <summary>
        /// Adds a reference to the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        public int AddReference()
        {
            return ++ refCount;
        }

        /// <summary>
        /// Releases a reference from the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        /// <remarks>If the reference counter drops to or below zero, the listener is stopped.</remarks>
        public int ReleaseReference()
        {
            if ( -- refCount <= 0 )
            {
                Stop();
            }
            return refCount;
        }

        /// <summary>
        /// Initializes the MessageQueueListener
        /// </summary>
        protected virtual void Initialize()
        {
            listenerThreads = new Thread[ numThreads];
        }

        /// <summary>
        /// Unicast event that is raised when a message has been received
        /// </summary>
        public MessageReceivedEventHandler MessageReceived
        {
            get
            {
                return messageReceived;
            }
            set
            {
                if ( messageReceived == value)
                    return;
                 messageReceived = value;
            }
        }

        /// <summary>
        /// Multicast event that is raised when an exception occurs during processing.
        /// </summary>
        public event ProcessingErrorEventHandler ProcessingError;

        /// <summary>
        /// Raises the ProcessingError event
        /// </summary>
        /// <param name="pea"></param>
        protected void OnProcessingError( ProcessingErrorEventArgs pea )
        {
            if (ProcessingError != null )
            {
                ProcessingError( this, pea );
            }
        }

        /// <summary>
        /// Starts the message queue listener. If the threads are already running,
        /// Start() does nothing.
        /// </summary>
        public void Start()
        {
            for(int i=0;i< numThreads;i++)
            {
                if ( listenerThreads[i] == null )
                {
                    Thread thread = new Thread(new ThreadStart(this.ReceiveLoop));
                    thread.Name = String.Format("{0}:{1}:{2}",i, instanceId,GetType().Name);
                    thread.IsBackground = true;
                    thread.Priority = priority;
                    listenerThreads[i] = thread;
                    thread.Start();
                }
            }
        }

        /// <summary>
        /// Stops the listener and waits for all listener threads to exit.
        /// </summary>
        public void Stop()
        {
            Resume();
            stopInterrupt.Set();
            for(int i=0;i< numThreads;i++)
            {
                if ( listenerThreads[i] != null ) listenerThreads[i].Join();
                listenerThreads[i] = null;
            }
        }

        /// <summary>
        /// Set listener state to "paused".
        /// </summary>
        /// <remarks>
        /// Setting the listener into the "paused" state will not suspend processing
        /// immediately. All messages that are currently being processed will be
        /// processed, but no new messages are being dequeued.
        /// </remarks>
        public void Pause()
        {
            continueInterrupt.Reset();
            pauseInterrupt.Set();                            
        }

        /// <summary>
        /// Set listener state to "resume". This will resume the listener from
        /// the "paused" state.
        /// </summary>
        public void Resume()
        {
            pauseInterrupt.Reset();
            continueInterrupt.Set();           
        }

        /// <summary>
        /// Gets a value indicating whether the listener is paused.
        /// </summary>
        public bool IsPaused
        {
            get
            {
                return pauseInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }

        /// <summary>
        /// Gets a value indicating whether the listener is stopping.
        /// </summary>
        public bool IsStopping
        {
            get
            {
                return stopInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }


        /// <summary>
        /// This is the thread entry point for the main receive loop.
        /// </summary>
        protected virtual void ReceiveLoop()
        {
            using (MessageQueue sourceQueue = new MessageQueue( queueName, false ))
            {
                bool isTransactional = sourceQueue.Transactional;
                sourceQueue.MessageReadPropertyFilter.SetAll();

                while ( !IsStopping )
                {
                    // if the continue interrupt is not set, we will hang here
                    // until it is set again (resuming)
                    WaitHandle.WaitAny(new WaitHandle[]{ continueInterrupt, stopInterrupt});

                    try
                    {
                        //peek on the queue and wait for a message becoming available
                        IAsyncResult asynchResult = sourceQueue.BeginPeek(TimeSpan.FromSeconds( peekTimeout),null,null);
                        int firedWaitHandle = WaitHandle.WaitAny(new WaitHandle[]{asynchResult.AsyncWaitHandle, pauseInterrupt, stopInterrupt});
                        if ( firedWaitHandle == 0 )
                        {
                            sourceQueue.EndPeek(asynchResult);
                            Message receivedMessage = sourceQueue.Receive( TimeSpan.FromSeconds(0),
                                isTransactional?MessageQueueTransactionType.Single:MessageQueueTransactionType.None );
                          
                            try
                            {
                                messageReceived(this,new MessageReceivedEventArgs(receivedMessage));                       
                            }
                            finally
                            {
                                receivedMessage.Dispose();
                            }
                        }
                        else if ( firedWaitHandle == 1 )
                        {
                            // Pausing. Clean up the asynchronous Peek operation
                            // (we'll assume that we have time for that) and then go
                            // back to the top of the loop where we'll hang on the
                            // continueInterrupt.
                            sourceQueue.EndPeek(asynchResult);
                        }
                        else
                        {
                            // stopping. Skip the EndPeek() and exit the thread immediately.
                            return;
                        }
                    }
                    catch(MessageQueueException ex)
                    {
                        if ( ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout  )
                        {
                            break;
                        }
                    }
                    catch( Exception ex )
                    {
                        // absorb all other exceptions and surface them from the listener
                        // using the ProcessingError event.
                        OnProcessingError( new ProcessingErrorEventArgs(ex));
                    }
                }
            }
        }
    }
}

 

Categories: MSMQ

The little series I am currently writing here on my blog has inspired me to write way more code than is actually necessary to get my point across ;-) So by now I've got my own MSMQ transport for WSE 2.0 (yes, I know that others have written that already, but I am shooting for an "enterprise-strength" implementation), a WebRequest/WebResponse pair to smuggle under arbitrary ASMX proxies, and I am more than halfway done with a server-side host for MSMQ-to-ASMX (spelled out: ASP.NET Web Services).

What bugs me is that WSE 2.0's messaging model is "asynchronous only": it always performs a push/pull translation, and there is no way to push a message through to a service on the receiving thread. Whenever I grab a message from the queue and put it into my SoapTransport's "Dispatch()" method, the message gets queued up in an in-memory queue. On a concurrent thread, the SoapReceivers collection then pulls it from there (OnReceiveComplete) and submits it into ProcessMessage() of the SoapReceiver (like any SoapService-derived implementation) matching the target endpoint. So while I can dequeue from MSMQ within a transaction scope (ServiceDomain), that transaction scope doesn't make it across onto the thread that actually executes the action inside the SoapReceiver/SoapService.

So now I am sitting here, contemplating and trying to figure out a workaround that doesn't require me to rewrite a big chunk of WSE 2.0 (which I am totally not shy of if that is what it takes). Transaction marshaling, thread synchronization: ah, I love puzzles. Once I know how to solve this and have made the adjustments, I'll post the queue listener I promised to wrap up the series. The other code I've written in the process will likely surface in some other way.
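The thread-hopping problem is easy to reproduce with System.Transactions from .NET 2.0. That is a different API than the ServiceDomain/WSE 2.0 combination discussed here, so treat this as an analogy, not as the fix for WSE: the ambient transaction is bound to the thread that created it, a thread that picks up the work later sees no transaction at all, and one way to hand it over explicitly is a DependentTransaction. TransactionHandOff and Demo are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Analogy only: System.Transactions, not ServiceDomain or WSE 2.0.
public static class TransactionHandOff
{
    // Reports whether a plain worker thread saw any ambient transaction, and
    // whether a worker that was handed a dependent clone joined the same one.
    public static (bool AmbientSeenByPlainThread, bool SameTxSeenByHandOffThread) Demo()
    {
        bool ambientSeen = false;
        bool sameTxSeen = false;

        using (var scope = new TransactionScope())
        {
            Transaction root = Transaction.Current;
            string rootId = root.TransactionInformation.LocalIdentifier;

            // 1) Naive hand-off (what a push/pull translation effectively does):
            //    the ambient transaction is thread-bound, so the worker sees none.
            var plain = new Thread(() => ambientSeen = Transaction.Current != null);
            plain.Start();
            plain.Join();

            // 2) Explicit hand-off: the root cannot commit until the clone completes.
            DependentTransaction clone =
                root.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
            var worker = new Thread(() =>
            {
                using (var inner = new TransactionScope(clone))
                {
                    sameTxSeen =
                        Transaction.Current.TransactionInformation.LocalIdentifier == rootId;
                    inner.Complete();
                }
                clone.Complete();   // lets the root's commit proceed
                clone.Dispose();
            });
            worker.Start();
            worker.Join();

            scope.Complete();
        }
        return (ambientSeen, sameTxSeen);
    }
}
```

BlockCommitUntilComplete makes the root wait for the worker’s vote, which is the property you want when the real work happens on the other thread.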

See Part 2

Now that we have a way to detect deadlocks and similar error conditions we might encounter during a transaction, we need to find a mechanism that will allow us to back out of the doomed transaction and retry the failed operation. The good thing about transaction failures of that sort is that you can essentially retry as often as you want without doing anything wrong – as long as all the resources you are reading from and writing to are guarded by transactional resource managers that are enlisted in the transaction. And that’s the hint at the trick we need to use.
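The shape of such a retry is simple as long as it runs where the transaction is rooted. A minimal sketch, assuming System.Transactions’ TransactionScope in place of the Enterprise Services attributes used in the component further down, and a hypothetical RepeatableOperationException standing in for whatever the RepeatableOperationExceptionMapper produces: every attempt gets a fresh transaction, and only failures classified as repeatable are retried.

```csharp
using System;
using System.Transactions;

// Hypothetical: stands in for the exception type a mapper such as
// RepeatableOperationExceptionMapper throws for deadlocks and other
// failures that are safe to retry.
public class RepeatableOperationException : Exception
{
    public RepeatableOperationException(string message) : base(message) { }
}

public static class RepeatableTx
{
    // Runs 'work' in a fresh transaction and retries it, up to maxAttempts
    // attempts in total, for as long as it fails with a repeatable error.
    // Any other exception, or the last repeatable failure, propagates.
    public static T Run<T>(Func<T> work, int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    T result = work();
                    scope.Complete();   // vote to commit only if 'work' succeeded
                    return result;
                }
            }
            catch (RepeatableOperationException) when (attempt < maxAttempts)
            {
                // Disposing the scope without Complete() rolled the doomed
                // transaction back; loop around and try again with a new one.
            }
        }
    }
}
```

The retry only works because every resource touched inside 'work' is enlisted in the transaction; anything done outside of it would be repeated on every attempt.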

If we were executing the code below through a remote call, it might be rather difficult to recover from a failure. The local transaction inside the component will roll back – ok – but that punts the problem back out to the caller, who will have to implement the appropriate logic to single out transaction failure causes that can be fixed by a retry and to actually resubmit the request. That can be rather tricky.

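using System;
using System.EnterpriseServices;
using newtelligence.EnterpriseTools.Data;

[Transaction(TransactionOption.Required)]
public class DoesStuff : ServicedComponent
{
    /* things omitted (dbConnection, sprocUpdateAndQueryStuff, result, GetResultFromReader) */

    [AutoComplete] // votes to commit on normal return, to abort if an exception escapes
    public void DoStuff( string argument )
    {
        try
        {
            dbConnection.Open();
            sprocUpdateAndQueryStuff.Parameters["@StuffArgument"].Value = argument;
            result = this.GetResultFromReader( sprocUpdateAndQueryStuff.ExecuteReader() );
        }
        catch( Exception exception )
        {
            // Deadlocks, lock timeouts etc. come back wrapped in a RepeatableOperationException
            throw RepeatableOperationExceptionMapper.MapException( exception );
        }
        finally
        {
            dbConnection.Close();
        }
    }
}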

Even worse, depending on when and why the transaction fails, we might not learn about the failure at all. In distributed transaction scenarios, it is absolutely possible (even though rare) that a transaction fails long after the application code is done with all of its processing. Let's assume everything in the call to DoStuff() above works just fine. The code calls a stored procedure on a remote SQL Server, SQL Server is completely happy and so is the component. Hence, neither has any good reason to complain. Let's further assume there is a component DoesOtherStuff just like the one above, which the remote client calls next and which updates some other SQL Server database from within the same distributed transaction. That SQL Server and that component are both happy as well. Everybody is happy. The client commits the transaction, maybe by calling ServiceDomain.Leave() or by returning from a method of a ServicedComponent that serves as the transaction root.

Now DTC (the transaction coordinator) goes around and asks every participant for its vote on the transaction outcome. At this point, all user code you've contributed to this story is done executing. And now (we are talking about transactions, so this is where the fun is) disaster strikes and one of the SQL Servers you were talking to suddenly gets cut off from the network. Hardware issue. Crash. Something. If you are lucky, the problem occurs while DTC is still collecting votes (Phase 1). In this case, the transaction simply aborts. If you are horribly unlucky, disaster strikes while DTC is already issuing commit commands (Phase 2). In that case, the transaction stays in doubt, possibly forever, until either the network connection is restored or an operator steps in and resolves the transaction manually (and may deliberately cause inconsistency in doing so).

Since all this can happen asynchronously, your code may never learn that the transaction failed, and the (transient) transaction results are just quietly rolled back in the background.
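The following toy two-phase commit coordinator makes the Phase 1 versus Phase 2 distinction concrete. It is a deliberately simplified model and not how MSDTC works internally. `IParticipant`, `Outcome`, and the `ScriptedParticipant` test double are hypothetical. A participant that drops off the network is simulated by returning false.

```csharp
using System.Collections.Generic;

public enum Outcome { Committed, Aborted, InDoubt }

// Hypothetical participant contract; "false" models a "no" vote or an unreachable server.
public interface IParticipant
{
    bool Prepare();   // Phase 1 vote
    bool Commit();    // Phase 2 delivery of the commit decision
    void Rollback();
}

public static class TwoPhaseCommit
{
    public static Outcome Run(IReadOnlyList<IParticipant> participants)
    {
        // Phase 1: collect votes. A failure here still allows a clean abort.
        var prepared = new List<IParticipant>();
        foreach (var p in participants)
        {
            if (!p.Prepare())
            {
                foreach (var q in prepared) q.Rollback();
                return Outcome.Aborted;
            }
            prepared.Add(p);
        }

        // Phase 2: the decision is final. A participant that cannot be reached now
        // stays prepared (in doubt) until it reconnects or an operator intervenes.
        bool allDelivered = true;
        foreach (var p in prepared)
        {
            if (!p.Commit()) allDelivered = false;
        }
        return allDelivered ? Outcome.Committed : Outcome.InDoubt;
    }
}

// Hypothetical test double that fails in a chosen phase.
public sealed class ScriptedParticipant : IParticipant
{
    private readonly bool _failPrepare;
    private readonly bool _failCommit;

    public ScriptedParticipant(bool failPrepare = false, bool failCommit = false)
    {
        _failPrepare = failPrepare;
        _failCommit = failCommit;
    }

    public bool RolledBack { get; private set; }

    public bool Prepare() => !_failPrepare;
    public bool Commit() => !_failCommit;
    public void Rollback() => RolledBack = true;
}
```

The model shows why only the Phase 1 case ends cleanly. Once the coordinator has decided to commit, it can no longer take the decision back, so a lost participant leaves the outcome unresolved on that participant's side.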

Before you get all nervous about the asynchronous case, let’s look at a strategy to deal with the scenario from the client side, assuming that the transaction is resolved synchronously. We will take into account that there are transaction failures that can be fixed by rerunning the transaction.

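public TransactionStatus RunTx(string argument)
{
    TransactionStatus status = TransactionStatus.NoTransaction;
    bool done = false;

    while( !done )
    {
        // Required only starts a new transaction if the caller has none,
        // so RunTx must be the transaction root for retries to work
        ServiceConfig cfg = new ServiceConfig();
        cfg.Transaction = TransactionOption.Required;
        ServiceDomain.Enter(cfg);

        try
        {
            DoesStuff doesStuff = new DoesStuff();
            doesStuff.DoStuff(argument);

            ContextUtil.SetComplete();
            done = true;
        }
        catch (RepeatableOperationException)
        {
            // Worth another try: abort and loop around
            ContextUtil.SetAbort();
        }
        catch
        {
            // Anything else: abort and let the caller deal with it
            ContextUtil.SetAbort();
            throw;
        }
        finally
        {
            status = ServiceDomain.Leave();
        }
    }
    return status;
}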

If a component throws an exception indicating that running the transaction again might succeed, we catch that condition and abort the local transaction. Since we don't set done to true, we get back to the top of the loop and rerun the transaction. If we catch any other exception, we abort the transaction and re-throw the exception we caught. The loop might look a bit awkward, but it does the job of rerunning the transaction whenever that's desired. So all you need to do is use that sort of loop everywhere in your code and you are all set. Well … almost.
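The retry loop can also be written once and reused instead of being copied everywhere. This sketch swaps `ServiceDomain` for `System.Transactions.TransactionScope`, a .NET 2.0 API and therefore not what the code above uses. It also adds a `maxAttempts` cap, which is an assumption: the loop above retries without limit. `TransactionRunner.Run` is a hypothetical name, and `RepeatableOperationException` is a minimal stand-in for the class introduced in Part 1.

```csharp
using System;
using System.Transactions;

// Minimal stand-in for the RepeatableOperationException class from Part 1.
public class RepeatableOperationException : Exception
{
    public RepeatableOperationException(Exception inner) : base(inner.Message, inner) { }
}

public static class TransactionRunner
{
    // Runs 'work' in a fresh transaction per attempt and retries only when the work
    // signals a repeatable failure. Any other exception rolls back and propagates.
    // Returns the number of attempts it took.
    public static int Run(Action work, int maxAttempts = 5)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    work();
                    scope.Complete();   // vote to commit; Dispose() performs the commit
                }
                return attempt;
            }
            catch (RepeatableOperationException) when (attempt < maxAttempts)
            {
                // The scope was disposed without Complete(), so this attempt rolled back.
                // Loop around and rerun the whole transaction.
            }
        }
    }
}
```

Capping the attempts keeps a persistently deadlocking operation from spinning forever. Once the cap is reached, the RepeatableOperationException propagates to the caller like any other failure.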

Assume that the argument we pass into RunTx() is something as complex and valuable (!) as a purchase order that you just got into your system. A purchase order is as good as cash. The longer you keep that purchase order around in memory, the greater the risk that your local process (or machine) fails catastrophically for some arbitrary reason. So if you need to rerun the transaction one or two times because the backend system is struggling (again, this is all technology for the worst imaginable case), you make yourself increasingly vulnerable to a local failure until the transaction finally succeeds. To reduce that risk, it makes sense to add another step here.

Before we even start the transaction that does the processing, we grab the input data and guard it using a transactional resource manager such as MSMQ. If we write the input data into a private, transactional MSMQ queue as the very first step, before we do anything else, the data is safely stored on disk. If the machine fails once the data is in the queue, the data (encapsulated in a message) will still be available once the machine comes back up. A message queue listener then reads each message from the queue within its own transaction and submits the data into the processing component. Because a message is simply put back into the queue if the transaction fails, rerunning the transaction is practically automatic once the message queue listener restarts.

That leaves a problem: there are failing transactions that we want to repeat, and there are failing transactions that we know will not succeed no matter how often we try them. If a transactional component throws an exception because it has a bug, the transaction will abort and no number of reruns will make it succeed. Messages (data) that cause this sort of behavior are “poisonous” and we need to sort them out. Stay tuned; the message queue listener that takes care of all of that is up next.
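The retry-versus-poison decision can be sketched with an in-memory stand-in for a transactional queue. This is not the listener announced above, and it does not touch MSMQ or System.Messaging. `PoisonAwareListener`, `QueuedMessage`, and the `maxDeliveries` threshold are hypothetical. An aborted receive is simulated by re-enqueueing the message with an incremented delivery count. The sketch puts it back at the tail, whereas MSMQ returns an aborted message to its original position. Repeatable failures are retried until the threshold is reached. Any other exception is treated as a bug, so that message goes straight to the poison list.

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the RepeatableOperationException class from Part 1.
public class RepeatableOperationException : Exception
{
    public RepeatableOperationException(string message) : base(message) { }
}

public sealed class QueuedMessage
{
    public QueuedMessage(string body) { Body = body; }
    public string Body { get; }
    public int DeliveryCount { get; set; }
}

public sealed class PoisonAwareListener
{
    private readonly Queue<QueuedMessage> _input = new Queue<QueuedMessage>();
    private readonly int _maxDeliveries;

    public PoisonAwareListener(int maxDeliveries) { _maxDeliveries = maxDeliveries; }

    public List<QueuedMessage> Processed { get; } = new List<QueuedMessage>();
    public List<QueuedMessage> Poison { get; } = new List<QueuedMessage>();

    // First step: guard the input before any processing happens.
    public void Enqueue(string body) => _input.Enqueue(new QueuedMessage(body));

    // Drains the queue; each iteration models one receive-and-process transaction.
    public void Run(Action<string> process)
    {
        while (_input.Count > 0)
        {
            QueuedMessage message = _input.Dequeue();
            message.DeliveryCount++;
            try
            {
                process(message.Body);
                Processed.Add(message);      // "commit": the message is gone for good
            }
            catch (RepeatableOperationException) when (message.DeliveryCount < _maxDeliveries)
            {
                _input.Enqueue(message);     // "abort": the message returns to the queue
            }
            catch (Exception)
            {
                Poison.Add(message);         // a bug, or the retry budget is spent
            }
        }
    }
}
```

The delivery count matters even for repeatable failures. Without it, a message that deadlocks every single time would circle through the queue forever.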

Categories: MSMQ

See Part 1

Before we can do anything about deadlocks or deal with similar troubles, we first need to be able to tell that we indeed have a deadlock situation. Finding this out is a matter of knowing the error codes your database returns and of having a mechanism that bubbles that information up to some code that will handle the situation. So before we can think about and write the handling logic for failed/failing but safely repeatable transactions, we need to build a few little things. The first thing we'll need is an exception class that wraps the original exception indicating the reason for the transaction failure. The new exception class's type will later let us filter out these exceptions in a “catch” clause and take the appropriate action.

using System;
using System.Runtime.Serialization;

namespace newtelligence.EnterpriseTools.Data
{
    /// <summary>
    /// Signals that the failed operation may succeed if the transaction is rerun.
    /// </summary>
    [Serializable]
    public class RepeatableOperationException : Exception
    {
        public RepeatableOperationException() : base()
        {
        }

        public RepeatableOperationException(Exception innerException)
            : base(null, innerException)
        {
        }

        public RepeatableOperationException(string message, Exception innerException)
            : base(message, innerException)
        {
        }

        public RepeatableOperationException(string message) : base(message)
        {
        }

        // Deserialization constructor
        protected RepeatableOperationException(
            SerializationInfo serializationInfo,
            StreamingContext streamingContext)
            : base(serializationInfo, streamingContext)
        {
        }

        public override void GetObjectData(
            SerializationInfo info,
            StreamingContext context)
        {
            base.GetObjectData(info, context);
        }
    }
}

Having an exception wrapper with the desired semantics, we now need to be able to figure out when to replace the original exception with this wrapper and re-throw it up the call stack. The idea is that whenever you execute a database operation (or, more generally, any operation that might be repeatable on failure), you catch the resulting exception and run it through a factory. The factory analyzes the exception and wraps it in a RepeatableOperationException if the issue at hand can be resolved by rerunning the transaction. The (still a little naïve) code below illustrates how to use such a factory in the application code. Later we will flesh out the catch block a little more, since we lose the original call stack if we end up re-throwing the original exception as shown here:

try
{
   dbConnection.Open();
   sprocUpdateAndQueryStuff.Parameters["@StuffArgument"].Value = argument;
   result = this.GetResultFromReader( sprocUpdateAndQueryStuff.ExecuteReader() );
}
catch( Exception exception )
{
   // Returns either a RepeatableOperationException wrapper or the original exception
   throw RepeatableOperationExceptionMapper.MapException( exception );
}
finally
{
   dbConnection.Close();
}
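One way to keep the original call stack is offered here as an assumption, not as the fleshed-out catch block promised above. Whenever the mapper returns the original exception unchanged, rethrow it through `ExceptionDispatchInfo` (available since .NET Framework 4.5). Throw the wrapper only when the mapper actually produced one; the wrapper keeps the original exception and its stack trace in `InnerException`. `MappingRethrow.Throw` and the `Demo` class are hypothetical. In the code above, the `map` delegate would be `RepeatableOperationExceptionMapper.MapException`.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

public static class MappingRethrow
{
    // Throws either the mapped wrapper or the original exception with its stack intact.
    public static void Throw(Exception original, Func<Exception, Exception> map)
    {
        Exception mapped = map(original);
        if (ReferenceEquals(mapped, original))
        {
            // Rethrow without resetting the original stack trace.
            ExceptionDispatchInfo.Capture(original).Throw();
        }
        // The wrapper carries the original exception (and its stack) as InnerException.
        throw mapped;
    }
}

// Hypothetical caller mirroring the catch block above.
public static class Demo
{
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static void FailDeep() => throw new InvalidOperationException("not repeatable");

    public static void DoStuff(Func<Exception, Exception> map)
    {
        try
        {
            FailDeep();
        }
        catch (Exception exception)
        {
            MappingRethrow.Throw(exception, map);
        }
    }
}
```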

The factory class itself is rather simple in structure, but a bit tricky to put together, because you have to know the right error codes for all resource managers you will ever run into. In the example below I put in what I believe to be the appropriate codes for SQL Server and Oracle (corrections are welcome). I left the ODBC and OLE DB mappings blank, because for those you would have to inspect the driver type and the respective driver-specific error codes. The factory checks the exception's type and delegates the mapping to a private method that is specialized for a specific managed provider.

using System;
using System.Data.SqlClient;
using System.Data.OleDb;
using System.Data.Odbc;
using System.Data.OracleClient;

namespace newtelligence.EnterpriseTools.Data
{
   public class RepeatableOperationExceptionMapper
   {
        /// <summary>
        /// Maps the exception to a Repeatable exception, if the error code
        /// indicates that the transaction is repeatable.
        /// </summary>
        /// <param name="sqlException"></param>
        /// <returns></returns>
        private static Exception MapSqlException( SqlException sqlException )
        {
            switch ( sqlException.Number )
            {
                case -2: /* Client Timeout */
                case 701: /* Out of Memory */
                case 1204: /* Lock Issue */
                case 1205: /* Deadlock Victim */
                case 1222: /* Lock Request Timeout */
                case 8645: /* Timeout waiting for memory resource */
                case 8651: /* Low memory condition */
                    return new RepeatableOperationException(sqlException);
                default:
                    return sqlException;
            }
        }

        private static Exception MapOleDbException( OleDbException oledbException )
        {
            switch ( oledbException.ErrorCode )
            {
                default:
                    return oledbException;
            }
        }

        private static Exception MapOdbcException( OdbcException odbcException )
        {
            return odbcException;           
        }

        private static Exception MapOracleException( OracleException oracleException )
        {
            switch ( oracleException.Code )
            {
                case 104:  /* ORA-00104: Deadlock detected; all public servers blocked waiting for resources */
                case 1013: /* ORA-01013: User requested cancel of current operation */
                case 2087: /* ORA-02087: Object locked by another process in same transaction */
                case 60:   /* ORA-00060: Deadlock detected while waiting for resource */
                    return new RepeatableOperationException( oracleException );
                default:
                    return oracleException;
            }
        }

        public static Exception MapException( Exception exception )
        {
            if ( exception is SqlException )
            {
                return MapSqlException( exception as SqlException );
            }
            else if ( exception is OleDbException )
            {
                return MapOleDbException( exception as OleDbException );
            }
            else if (exception is OdbcException )
            {
                return MapOdbcException( exception as OdbcException );
            }
            else if (exception is OracleException )
            {
                return MapOracleException( exception as OracleException );
            }
            else
            {
                return exception;
            }
        }
   }
}

With that little framework of two classes, we can now selectively throw exceptions that convey whether a failed/failing transaction is worth repeating. Next step: How do we actually run such repeats and make sure we neither lose data nor make the user unhappy in the process? Stay tuned.

Categories: Architecture | SOA | Enterprise Services | MSMQ

Deadlocks and other locking conflicts that cause transactional database operations to fail are things that puzzle many application developers. Sure, proper database design, careful implementation of database access, and appropriate support by the database engine should take care of that problem, but they cannot do so in all cases. Sometimes, especially under stress and in other situations with high lock contention, a database has little choice but to pick at least one of the transactions competing for the same locks as the victim and abort it to resolve the deadlock. Generally speaking, transactions that abort and roll back are a good thing, because this behavior guarantees data integrity. In the end, we use transaction technology for those cases where data integrity is at risk. What's interesting is that even though transactions are a technology that is explicitly about things going wrong, the strategy for dealing with failing transactions is often not much more than to bubble the problem up to the user and say “We apologize for the inconvenience. Please press OK”.

The appropriate strategy for handling a deadlock or some other recoverable reason for a transaction abort on the application level is to back out of the entire operation and to retry the transaction. Retrying is a gamble that the next time the transaction runs, it won’t run into the same deadlock situation again or that it will at least come out victorious when the database picks its victims. Eventually, it’ll work. Even if it takes a few attempts. That’s the idea. It’s quite simple.

What is not really all that simple is the implementation. Whenever you are using transactions, you must make your code aware that such “good errors” may occur at any time. Wrapping your transactional ODBC/OLEDB/ADO/ADO.NET code or calls to transactional Enterprise Services or COM+ components with a try/catch block, writing errors to log-files and showing message boxes to users just isn't the right thing to do. The right thing is to simply do the same batch of work again until it succeeds.

The problem that some developers seem to have with “just retry” is that it’s not so clear what should be retried. It’s a problem of finding and defining the proper transaction scope. Especially when user interaction is in the picture, things easily get very confusing. If a user has filled in a form on a web page or some dialog window and all of his/her input is complete and correct, should the user be bothered with a message that the update transaction failed due to a locking issue? Certainly not. Should the user know when the transaction fails because the database is currently unavailable? Maybe, but not necessarily. Should the user be made aware that the application he/she is using is for some sudden reason incompatible with the database schema of the backend database? Maybe, but what does Joe in the sales department do with that valuable piece of information?

If stuff fails, should we just forget about Joe’s input and tell him to come back when the system is happier to serve him? So, in other words, do we have Joe retry the job? That’s easy to program, but that sort of strategy doesn’t really make Joe happy, does it?

So what's the right thing to do? One part of the solution is a proper separation between the things the user (or a program) does and the things that the transaction does. This gives us two layers and “a job” that can be handed down from the presentation layer to the “transaction layer”. Once this separation is in place, we can come up with a mechanism that runs those jobs in transactions and automates how and when transactions are retried. Transactional MSMQ queues turn out to be a brilliant tool to make this very easy to implement. More tomorrow. Stay tuned.

Categories: Architecture | SOA | Enterprise Services | MSMQ