December 15, 2004
@ 12:37 PM

Just had to figure this out and thought I’d share. With the XmlSerializer (.NET v1.1), one would think that TimeSpan maps to the XML Schema type duration, but it doesn’t, for whatever reason. Anyway, here’s a trick to make it work. Interestingly enough, the XmlConvert class understands TimeSpan. However, it does not handle fractional seconds correctly and ignores them. That’s enough for my purposes in the given app, so I am ignoring that issue in the snippet below and treating all times of less than one second as equivalent to zero. If it isn’t enough for you, you’d have to write an alternate implementation of the respective XmlConvert functionality or beg Microsoft to fix it. (Doug? ;-)

 

private TimeSpan interval;

[XmlElement("Interval", DataType="duration")]
public string IntervalXml
{
    get
    {
        if (Interval < TimeSpan.FromSeconds(1) )
        {
            return null;
        }
        else
        {
            return XmlConvert.ToString(interval);
        }
    }
    set
    {
        if (value == null )
        {
            interval = TimeSpan.Zero;
            return;
        }
        TimeSpan newInterval = XmlConvert.ToTimeSpan(value);
        if (interval == newInterval)
            return;
        interval = newInterval;
    }
}

[XmlIgnore]
public TimeSpan Interval
{
    get
    {
        return interval;
    }
    set
    {
        if (interval == value)
            return;
        interval = value;
    }
}
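If whole seconds are not enough, one option is to format the value by hand instead of going through XmlConvert. What follows is only a minimal sketch under a few assumptions: DurationFormatter and ToXsdDuration are hypothetical names that are not part of the framework, the output uses only the day, hour, minute and second designators of the XML Schema duration format, and parsing in the other direction is left out.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of the .NET Framework.
public sealed class DurationFormatter
{
    private DurationFormatter() { }

    // Formats a TimeSpan as an XML Schema duration and keeps fractional seconds.
    public static string ToXsdDuration(TimeSpan value)
    {
        if (value == TimeSpan.Zero)
        {
            return "PT0S";
        }

        StringBuilder sb = new StringBuilder();
        if (value.Ticks < 0)
        {
            sb.Append('-');
        }

        // Math.Abs throws OverflowException for TimeSpan.MinValue.
        long ticks = Math.Abs(value.Ticks);
        long days = ticks / TimeSpan.TicksPerDay;
        ticks %= TimeSpan.TicksPerDay;
        long hours = ticks / TimeSpan.TicksPerHour;
        ticks %= TimeSpan.TicksPerHour;
        long minutes = ticks / TimeSpan.TicksPerMinute;
        ticks %= TimeSpan.TicksPerMinute;
        long seconds = ticks / TimeSpan.TicksPerSecond;
        long fraction = ticks % TimeSpan.TicksPerSecond; // in 100 ns units

        CultureInfo inv = CultureInfo.InvariantCulture;
        sb.Append('P');
        if (days > 0)
        {
            sb.Append(days.ToString(inv)).Append('D');
        }
        if (hours > 0 || minutes > 0 || seconds > 0 || fraction > 0)
        {
            sb.Append('T');
            if (hours > 0)
            {
                sb.Append(hours.ToString(inv)).Append('H');
            }
            if (minutes > 0)
            {
                sb.Append(minutes.ToString(inv)).Append('M');
            }
            if (seconds > 0 || fraction > 0)
            {
                sb.Append(seconds.ToString(inv));
                if (fraction > 0)
                {
                    // Seven digits cover one tick; trailing zeros are dropped.
                    sb.Append('.').Append(fraction.ToString("D7", inv).TrimEnd('0'));
                }
                sb.Append('S');
            }
        }
        return sb.ToString();
    }
}
```

In the IntervalXml getter, a call such as DurationFormatter.ToXsdDuration(interval) could then take the place of XmlConvert.ToString(interval), and the sub-second check would no longer be needed. The setter would still need a matching parser.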

 

 

Categories: XML

December 15, 2004
@ 12:13 PM

As far as German usage goes, the IT industry in Germany is unbelievably creepy when it comes to the use of Anglicisms. And it is not just that the (almost) fitting English term is preferred over its German equivalent for reasons of "coolness". No! What is really bad is that the seriousness of what is meant frequently gets thrown out of the window along with it. Three examples:

"Meeting": Let's sit down for a bit, noncommittally drink some coffee, nibble on canapés and chatter.
versus
"Besprechung" (the German word for a meeting): We sit around a table, work through the agenda, and afterwards everyone knows what to do next.

"Community": Hello, we love you all. We don't quite know why, but somehow that doesn't matter anyway, right?
versus
"Interessengemeinschaft" (community of interest): We all share the same interests (and problems), work on them together, and maybe even have fun doing so.

"Get Together": Why don't you stay here for another 10 minutes after the drudgery? We even have a little glass of Prosecco.
versus
"Treffen in der Kneipe um die Ecke" (meeting up at the pub around the corner): Do you feel like having a beer or five, so that we can talk about something other than the project for once?

When I talk about "Service Oriented Architectures" and use the English term in German as well, I do that so that everyone knows what it is about and can look things up in the English-language literature using the same terminology. But the fact that every sentence in this industry of ours apparently counts as incomplete or grammatically wrong (or not hip enough?) unless it contains at least one (English) foreign word is awfully embarrassing. Mea culpa. I catch myself doing it too. Not good.

Categories: Other Stuff

December 11, 2004
@ 02:35 PM

The stack trace below (snapshot taken at a breakpoint in [WebMethod] "HelloWorld") shows that I am having quite a bit of programming fun these days. Server-side ASP.NET hooked up to an MSMQ listener.

simpleservicerequestinweb.dll!SimpleServiceRequestInWeb.Hello.HelloWorld() Line 53 C#
system.web.services.dll!System.Web.Services.Protocols.LogicalMethodInfo.Invoke(System.Object target, System.Object[] values) + 0x92 bytes 
system.web.services.dll!System.Web.Services.Protocols.WebServiceHandler.Invoke() + 0x9e bytes 
system.web.services.dll!System.Web.Services.Protocols.WebServiceHandler.CoreProcessRequest() + 0x142 bytes 
system.web.services.dll!System.Web.Services.Protocols.SyncSessionlessHandler.ProcessRequest(System.Web.HttpContext context) + 0x6 bytes 
system.web.dll!CallHandlerExecutionStep.System.Web.HttpApplication+IExecutionStep.Execute() + 0xb4 bytes 
system.web.dll!System.Web.HttpApplication.ExecuteStep(System.Web.HttpApplication.IExecutionStep step, bool completedSynchronously) + 0x58 bytes 
system.web.dll!System.Web.HttpApplication.ResumeSteps(System.Exception error) + 0xfa bytes 
system.web.dll!System.Web.HttpApplication.System.Web.IHttpAsyncHandler.BeginProcessRequest(System.Web.HttpContext context, System.AsyncCallback cb, System.Object extraData) + 0xe3 bytes 
system.web.dll!System.Web.HttpRuntime.ProcessRequestInternal(System.Web.HttpWorkerRequest wr) + 0x1e7 bytes 
system.web.dll!System.Web.HttpRuntime.ProcessRequest(System.Web.HttpWorkerRequest wr) + 0xb0 bytes 
newtelligence.enterprisetools.dll!newtelligence.EnterpriseTools.Msmq.MessageQueueAsmxDispatcher.MessageReceived(System.Object sender = {newtelligence.EnterpriseTools.Msmq.MessageQueueListener}, newtelligence.EnterpriseTools.Msmq.MessageReceivedEventArgs ea = {newtelligence.EnterpriseTools.Msmq.MessageReceivedEventArgs}) Line 33 C#
newtelligence.enterprisetools.dll!newtelligence.EnterpriseTools.Msmq.MessageQueueListener.ReceiveLoop() Line 305 + 0x2b bytes C#

Categories: ASP.NET | MSMQ | Web Services

December 11, 2004
@ 10:26 AM

I haven't really looked into other people's ideas (or implementations) on referrer spam, but I think these idiots who want to use our blogs as a way to boost their Google rank are setting themselves up for trouble, because we are not really stupid. For the time being, I am simply letting it all run to collect evidence. There are wonderful things we can do with all these spam URLs. Distributed denial of service attacks come to mind ;-)  Or just redirect them to themselves.

Seriously, I am thinking of adding word filtering and a manual negative list to the blog engine, exposing that list as a separate RSS feed, and allowing the import of such RSS lists into my blog engine. My exported list might also reference all the trusted negative lists that I import, so that this forms a mesh where folks can report those idiots and the engines will pick it up and throw out the crap.

Categories: Blog | dasBlog

See Part 3

In Part 3, I illustrated a simple strategy to re-run a “repeatable” transaction using a simple “while” loop. A better and more fault resilient way of handling such cases (we are talking about transactions, after all) is to employ a transactional resource manager such as MSMQ from which to retrieve the transaction input. If the transaction fails due to a catastrophic failure (power outage or worse), an in-memory recovery strategy like the one shown in “RunTx()” will not get us very far. Instead, we will put the transaction input into a transactional queue, read it from the queue using a specialized message queue listener and equip that listener with some handling logic that is similar to what is shown in the “RunTx()” example.

Before I show that, however, I will give you a generic, multi-threaded message queue listener that is not transaction aware (in the desired way) and which I will specialize in Part 5 (the last part) of this series. The reasons I am posting the message listener logic in two steps are that (a) you might find a generic listener useful, (b) it is quite a bit of code and I don’t want the “essentials” of the recovery code to be buried amongst the basic queue listener framework, and (c) I am simply too lazy today to explain it all ;-)

Using this queue listener is pretty straightforward and should be rather self-explanatory. To listen, create an instance of the listener using one of the constructors and supply a “queueName” argument that is compatible with the System.Messaging.MessageQueue class. The queue must exist. The listener will bubble messages to the MessageReceived delegate (“unicast event”), which you must implement in your class and assign to the listener in order to get at the messages.

MessageQueueListener queueListener = new MessageQueueListener(sourceQueuePath, 4 );
queueListener.MessageReceived = new MessageReceivedEventHandler(queueListener_MessageReceived);
queueListener.ProcessingError += new ProcessingErrorEventHandler(queueListener_ProcessingError);
queueListener.Start();

And here are 344 lines of pure developer joy:

using System;
using System.Collections;
using System.Messaging;
using System.Threading;

namespace newtelligence.EnterpriseTools.Msmq
{
    /// <summary>
    /// Event argument class for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public class MessageReceivedEventArgs : EventArgs
    {
        private Message receivedMessage;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="receivedMessage">A message</param>
        public MessageReceivedEventArgs( Message receivedMessage )
        {
           this.receivedMessage = receivedMessage;
        }

        /// <summary>
        /// Received message
        /// </summary>
        public Message ReceivedMessage
        {
            get
            {
                return receivedMessage;
            }
        }
    }

    /// <summary>
    /// Event argument class for the MessageQueueListener.ProcessingError event
    /// </summary>
    public class ProcessingErrorEventArgs : EventArgs
    {
        /// <summary>
        /// The exception that was intercepted during processing
        /// </summary>
        private Exception exception;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="exception">An exception</param>
        public ProcessingErrorEventArgs( Exception exception )
        {
            this.exception = exception;
        }

        /// <summary>
        /// Intercepted exception
        /// </summary>
        public Exception Exception
        {
            get
            {
                return exception;
            }
        }
    }

    /// <summary>
    /// Delegate for the MessageQueueListener.MessageReceived unicast event
    /// </summary>
    public delegate void MessageReceivedEventHandler( object sender, MessageReceivedEventArgs ea );
    /// <summary>
    /// Delegate for the MessageQueueListener.ProcessingError event
    /// </summary>
    public delegate void ProcessingErrorEventHandler( object sender, ProcessingErrorEventArgs ea );
   
    /// <summary>
    /// This class implements a multi-threaded message queue listener
    /// </summary>
    public class MessageQueueListener
    {
        protected Guid instanceId = Guid.NewGuid();
        protected Thread[] listenerThreads;
        protected string queueName=null;
        protected int numThreads=1;
        protected ThreadPriority priority=ThreadPriority.Normal;
        protected ManualResetEvent continueInterrupt = new ManualResetEvent(true);
        protected ManualResetEvent pauseInterrupt = new ManualResetEvent(false);
        protected ManualResetEvent stopInterrupt = new ManualResetEvent(false);
        protected int peekTimeout = 5;
        protected MessageReceivedEventHandler messageReceived;
        protected int refCount = 0;
       
        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        public MessageQueueListener(string queueName)
        {
            // "this." is required; otherwise the parameter is assigned to itself
            this.queueName = queueName;
            Initialize();
        }

        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads to run the listener on</param>
        public MessageQueueListener(string queueName, int numThreads)
        {
            this.queueName = queueName;
            this.numThreads = numThreads;
            Initialize();
        }


        /// <summary>
        /// Initializes a new instance of the MessageQueueListener class
        /// </summary>
        /// <param name="queueName">Name of the queue to listen on</param>
        /// <param name="numThreads">Number of threads to run the listener on</param>
        /// <param name="priority">Thread priority</param>
        public MessageQueueListener(string queueName, int numThreads, ThreadPriority priority)
        {
            this.queueName = queueName;
            this.numThreads = numThreads;
            this.priority = priority;
            Initialize();
        }


        /// <summary>
        /// Adds a reference to the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        public int AddReference()
        {
            return ++ refCount;
        }

        /// <summary>
        /// Releases a reference from the reference counter.
        /// </summary>
        /// <returns>Current reference count</returns>
        /// <remarks>If the reference counter drops to or below zero, the listener is stopped.</remarks>
        public int ReleaseReference()
        {
            if ( -- refCount <= 0 )
            {
                Stop();
            }
            return refCount;
        }

        /// <summary>
        /// Initializes the MessageQueueListener
        /// </summary>
        protected virtual void Initialize()
        {
            listenerThreads = new Thread[ numThreads];
        }

        /// <summary>
        /// Unicast event that is raised when a message has been received
        /// </summary>
        public MessageReceivedEventHandler MessageReceived
        {
            get
            {
                return messageReceived;
            }
            set
            {
                if ( messageReceived == value)
                    return;
                 messageReceived = value;
            }
        }

        /// <summary>
        /// Multicast event that is raised when an exception occurs during processing.
        /// </summary>
        public event ProcessingErrorEventHandler ProcessingError;

        /// <summary>
        /// Raises the ProcessingError event
        /// </summary>
        /// <param name="pea"></param>
        protected void OnProcessingError( ProcessingErrorEventArgs pea )
        {
            if (ProcessingError != null )
            {
                ProcessingError( this, pea );
            }
        }

        /// <summary>
        /// Starts the message queue listener. If the threads are already running,
        /// Start() does nothing.
        /// </summary>
        public void Start()
        {
            for(int i=0;i< numThreads;i++)
            {
                if ( listenerThreads[i] == null )
                {
                    Thread thread = new Thread(new ThreadStart(this.ReceiveLoop));
                    thread.Name = String.Format("{0}:{1}:{2}",i, instanceId,GetType().Name);
                    thread.IsBackground = true;
                    thread.Priority = priority;
                    listenerThreads[i] = thread;
                    thread.Start();
                }
            }
        }

        /// <summary>
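        /// Stops the listener and waits for all listener threads to exit.
        /// </summary>
        public void Stop()
        {
            Resume();
            stopInterrupt.Set();
            for(int i=0;i< numThreads;i++)
            {
                // A slot is still null if Start() was never called, for example
                // when ReleaseReference() stops a listener that is not running.
                if ( listenerThreads[i] != null )
                {
                    listenerThreads[i].Join();
                    listenerThreads[i] = null;
                }
            }
            // Clear the stop signal so that a later Start() can run again.
            stopInterrupt.Reset();
        }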

        /// <summary>
        /// Set listener state to "paused".
        /// </summary>
        /// <remarks>
        /// Setting the listener into the "paused" state will not suspend processing
        /// immediately. All messages that are currently being processed will be
        /// completed, but no new messages will be dequeued.
        /// </remarks>
        public void Pause()
        {
            continueInterrupt.Reset();
            pauseInterrupt.Set();                            
        }

        /// <summary>
        /// Set listener state to "resume". This will resume the listener from
        /// the "paused" state.
        /// </summary>
        public void Resume()
        {
            pauseInterrupt.Reset();
            continueInterrupt.Set();           
        }

        /// <summary>
        /// Gets a value indicating whether the listener is paused.
        /// </summary>
        public bool IsPaused
        {
            get
            {
                return pauseInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }

        /// <summary>
        /// Gets a value indicating whether the listener is stopping.
        /// </summary>
        public bool IsStopping
        {
            get
            {
                return stopInterrupt.WaitOne(TimeSpan.Zero,false);
            }
        }


        /// <summary>
        /// This is the thread entry point for the main receive loop.
        /// </summary>
        protected virtual void ReceiveLoop()
        {
            using (MessageQueue sourceQueue = new MessageQueue( queueName, false ))
            {
                bool isTransactional = sourceQueue.Transactional;
                sourceQueue.MessageReadPropertyFilter.SetAll();

                while ( !IsStopping )
                {
                    // if the continue interrupt is not set, we will hang here
                    // until it is set again (resuming)
                    WaitHandle.WaitAny(new WaitHandle[]{ continueInterrupt, stopInterrupt});

                    try
                    {
                        //peek on the queue and wait for a message becoming available
                        IAsyncResult asynchResult = sourceQueue.BeginPeek(TimeSpan.FromSeconds( peekTimeout),null,null);
                        int firedWaitHandle = WaitHandle.WaitAny(new WaitHandle[]{asynchResult.AsyncWaitHandle, pauseInterrupt, stopInterrupt});
                        if ( firedWaitHandle == 0 )
                        {
                            sourceQueue.EndPeek(asynchResult);
                            Message receivedMessage = sourceQueue.Receive( TimeSpan.FromSeconds(0),
                                isTransactional?MessageQueueTransactionType.Single:MessageQueueTransactionType.None );
                          
                            try
                            {
                                messageReceived(this,new MessageReceivedEventArgs(receivedMessage));                       
                            }
                            finally
                            {
                                receivedMessage.Dispose();
                            }
                        }
                        else if ( firedWaitHandle == 1 )
                        {
                            // Pausing. Clean up the asynchronous Peek operation
                            // (we'll assume that we have time for that) and then go
                            // back to the top of the loop where we'll hang on the
                            // continueInterrupt.
                            sourceQueue.EndPeek(asynchResult);
                        }
                        else
                        {
                            // stopping. Skip the EndPeek() and exit the thread immediately.
                            return;
                        }
                    }
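                    catch(MessageQueueException ex)
                    {
                        // A peek timeout only means that the queue was empty, so we
                        // loop again. Any other queue error ends this listener thread.
                        if ( ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout  )
                        {
                            break;
                        }
                    }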
                    catch( Exception ex )
                    {
                        // absorb all other exceptions and surface them from the listener
                        // using the ProcessingError event.
                        OnProcessingError( new ProcessingErrorEventArgs(ex));
                    }
                }
            }
        }
    }
}
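The pause, resume and stop coordination is easier to see without the MSMQ plumbing around it. The following is a stripped-down sketch of the same signaling idea, not a replacement for the listener: PausableWorker and its members are hypothetical names, the "work" is just a counter, and unlike Stop() above this variant does not need to resume first, because the loop waits on both signals at once.

```csharp
using System;
using System.Threading;

// Hypothetical stand-alone illustration of the ManualResetEvent gating idea.
public class PausableWorker
{
    // Signaled while the worker may run; reset while it is paused.
    private readonly ManualResetEvent continueSignal = new ManualResetEvent(true);
    // Signaled once the worker should exit.
    private readonly ManualResetEvent stopSignal = new ManualResetEvent(false);
    private Thread thread;
    private int iterations;

    // Number of completed loop passes, read in a thread-safe way.
    public int Iterations
    {
        get { return Interlocked.CompareExchange(ref iterations, 0, 0); }
    }

    public void Start()
    {
        thread = new Thread(new ThreadStart(Loop));
        thread.IsBackground = true;
        thread.Start();
    }

    public void Pause()  { continueSignal.Reset(); }
    public void Resume() { continueSignal.Set(); }

    public void Stop()
    {
        stopSignal.Set();
        thread.Join();
    }

    private void Loop()
    {
        WaitHandle[] gates = new WaitHandle[] { continueSignal, stopSignal };
        while (true)
        {
            // Blocks while paused and wakes up on either resume or stop.
            WaitHandle.WaitAny(gates);
            if (stopSignal.WaitOne(0))
            {
                return;
            }
            Interlocked.Increment(ref iterations); // stands in for real work
            Thread.Sleep(1);
        }
    }
}
```

As with the listener, Pause() does not interrupt a pass that is already in flight; it only keeps the next one from starting.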

 

Categories: MSMQ

The little series I am currently writing here on my blog has inspired me to write way more code than is actually necessary to get my point across ;-) So by now I've got my own MSMQ transport for WSE 2.0 (yes, I know that others have written that already, but I am shooting for an "enterprise strength" implementation), a WebRequest/WebResponse pair to smuggle under arbitrary ASMX proxies and I am more than halfway done with a server-side host for MSMQ-to-ASMX (spelled out: ASP.NET Web Services).

What bugs me is that WSE 2.0's messaging model is "asynchronous only" and that it always performs a push/pull translation and that there is no way to push a message through to a service on the receiving thread. Whenever I grab a message from the queue and put it into my SoapTransport's "Dispatch()" method, the message gets queued up in an in-memory queue and that is then, on a concurrent thread, pulled (OnReceiveComplete) by the SoapReceivers collection and submitted into ProcessMessage() of the SoapReceiver (like any SoapService derived implementation) matching the target endpoint. So while I can dequeue from MSMQ within a transaction scope (ServiceDomain), that transaction scope doesn't make it across onto the thread that will actually execute the action inside the SoapReceiver/SoapService.

So now I am sitting here, contemplating and trying to figure out a workaround that doesn't require me to rewrite a big chunk of WSE 2.0 (which I am totally not shy of if that is what it takes). Transaction marshaling, thread synchronization, ah, I love puzzles. Once I know how to solve this and have made the adjustments, I'll post the queue listener I promised to wrap up the series. The other code I've written in the process will likely surface in some other way.

December 5, 2004
@ 02:33 PM

"My Lists", "My Photos", "My Profile" .... sounds all very familiar over there in MSN Spaces. So ... roll in the Web service interfaces, please.

Categories: Web Services | Weblogs | XML

December 2, 2004
@ 11:01 PM

That blue thing that I am running PowerPoint on during my talks is an Alienware Area51-m laptop. Heavy like a block of lead, battery life of about 2 hrs (which makes me carry two), but very fast and "feel good". Except for some minor annoyances, I have been happy with it for about a year now. And I don't think I have ever had a machine that was still faster than most of the other machines out there 12 months after I got it. One big problem I had with it until today, though, was that the machine got increasingly louder; more precisely, the fan noise was just unbearable. Since the machine is essentially a high-end desktop in a notebook shell, the box needs a lot of cooling. It turns out that the elaborate cooling mechanism that Alienware puts in those machines clogs up easily with dust, but hides the dust quite well. So while the machine looked "clean", it really, really, really wasn't. Wow. Yucky! Now it's all clean again and the fan is back to a bearable noise level. It's happy to be breathing again.

Categories: Other Stuff

December 2, 2004
@ 10:40 PM

Wow. It's been a long time. 2 years again, already? Mr. Forte is letting me stay at his house and has invited me to his Christmas party in New York, and since I am not going to Denver for the rest of the year as I had planned and I still need a few miles to retain my current status tier with Lufthansa for '05/'06, there wasn't much to think about. I am thrilled to go back. In the two years I lived in Manhattan ('95 and '96), New York became my "second home" and it is always fantastic to go back. I love The City. Dec 17-21.

Categories: Other Stuff

See Part 2

Now that we have a way to detect deadlocks and similar error conditions we might encounter during a transaction, we need to find a mechanism that will allow us to back out of the doomed transaction and retry the failed operation. The good thing about transaction failures of that sort is that you can essentially retry as often as you want without doing anything wrong – as long as all the resources you are reading from and writing to are guarded by transactional resource managers that are enlisted in the transaction. And that’s the hint at the trick we need to use.

If we were executing the code below through a remote call, it might be rather difficult to recover from a failure. The local transaction inside the component will roll back – ok – but that punts the problem back out to the caller, who will have to implement the appropriate logic to single out transaction failure causes that can be fixed by a retry and to actually resubmit the request. That can be rather tricky.

[Transaction(TransactionOption.Required)]
public class DoesStuff : ServicedComponent
{
    /* things omitted */
 
    [AutoComplete]
    public void DoStuff( string argument )
    {
        try
        {
           dbConnection.Open();
           sprocUpdateAndQueryStuff.Parameters["@StuffArgument"].Value = argument;
           result = this.GetResultFromReader( sprocUpdateAndQueryStuff.ExecuteReader() );
        }
        catch( Exception exception )
        {
            throw RepeatableOperationExceptionMapper.MapException( exception );                           
        }
        finally
        {
            dbConnection.Close();
        }
    }
}

Even worse, we might not even learn about a failure depending on when and why the transaction fails. In distributed transaction scenarios, it is absolutely possible (even though rare) that a transaction fails long after the application code is done with all of its processing. Let’s assume everything in the call to DoStuff() above works just fine. The code calls a stored procedure on a remote SQL Server, SQL Server is completely happy and so is the component. Hence, neither has any good reason to complain. Let’s further assume there is a component DoesOtherStuff just like the one above and that component is subsequently called by the remote client and updates some other SQL Server database from within the same distributed transaction. That SQL Server and that component are both happy as well. Everybody is happy. The client commits the transaction, maybe by calling ServiceDomain.Leave() or leaving from a method of a ServicedComponent that serves as the transaction root. Now DTC (the transaction coordinator) wants to go around and ask everyone for their opinion about the transaction outcome. At this point, all user code you’ve contributed to this story is done executing. And now (we are talking about transactions, so this is where the fun is) disaster strikes and one of the SQL Servers you were talking to suddenly gets cut off from the network in some way. Hardware issue. Crash. Something. If you are lucky, the problem occurs while DTC is still walking around asking for votes (Phase 1). In this case, the transaction will just abort. If you are horribly unlucky, disaster strikes while DTC is already issuing commit commands (Phase 2). In that case, the transaction may (and will) just hang forever until either the network connection is restored or some operator throws in the hammer and resolves the transaction manually (and may deliberately cause inconsistency). 
Since all this might happen asynchronously, none of your code might ever end up knowing that the transaction failed and the (transient) transaction results just end up being quietly rolled back in the background.

Before you get all nervous about the asynchronous case, let’s look at a strategy to deal with the scenario from the client side, assuming that the transaction is resolved synchronously. We will take into account that there are transaction failures that can be fixed by rerunning the transaction.

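public TransactionStatus RunTx(string argument)
{
    // Initialized only to satisfy definite assignment; every pass of the
    // loop overwrites it with the result of ServiceDomain.Leave().
    TransactionStatus status = TransactionStatus.Aborted;
    bool done = false;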

    while( !done )
    {
        ServiceConfig cfg = new ServiceConfig();
        cfg.Transaction = TransactionOption.Required;
        ServiceDomain.Enter(cfg);

        try
        {
            DoesStuff doesStuff = new DoesStuff();
            doesStuff.DoStuff(argument);

            ContextUtil.SetComplete();
            done = true;
        }
        catch (RepeatableOperationException)
        {
            // A rerun might succeed: abort and leave "done" false so we loop again.
            ContextUtil.SetAbort();
        }
        catch
        {
            ContextUtil.SetAbort();
            throw;
        }
        finally
        {
            status = ServiceDomain.Leave();
        }
    }
    return status;
}

If a component throws an exception indicating that running the transaction again might succeed, we catch that condition, abort the local transaction and, since we don’t set done to true, go back to the top of the loop and rerun the transaction. If we catch any other exception, we abort the transaction and re-throw the exception. The loop might look a bit awkward, but it does the job of re-running the transaction whenever that’s desired. So all you need to do is to use that sort of a loop everywhere in your code and you are all set. Well … almost.

Assume that the argument we pass into RunTx() is something as complex and valuable (!) as a purchase order that you just got into your system. A purchase order is as good as cash. The longer you keep that purchase order around in memory, the greater the risk that your local process (or machine) will fail catastrophically for some arbitrary reason. So if you need to re-run the transaction one or two times because the backend system is struggling (again, this is all technology for the worst imaginable case), you are making yourself increasingly vulnerable to a local failure until the transaction finally succeeds. To reduce that risk, it therefore makes sense to add another step here.

Before we even start the transaction that does the processing, we grab the input data and guard it using a transactional resource manager – such as MSMQ. If we throw the input data into a private, transactional MSMQ queue as the first step and before we do anything else, the data is safely guarded on disk. If the machine fails once the data is in the queue, the data – encapsulated in a message – will still be available once the machine comes back up. Using a message queue listener, the messages in the queue are then each read from within individual transactions and the data is submitted into the processing component. Because the messages are simply put back into the queue if the transaction fails, re-running the transaction is practically automatic once the message queue listener restarts. Which leaves a problem: There are failing transactions that we want to repeat and there are failing transactions of which we know that they will not succeed no matter how often we try them. If a transactional component throws an exception because it has a bug, the transaction will abort and no number of re-runs will have that transaction succeed. Messages (data) that cause this sort of behavior are “poisonous” and we need to sort them out. Stay tuned; the message queue listener that takes care of all of that is up next.
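To make the sorting idea a bit more tangible before the real listener shows up, here is a purely in-memory sketch. It rests on several assumptions: an ordinary Queue stands in for the transactional MSMQ queue, the RepeatableOperationException class is declared locally only so that the sample compiles on its own, PoisonSortingDispatcher and MessageHandler are hypothetical names, and the cap on attempts is my own addition rather than something the loop in RunTx() has.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for the exception type used in RunTx().
public class RepeatableOperationException : Exception
{
    public RepeatableOperationException(string message) : base(message) { }
}

public delegate void MessageHandler(string body);

// Hypothetical dispatcher that separates retryable failures from poison messages.
public class PoisonSortingDispatcher
{
    private readonly int maxAttempts;

    public readonly Queue<string> Pending = new Queue<string>();
    public readonly List<string> Completed = new List<string>();
    public readonly List<string> Poison = new List<string>();

    public PoisonSortingDispatcher(int maxAttempts)
    {
        this.maxAttempts = maxAttempts;
    }

    public void Drain(MessageHandler handler)
    {
        while (Pending.Count > 0)
        {
            string body = Pending.Dequeue();
            bool done = false;

            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++)
            {
                try
                {
                    handler(body);
                    Completed.Add(body);
                    done = true;
                }
                catch (RepeatableOperationException)
                {
                    // Transient failure: a rerun may succeed, so try again.
                }
                catch (Exception)
                {
                    // Anything else will not be fixed by a rerun.
                    break;
                }
            }

            if (!done)
            {
                // Set aside instead of putting it back into the queue.
                Poison.Add(body);
            }
        }
    }
}
```

A message that keeps failing with a repeatable error also ends up in the poison list once the attempts are used up. Whether that is the right call depends on the application, and nothing in this sketch is durable the way a transactional queue is.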

Categories: MSMQ