As our team was starting to transform our parts of the Azure Services Platform from a CTP ‘labs’ service exploring features into a full-on commercial service, it started to dawn on us that we had set ourselves up for writing a bunch of ‘enterprise apps’. The shiny parts of Service Bus and Access Control that we parade around are all about user-facing features, but if I look back at the work we had to go from a toy service to a commercial offering, I’d guess that 80%-90% of the effort went into aspects like infrastructure, deployment, upgradeability, billing, provisioning, throttling, quotas, security hardening, and service optimization. The lesson there was: when you’re boarding the train to shipping a V1, you don’t load new features on that train –  you rather throw some off.

The most interesting challenge for these infrastructure apps sitting on the backend was that we didn’t have much solid ground to stand on. Remember – these were very early days, so we couldn’t use SQL Azure since the folks over in SQL were on a pretty heroic schedule themselves and didn’t want to take on any external dependencies even from close friends. We also couldn’t use any of the capabilities of our own bits because building infrastructure for your features on your features would just be plain dumb. And while we could use capabilities of the Windows Azure platform we were building on, a lot of those parts still had rough edges as those folks were going through a lot of the same that we went through. In those days, the table store would be very moody, the queue store would sometimes swallow or duplicate messages, the Azure fabric controller would occasionally go around and kill things. All normal –  bugs.

So under those circumstances we had to figure out the architecture for some subsystems where we need to do a set of coordinated action across a distributed set of resources – a distributed transaction or saga of sorts. The architecture had a few simple goals: when we get an activation request, we must not fumble that request under any circumstance, we must run the job to completion for all resources and, at the same time, we need to minimize any potential for required operator intervention, i.e. if something goes wrong, the system better knows how to deal with it – at best it should self-heal.

My solution to that puzzle is a pattern I call “Scheduler-Agent-Supervisor Pattern” or, short, “Supervisor Pattern”. We keep finding applications for this pattern in different places, so I think it’s worth writing about it in generic terms – even without going into the details of our system.

The pattern foots on two seemingly odd and very related assumptions: the system is perfect’ and ‘all error conditions are transient’. As a consequence, the architecture has some character traits of a toddler. It’s generally happily optimistic and gets very grumpy, very quickly when things go wrong – to the point that it will simply drop everything and run away screaming. It’s very precisely like that, in fact.

image

The first picture here shows all key pieces except the Supervisor that I’ll introduce later. At the core we have a Scheduler that manages a simple state machine made up of Jobs and those jobs have Steps. The steps may have a notion of interdependency or may be completely parallelizable. There is a Job Store that holds jobs and steps and there are Agents that execute operations on some resource.  Each Agent is (usually) fronted by a queue and the Scheduler has a queue (or service endpoint) through which it receives reply messages from the Agents.

Steps are recorded in a durable storage table of some sort that has at least the following fields: Current State (say: Disabled, Active), Desired State (say: Disabled, Active), LockedUntil (Date/Time value), and Actor plus any step specific information you want to store and eventually submit with the job to the step agent.

When Things Go Right

The initial flow is as follows:

(1)a – Submit a new job into the Scheduler (and wait)
(2)a – The Scheduler creates a new job and steps with an initial current state (‘Disabled’) in the job store 
(2)b – The Scheduler sets ‘desired state’ of the job and of all schedulable steps (dependencies?) to the target state (‘Active’) and sets the ‘locked until’ timeout of the step to a value in the near future, e.g. ‘Now’ + 2 minutes.
(1)b – Job submission request unblocks and returns

If all went well, we now have a job record and, here in this example, two step records in our store. They have a current state of ‘Disabled’ and a desired state of ‘Active’. If things didn’t go well, we’d have incomplete or partially wedged records or nothing in the job store, at all. The client would also know about it since we’ve held on to the reply until we have everything done – so the client is encouraged to retry. If we have nothing in the store and the client doesn’t retry – well, then the job probably wasn’t all that important, after all. But if we have at least a job record, we can make it all right later. We’re optimists, though; let’s assume it all went well.

For the next steps we assume that there’s a notion of dependencies between the steps and the second steps depends on the first. If that were not the case, the two actions would just be happening in parallel.

(3) – Place a step message into the queue for the actor for the first step; Agent 1 in this case. The message contains all the information about the step, including the current and desired state and also the LockedUntil that puts an ultimatum on the activity. The message may further contain an action indicator or arguments that are taken from the step record.
(4) – After the agent has done the work, it places a completion record into the reply queue of the Scheduler.
(5) – The Scheduler records the step as complete by setting the current state from ‘Disabled’ to ‘Active’; as a result the desired and the current state are now equal.
(6) – The Scheduler sets the next step’s desired state to the target state (‘Active’) and sets the LockedUntil timeout of the step to a value in the near future, e.g. ‘Now’ + 1 minute. The lock timeout value is an ultimatum for when the operation is expected to be complete and reported back as being complete in a worst-case success case. The actual value therefore depends on the common latency of operations in the system. If operations usually complete in milliseconds and at worst within a second, the lock timeout can be short – but not too short. We’ll discuss this  value in more detail a bit later.
(7), (8), (9) are equivalent to (3), (4), (5).

Once the last step’s current state is equal to the current state, the job’s current state gets set to the desired state and we’re done. So that was the “99% of the time” happy path.

image

When Things Go Wrong

So what happens when anything goes wrong? Remember the principle ‘all errors are transient’. What we do in the error case – anywhere – is to log the error condition and then promptly drop everything and simply hope that time, a change in system conditions, human or divine intervention, or – at worst – a patch will heal matters. That’s what the second principle ‘the system is perfect’ is about; the system obviously isn’t really perfect, but if we construct it in a way that we can either wait for it to return from a wedged state into a functional state or where we enable someone to go in and apply a fix for a blocking bug while preserving the system state, we can consider the system ‘perfect’ in the sense that pretty much any conceivable job that’s already in the system can be driven to completion.

In the second picture, we have Agent 2 blowing up as it is processing the step it got handed in (7). If the agent just can’t get its work done since some external dependency isn’t available – maybe a database can’t be reached or a server it’s talking to spews out ‘server too busy’ errors – it may be able to back off for a moment and retry. However, it must not retry past the LockedUntil ultimatum that’s in the step record. When things fail and the agent is still breathing, it may, as a matter of courtesy, notify the scheduler of the fact and report that the step was completed with no result, i.e. the desired state and the achieved state don’t match. That notification may also include diagnostic information. Once the LockedUntil ultimatum has passed, the Agent no longer owns the job and must drop it. It must even not report failure state back to the Scheduler past that point.

If the agent keels over and dies as it is processing the step (or right before or right after), it is obviously no longer in a position to let the scheduler know about its fate. Thus, there won’t be any message flowing back to the scheduler and the job is stalled. But we expect that. In fact, we’re ok with any failure anywhere in the system. We could lose or fumble a queue message, we could get a duplicate message, we could have the scheduler die a fiery death (or just being recycled for patching at some unfortunate moment) – all of those conditions are fine since we’ve brought the doctor on board with us: the Supervisor. 

image

The Supervisor

The Supervisor is a schedule driven process (or thread) of which one or a few instances may run occasionally. The frequency depends on much on the average duration of operations and the expected overall latency for completion of jobs.

The Supervisor’s job is to recover steps or jobs that have failed – and we’re assuming that failures are due to some transient condition. So if the system would expect a transient resource failure condition that prevented a job from completing just a second ago to be healed two seconds later, it’d depend on the kind of system and resource whether that’d be a good strategy.  What’s described here is a pattern, not a solution, so it depends on the concrete scenario to get the  timing right for when to try operations again once they fail.

This desired back-off time manifests in the LockedUntil value.  When a step gets scheduled, the Scheduler needs to state how long it is willing to wait for that step to complete; this includes some back-off time padding. Once that ultimatum has passed and the step is still in an inconsistent state (desired state doesn’t equal the current state)  the Supervisor can pick it up at any time and schedule it.

(1) – Supervisor queries the job store for any inconsistent steps whose LockedUntil value has expired.
(2) – The Supervisor schedules the step again by setting the LockedUntil value to a new timeout and submitting the step into the target actor’s queue
(3) – Once the step succeeds, the step is reported as complete on the regular path back to the Scheduler  where it completes normally as in steps (8), (9) from the happy-path scenario above. If it fails, we simply drop it again. For failures that allow reporting an error back to the Scheduler it may make sense to introduce an error counter that round-trips with the step so that the system could detect poisonous steps that fail ‘forever’ and have the Supervisor ignore those after some threshold.

The Supervisor can pursue a range of strategies for recovery. It can just take a look at individual steps and recover them by rescheduling them – assuming the steps are implemented as idempotent operations. If it were a bit cleverer, it may consider error information that a cooperative (and breathing) agent has submitted back to the Scheduler and even go as far as to fire an alert to an operator if the error condition were to require intervention and then take the step out of the loop by marking it and setting the LockedUntil value to some longer timeout so it’s taken out of the loop and someone can take a look.

At the job-scope, the Supervisor may want to perform recovery such that it first schedules all previously executed steps to revert back to the initial state by performing compensation work (all resources that got set to active are getting disabled again here in our example) and then scheduling another attempt at getting to the desired state.

In step (2)b up above, we’ve been logging current and desired state at the job-scope and with that we can also always find inconsistent jobs where all steps are consistent and wouldn’t show up in the step-level recovery query. That situation can occur if the Scheduler were to crash between logging one step as complete and scheduling the next step. If we find inconsistent jobs with all-consistent steps, we just need to reschedule the next step in the dependency sequence whose desired state isn’t matching the desired state of the overall job.

To be thorough, we could now take a look at all the places where things can go wrong in the system. I expect that survey to yield that at as long we can successfully get past step (2)b from the first diagram, the Supervisor is always in a position to either detect that a job isn’t making progress and help with recovery or can at least call for help. The system always knows what its current intent is, i.e. which state transitions it wants to drive, and never forgets about that intent since that intent is logged in the job store at all times and all progress against that intent is logged as well.  The submission request (1) depends on the outcome of (2)a/b to guard against failures while putting a job and its steps into the system so that a client can take corrective action. In fact, once the job record is marked as inconsistent in step (2)b, the scheduler could already report success back to the submitting party even before the first step is scheduled, because the Supervisor would pick up that inconsistency eventually.

 

Categories: Architecture | SOA | Azure | Technology

This post explains an essential class for asynchronous programming that lurks in the depths of the WCF samples: InputQueue<T>. If you need to write efficient server-side apps, you should consider reading through this and add InputQueue<T> to your arsenal. 

Let me start with: This blog post is 4 years late. Sorry! – and with that out of the way:

The WCF samples ship with several copies of a class that’s marked as internal in the System.ServiceModel.dll assembly: InputQueue<T>. Why are these samples – mostly those implementing channel-model extensions – bringing local copies of this class with them? It’s an essential tool for implementing the asynchronous call paths of many aspects of channels correctly and efficiently.

If you look closely enough, the WCF channel infrastructure resembles the Berkeley Socket model quite a bit – especially on the server side. There’s a channel listener that’s constructed on the server side and when that is opened (usually under the covers of the WCF ServiceHost) that operation is largely equivalent to calling ‘listen’ on a socket – the network endpoint is ready for business.  On sockets you’ll then call ‘accept’ to accept the next available socket connection from a client, in WCF you call ‘AcceptChannel’ to accept the next available (session-) channel. On sockets you then call ‘receive’ to obtain bytes, on a channel you call ’Receive’ to  obtain a message.

Before and between calls to '’AcceptChannel’ made by the server-side logic,  client-initiated connections – and thus channels – may be coming in and queue up for a bit before they handed out to the next caller of ‘AcceptChannel’, or the asynchronous equivalent ‘Begin/EndAcceptChannel’ method pair. The number of channels that may be pending is configured in WCF with the ‘ListenBacklog’ property that’s available on most bindings.

I wrote ‘queue up’ there since that’s precisely what happens – those newly created channels on top of freshly accepted sockets or HTTP request channels are enqueued into an InputQueue<T> instance and (Begin-)Accept is implemented as a dequeue operation on that queue. There are two particular challenges here that make the regular Queue<T> class from the System.Collections.Generic namespace unsuitable for use in the implementation of that mechanism: Firstly, the Dequeue method there is only available as a synchronous variant and also doesn’t allow for specifying a timeout. Secondly, the queue implementation doesn’t really help much with implementing the ListenBacklog quota where not only the length of the queue is limited to some configured number of entries, but accepting further connections/channels from the underlying network is also suspended for as long as the queue is at capacity and needs to resume as soon as the pressure is relieved, i.e. a caller takes a channel out of the queue.

To show that InputQueue<T> is a very useful general purpose class even outside of the context of the WCF channel infrastructure, I’ve lifted a version of it from one of the most recent WCF channel samples, made a small number of modifications that I’ll write about later, and created a little sample around it that I’ve attached to this post.

The sample I’ll discuss here is simulating parsing/reading IP addresses from a log-file and then performing a reverse DNS name resolution on those addresses – something that you’d do in a web-server log-analyzer or as the background task in a blog engine wile preparing statistics.

Reverse DNS name resolution is quite interesting since it’s embarrassingly easy to parallelize and each resolution commonly takes a really long time (4-5 seconds) –whereby all the work is done elsewhere. The process issuing the queries is mostly sitting around idle waiting for the response.  Therefore, it’s a good idea to run a number of DNS requests in parallel, but it’s a terrible idea to have any of these requests execute as a blocking call and burning a thread. Since we’re assuming that we’re reading from a log file that requires some parsing, it would also be a spectacularly bad idea to have multiple concurrent threads compete for access to that file and get into each other’s way. And since it is a file and we need to lift things up from disk, we probably shouldn’t do that ‘just in time’ as a DNS resolution step is done, but there should rather be some data readily waiting for processing.  InputQueue<T> is enormously helpful in such a scenario.

The key file of the sample code – the implementation of the queue itself aside – is obviously Program.cs. Here’s Main() :

static void Main(string[] args)
{
    int maxItemsInQueue = 10;
    InputQueue<IPAddress> logDataQueue = new InputQueue<IPAddress>();
    int numResolverLoops = 20;
    ManualResetEvent shutdownCompleteEvent = new ManualResetEvent(false);
    List<IPAddressResolverLoop> resolverLoops = new List<IPAddressResolverLoop>();
 
    Console.WriteLine("You can stop the program by pressing ENTER.");

We’re setting up a new InputQueue<IPAddress> here into which we’ll throw the parsed addresses from our acquisition loop that simulates reading from the log. The queue’s capacity will be limited to just 10 entries (maxItemsInQueue is the input value) and we will run 20 'resolver loops’, which are logical threads that process IP-to-hostname resolution steps.

    Console.WriteLine("You can stop the program by pressing ENTER.");
 
    // set up the loop termination callback
    WaitCallback loopTerminationCallback = o =>
    {
        if (Interlocked.Decrement(ref numResolverLoops) == 0)
        {
            shutdownCompleteEvent.Set();
        }
    };
 
    // set up the resolver loops
    for (int loop = 0; loop < numResolverLoops; loop++)
    {
        // add the resolver loop 'i' and set the done flag when the
        // last of them terminates
        resolverLoops.Add(
            new IPAddressResolverLoop(
                logDataQueue, loop, 
                loopTerminationCallback, null));
    }

Next we’re kicking off the resolver loops – we’ll look at these in detail a bit later. We’ve got a ManualResetEvent lock object that guards the program’s exit until all these loops have completed and we’re going to set that to signaled once the last loop completes – that’s what the loopTerminationCallback anonymous method is for.  We’re registering the method with each of the loops and as they complete the method gets called and the last call sets the event. Each loop gets a reference to the logDataQueue from where it gets its work.

   // set up the acquisition loop; the loop auto-starts
    using (LogDataAcquisitionLoop acquisitionLoop =
        new LogDataAcquisitionLoop(logDataQueue, maxItemsInQueue))
    {
        // hang main thread waiting for ENTER
        Console.ReadLine();
        Console.WriteLine("*** Shutdown initiated.");
    }

Finally we’re starting the acquisition loop that gets the data from the log file. The loop gets a reference to the logDataQueue where it places the acquired items and it’s passed the maxItemsInQueue quota that governs how many items may be read ahead into the queue. Once the user presses the ENTER key, the acquisition loop object is disposed by ways of exiting the using scope, which stops the loop.

    // shut down the queue; the resolvers will auto-close
    // as the queue drains. We don't need to close them here.
    logDataQueue.Shutdown();
 
    // wait for all work to complete
    shutdownCompleteEvent.WaitOne();
}

Lastly, the queue is shut down (by fittingly calling Shutdown). Shutdown closes the queue (all further enqueue operations are absorbed) and causes all pending readers for which no more entries are available on the queue to unblock immediately  and return null. The resolver loops will complete their respective jobs and will terminate whenever they dequeue null from the queue. As they terminate, they call the registered termination callback (loopTerminationCallback from above) and that will eventually cause shutdownCompletedEvent to become signaled as discussed above.

The log-reader simulator isn’t particularly interesting for this sample, even though one of the goodies is that the simulation executes on an I/O completion port instead of a managed thread-pool thread – that’s another blog post. The two methods of interest are Begin/EndGetLogData – all that’s of interest here is that EndGetLogData returns an IPAddress that’s assumed to be parsed out of a log.

class IPAddressLogReaderSimulator
{
    public IAsyncResult BeginGetLogData(AsyncCallback callback, object data);
    public IPAddress EndGetLogData(IAsyncResult result);
}

The simulator is used internally  by the LogDataAcquisitionLoop class – which we’ll drill into because it implements the throttling mechanism on the queue.

class LogDataAcquisitionLoop : IDisposable
{
    readonly IPAddressLogReaderSimulator ipAddressLogReaderSimulator;
    readonly InputQueue<IPAddress> logDataQueue;
    int maxItemsInQueue;
    int readingSuspended;
    bool shuttingDown;
 
    public LogDataAcquisitionLoop(InputQueue<IPAddress> logDataQueue, int maxItemsInQueue)
    {
        this.logDataQueue = logDataQueue;
        this.maxItemsInQueue = maxItemsInQueue;
        this.shuttingDown = false;
        this.ipAddressLogReaderSimulator = new IPAddressLogReaderSimulator();
        this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
    }

The constructor sets up the shared state of the loop and kicks off the first read operation on the simulator. Once BeginGetLogData has acquired the first IPAddress (which will happy very quickly), the LogDataAcquired callback method will be invoked. 

    void LogDataAcquired(IAsyncResult result)
    {
        IPAddress address = this.ipAddressLogReaderSimulator.EndGetLogData(result);
 
        Console.WriteLine("-- added {0}", address);
        this.logDataQueue.EnqueueAndDispatch(address, this.LogDataItemDequeued);
        if (!this.shuttingDown && this.logDataQueue.PendingCount < this.maxItemsInQueue)
        {
            this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
        }
        else
        {
            // the queue will be at the defined capacity, thus abandon 
            // the read loop - it'll be picked up by LogDataItemDequeued
            // as the queue pressure eases
            Interlocked.Exchange(ref this.readingSuspended, 1);
            Console.WriteLine("-- suspended reads");
        }
    }

The callback method gets the IPAddress and puts it into the queue – using the InputQueue<T>.EnqueueAndDispatch(T, Action) method. There are two aspects that are quite special about that method when compared to the regular Queue<T>.Enqueue(T) method. First, it does take a callback as the second argument alongside the item to be enqueued; second, the method name isn’t just Enqueue, it also says Dispatch.

When EnqueueAndDispatch() is called, the item and the callback get put into an internal item queue – that’s the ‘enqueue’ part. As we will see in context a bit later in this post, the ‘dequeue’ operation on the queue is the BeginDequeue/EndDequeue asynchronous method call pair. There can be any number of concurrent BeginDequeue requests pending on the queue. ‘Pending’ means that the calls – rather their async callbacks and async state – are registered in another queue internal to InputQueue<T> that preserves the call order. Thus, BeginDequeue always only puts the async callback and async state into that queue and returns afterwards. There is no thread spun or hung. That’s all it does. 

As things go, the best opportunity to service a pending dequeue operation on a queue is when an item is being enqueued. Consequently, EnqueueAndDispatch() will first put the item into the internal queue and will then look whether there are registered waiters and/or readers – waiters are registered by ‘(Begin-)WaitForItem’, readers are registered by ‘(Begin-)Dequeue’. Since it’s known that there a new item in the queue now, the operation will iterate overall waiters and complete them – and does so by invoking their async callbacks, effectively lending the  enqueue operation’s thread to the waiters. If there’s at least one pending reader, it’ll then pop a message from the head of the internal item queue and call the reader’s async callback, lending the enqueue operation’s thread to processing of the dequeue operation. If that just made your head spin – yes, the item may have been dequeued and processed as EnqueueAndDispatch returns.

There is an overload for EnqueueAndDispatch() that takes an extra boolean parameter that lets you cause the dispatch operation to happen on a different thread, and there is also a EnqueueWithoutDispatch() method that just won’t dispatch through and a standalone Dispatch() method. 

The callback supplied to EnqueueAndDispatch(), here the LogDataItemDequeued method, is am Action delegate. The queue will call this callback as the item is being dequeued and, more precisely, when the item has been removed from the internal item queue, but just before it is returned to the caller. That turns out to be quite handy. If you take another look at the LogDataAcquired method you’ll notice that we’ve got two alternate code paths after EnqueueAndDispatch(). The first branch is called when the queue has not reached capacity and it’s not shutting down. When that’s so, we’re scheduling getting the next log item – otherwise we don’t. Instead, we set the readingSuspended flag and quit – effectively terminating and abandoning the loop. So how does that get restarted when the queue is no longer at capacity? The LogDataItemDequeued callback!

    void LogDataItemDequeued()
    {
        // called whenever an item is dequeued. First we check 
        // whether the queue is no longer full after this 
        // operation and the we check whether we need to resume
        // the read loop.
        if (!this.shuttingDown &&
            this.logDataQueue.PendingCount < this.maxItemsInQueue &&
            Interlocked.CompareExchange(ref this.readingSuspended, 0, 1) == 1)
        {
            Console.WriteLine("-- resuming reads");
            this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
        }
    }

The callback gets called for each item that gets dequeued. Which means that we’ll get an opportunity to restart the loop when it’s been stalled because the queue reached capacity. So we’re checking here whether the queue isn’t shuttong down and whether it’s below capacity and if that’s so and the readingSuspended flag is set, we’re  restarting the read loop. And that’s how the throttle works.

So now we’ve got the data from the log in the queue and we’re throttling nicely so that we don’t pull too much data into memory. How about taking a look at the DNS resolver loops that process the data?

class IPAddressResolverLoop : IDisposable
{
    readonly InputQueue<IPAddress> logDataQueue;
    readonly int loop;
    readonly WaitCallback loopCompleted;
    readonly object state;
    bool shutdown;
 
    public IPAddressResolverLoop(InputQueue<IPAddress> logDataQueue, int loop, WaitCallback loopCompleted, object state)
    {
        this.logDataQueue = logDataQueue;
        this.loop = loop;
        this.loopCompleted = loopCompleted;
        this.state = state;
        this.logDataQueue.BeginDequeue(TimeSpan.MaxValue, this.IPAddressDequeued, null);
    }

This loop is also implemented as a class and the fields hold shared that that’s initialized in the constructor. This loop also auto-starts and does so by calling BeginDequeue on the input queue. As stated above, BeginDequeue  commonly just parks the callback and returns.

    void IPAddressDequeued(IAsyncResult ar)
    {
        IPAddress address = this.logDataQueue.EndDequeue(ar);
        if (!this.shutdown && address != null)
        {
            Console.WriteLine("-- took {0}", address);
            Dns.BeginGetHostEntry(address, this.IPAddressResolved, new object[] { Stopwatch.StartNew(), address });
        }
        else
        {
            this.loopCompleted(this.state);
        }
    }

As an IPAddress is becomes available on the queue, the callback is being invoked and that’s quite likely on a thread lent by EnqueueAndDispatch() and therefore sitting  on the thread the log file generator is using to call back for completion of the BeginGetLogData method if you trace things back. If we get an address and the value isn’t null, we’ll then proceed to schedule the DNS lookup via Dns.BeginGetHostEntry. Otherwise we’ll terminate the loop and call the loopCompleted callback. In Main() that’s the anonymous method that counts down the loop counter and signals the event when it falls to zero.

    void IPAddressResolved(IAsyncResult ar)
    {
        var args = ((object[])ar.AsyncState);
        var stopwatch = (Stopwatch)args[0];
        var address = (IPAddress)args[1];
 
        stopwatch.Stop();
        double msecs = stopwatch.ElapsedMilliseconds;
 
        try
        {
            IPHostEntry entry = Dns.EndGetHostEntry(ar);
            Console.WriteLine("{0}: {1} {2}ms", this.loop, entry.HostName, msecs);
        }
        catch (SocketException)
        {
            // couldn't resolve. print the literal address
            Console.WriteLine("{0}: {1} {2}ms", this.loop, address, msecs);
        }
        // done with this entry, get the next
        this.logDataQueue.BeginDequeue(TimeSpan.MaxValue, this.IPAddressDequeued, null);
    }

The IPAddressResolved method just deals with the mechanics of printing out the result of the lookup and then schedules another BeginDequeue call to start the next iteration.

Summary: The enabler for and the core piece of the implementation of this scenario is InputQueue<T> – the dequeue-callback enables implementing throttling effectively and the dispatch logic provides an efficient way to leverage threads in applications that leverage asynchronous programming patterns, especially in I/O driven situations as illustrated here.

And last but not least – here’s teh codez; project file is for VS2010, throw the files into a new console app for VS2008 and mark the project to allow unsafe code (for the I/O completion thread pool code).

UsingInputQueue.zip (13.85 KB) 

or if you'd rather have a version of InputQueue that is using the regular thread pool, download the WCF samples and look for InputQueue.cs.

[The sample code posted here is subject to the Windows SDK sample code license]

Categories: Architecture | CLR | WCF

Book cover of Programming WCF Services

Juval Löwy’s very successful WCF book is now available in its third edition – and Juval asked me to update the foreword this time around. It’s been over three years since I wrote the foreword to the first edition and thus it was time for an update since WCF has moved on quite a bit and the use of it in the customer landscape and inside of MS has deepened where we’re building a lot of very interesting products on top of the WCF technology across all businesses – not least of which is the Azure AppFabric Service Bus that I work on and that’s entirely based on WCF services.

You can take a peek into the latest edition at the O’Reilly website and read my foreword if you care. To be clear: It’s the least important part of the whole book :-)

Categories: AppFabric | Azure | WCF | Web Services

September 9, 2010
@ 01:20 AM

For the (blog-) record - if you'd ever be looking for me on Twitter (where I'm sadly writing tons more than here), I'm @clemensv over there :-)

Categories: Blog

In case you need a refresher or update about the things me and our team work on at Microsoft, go here for a very recent and very good presentation by my PM colleague Maggie Myslinska from TechEd Australia 2010 about Windows Azure AppFabric with Service Bus demos and a demo of the new Access Control V2 CTP

Categories: AppFabric | SOA | Azure | Technology | ISB | WCF | Web Services