A Weekend With Indigo. Part 3: Hard-Core Messaging. Duplex Conversations.

[Read Part 1 and Part 2 first]

As in parts 1 and 2, I’ll stick with the “this isn’t RPC” theme for this third part of the series and show how to flow free-form XML to and from services. However, I will drop the “client”/“server” nomenclature from here on and talk about endpoints. If you look at the contract below (along with the explanation that follows, of course), you’ll quickly figure out why: both parties in the “buyer”/“seller” conversation I am declaring in that contract act as client and as server at the same time.

In contrast to the previous two examples, I am not using the raw Message class, but I move one notch up on the messaging stack and use the XmlSerializer formatting mode for Indigo, which allows me to flow the contents of an XmlNode between services just like it can be done today with ASP.NET Web Services. In addition, I show how custom message headers can be declared and flowed with (really: inside) messages. But first things first:

The snippet below declares one contract (!) with two endpoint service contracts. One endpoint defines the “seller” side and the other defines the “buyer” side of a duplex conversation that two service implementations will have about a (simplified) purchasing process. It also defines an application-specific (SOAP-) header that is used to flow the purchasing process identifier between the parties. That identifier can be used to locate the process state from disk or from some in-memory location at either side as the conversation progresses.

The seller-side service contract is defined through the ISeller interface that is appropriately labeled with a [ServiceContract] attribute and the buyer-side likewise defined through the IBuyer interface. The fusion of these two interfaces into what is effectively a single contract is established by mutually linking both interfaces by setting the respective CallbackContract property of the [ServiceContract] attribute to the respective other interface type. I highlighted the two places where that’s being done.

When I say “one contract”, that is not really true on the WSDL level. In WSDL, both interfaces would indeed be represented as independent contracts. (Which goes to show that WSDL isn’t really “the contract”, but represents just a subset of the complete metadata model).

Each operation in these contracts is labeled with an [OperationContract] attribute that defines the message flow as IsOneWay=true. That’s because in a duplex conversation messages always flow in one direction, and the receiver answers not by “returning a result” but by sending a message (or multiple messages) to the other party’s endpoint. All operation contracts also set the operation style to DocumentBare, which means that the infrastructure will not auto-generate body wrapper elements.

Instead, each operation defines its own body wrapper by flagging the XmlNode-typed argument for the message content with a [MessageBody] attribute and assigning an appropriate name to it. Above the XmlNode content argument, you can see how the custom header PurchaseProcessHeader is specified for each operation. Custom headers are flagged with the [MessageHeader] attribute and therefore flow in the soap:Header section of the message.

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Runtime.Serialization;
using System.Xml;
using System.Xml.Serialization;

namespace DuplexMessagingConversation
{
    [XmlRoot(Namespace = PurchaseProcessHeader.NamespaceURI)]
    [XmlType(Namespace = PurchaseProcessHeader.NamespaceURI)]
    public class PurchaseProcessHeader
    {
        public const string NamespaceURI="urn:newtelligence-com:indigosamples:purchasing";
        public const string ElementName="PurchaseOrder";

        private string orderIdentifier;
       
        public string OrderIdentifier
        {
            get { return orderIdentifier; }
            set { orderIdentifier = value; }
        }
    }

    [ServiceContract(Namespace = "urn:newtelligence-com:indigosamples:seller",
                     Session = false,
                     CallbackContract = typeof(IBuyer),
                     FormatMode = ContractFormatMode.XmlSerializer)]
    interface ISeller
    {
        [OperationContract(IsOneWay=true,IsInitiating=true,
                           Style=ServiceOperationStyle.DocumentBare)]
        void HandlePurchaseOrder(
            [MessageHeader(Name=PurchaseProcessHeader.ElementName,
                           Namespace=PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name="PurchaseOrderMessage")]
            XmlNode purchaseOrder);

        [OperationContract(IsOneWay = true, IsInitiating = false,
                           Style = ServiceOperationStyle.DocumentBare)]
        void HandlePaymentNotification(
            [MessageHeader(Name = PurchaseProcessHeader.ElementName,
                           Namespace = PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name = "PaymentNotificationMessage")]
            XmlNode paymentNotification);

        [OperationContract(IsOneWay = true, IsInitiating = false, IsTerminating = true,
                           Style = ServiceOperationStyle.DocumentBare)]
        void HandleShippingConfirmation(
            [MessageHeader(Name = PurchaseProcessHeader.ElementName,
                           Namespace = PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name = "ShippingConfirmationMessage")]
            XmlNode shippingConfirmation);
    }

    [ServiceContract(Namespace="urn:newtelligence-com:indigosamples:buyer",
                     Session = false,
                     CallbackContract = typeof(ISeller),
                     FormatMode=ContractFormatMode.XmlSerializer)]
    interface IBuyer
    {
        [OperationContract(IsOneWay = true, IsInitiating = true,
                           Style = ServiceOperationStyle.DocumentBare)]
        void HandlePurchaseOrderConfirmation(
            [MessageHeader(Name = PurchaseProcessHeader.ElementName,
                           Namespace = PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name = "PurchaseOrderConfirmationMessage")]
            XmlNode purchaseOrderConfirmation);

        [OperationContract(IsOneWay = true, IsInitiating = false,
                           Style = ServiceOperationStyle.DocumentBare)]
        void HandleInvoice(
            [MessageHeader(Name = PurchaseProcessHeader.ElementName,
                           Namespace = PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name = "InvoiceMessage")]
            XmlNode invoice);

        [OperationContract(IsOneWay = true, IsInitiating = false, IsTerminating = true,
                           Style = ServiceOperationStyle.DocumentBare)]
        void HandleShippingNotification(
            [MessageHeader(Name = PurchaseProcessHeader.ElementName,
                           Namespace = PurchaseProcessHeader.NamespaceURI)]
            PurchaseProcessHeader process,
            [MessageBody(Name = "ShippingNotificationMessage")]
            XmlNode shippingNotification);
    }
}

To illustrate the effect of these declarations on the wire (I will spare you the XSD/WSDL goop), I’ll show a sample message (grabbed from the debugger) as it can be seen at the ISeller endpoint’s HandlePurchaseOrder operation when it arrives.

<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
            xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"
            xmlns:r="http://schemas.xmlsoap.org/ws/2005/01/rm">
    <s:Header>
        <a:Action s:mustUnderstand="1">
            urn:newtelligence-com:indigosamples:seller/ISeller/HandlePurchaseOrder
        </a:Action>
        <h:PurchaseOrder xmlns="urn:newtelligence-com:indigosamples:purchasing"
                         xmlns:h="urn:newtelligence-com:indigosamples:purchasing"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xmlns:xsd="http://www.w3.org/2001/XMLSchema">
            <OrderIdentifier>1234567890</OrderIdentifier>
        </h:PurchaseOrder>
        <r:Sequence s:mustUnderstand="1">
            <r:Identifier>uuid:b99041bf-fab8-45dd-9235-0909d9c61d04;id=2</r:Identifier>
            <r:MessageNumber>1</r:MessageNumber>
        </r:Sequence>
        <a:From>
            <a:Address>net.tcp://localhost/buyer/reply/e01289a8-424f-4e1a-bba5-b3fb7c92a023</a:Address>
        </a:From>
        <a:To s:mustUnderstand="1">net.tcp://localhost/seller</a:To>
    </s:Header>
    <s:Body>
        <PurchaseOrderMessage xmlns="urn:newtelligence-com:indigosamples:seller">
            <Order xmlns="">...</Order>
        </PurchaseOrderMessage>
    </s:Body>
</s:Envelope>
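
As an aside, the shape of the PurchaseOrder header element in this message can be approximated outside of Indigo with the plain XmlSerializer. What follows is only a sketch under assumptions: it restates the header class with an auto-property, supplies the element name and namespace through an XmlRootAttribute override (in the contract they come from the [MessageHeader] attribute instead), and the class name HeaderSerializationSketch is made up for illustration. The prefixes and namespace declarations that Indigo itself emits may well differ from what this prints.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Restated header type; same constants as in the contract snippet.
public class PurchaseProcessHeader
{
    public const string NamespaceURI = "urn:newtelligence-com:indigosamples:purchasing";
    public const string ElementName = "PurchaseOrder";

    public string OrderIdentifier { get; set; }
}

// Hypothetical helper class, not part of the sample.
static class HeaderSerializationSketch
{
    static void Main()
    {
        // Assumption: name the root element like the header and put it
        // into the purchasing namespace, mirroring the wire format.
        XmlRootAttribute root = new XmlRootAttribute(PurchaseProcessHeader.ElementName);
        root.Namespace = PurchaseProcessHeader.NamespaceURI;

        XmlSerializer serializer = new XmlSerializer(typeof(PurchaseProcessHeader), root);

        PurchaseProcessHeader header = new PurchaseProcessHeader();
        header.OrderIdentifier = "1234567890";

        using (StringWriter writer = new StringWriter())
        {
            // Writes a PurchaseOrder element with an OrderIdentifier child.
            serializer.Serialize(writer, header);
            Console.WriteLine(writer.ToString());
        }
    }
}
```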

So … with the contract declaration in place, we can build the service. With your knowledge from the previous parts of this series, the seller side is (almost) straightforward to implement. I create a SellerService supporting the defined ISeller interface and write all operations (methods) in a similar fashion. First I dump out the content of the incoming message and an artificial instance identifier I use to play with instancing. The only “magic” is in how I obtain the callback channel that I need to be able to send my answers to the other side. To be precise, the magic isn’t mine; it’s sitting inside Indigo. The call IBuyer buyer = OperationContext.Current.GetCallbackChannel<IBuyer>() yields a ready-to-use channel that is properly configured and bound to the “other side”. With that in hand, I cook up an answer (or two, or none, as you can see below) and send it to “the buyer”. The hosting class and the service host are standard fare.

using System;
using System.Xml;
using System.ServiceModel;
using System.Runtime.Serialization;

namespace DuplexMessagingConversation
{
    [ServiceBehavior(InstanceMode = InstanceMode.PrivateSession)]
    class SellerService : ISeller
    {
        Guid instanceId = Guid.NewGuid();

        public void HandlePurchaseOrder(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Seller: Purchase Order Received\n\t{0}\n\tInstance {1}",
                              data.OuterXml, instanceId);
            IBuyer buyer = OperationContext.Current.GetCallbackChannel<IBuyer>();

            XmlDocument orderConfirmation = new XmlDocument();
            orderConfirmation.LoadXml("<OrderConfirmation>...</OrderConfirmation>");
            buyer.HandlePurchaseOrderConfirmation(process, orderConfirmation);

            XmlDocument invoice = new XmlDocument();
            invoice.LoadXml("<Invoice>...</Invoice>");
            buyer.HandleInvoice(process, invoice);
        }

        public void HandlePaymentNotification(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Seller: Payment Notification Received\n\t{0}\n\tInstance {1}",
                              data.OuterXml, instanceId);
            IBuyer buyer = OperationContext.Current.GetCallbackChannel<IBuyer>();

            XmlDocument shippingNotification = new XmlDocument();
            shippingNotification.LoadXml("<Shipped>...</Shipped>");
            buyer.HandleShippingNotification(process, shippingNotification);
        }

        public void HandleShippingConfirmation(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Seller: Shipping Confirmation Received\n\t{0}\n\tInstance {1}",
                              data.OuterXml, instanceId);
        }
    }

    class Seller
    {
        ServiceHost<SellerService> serviceHost;

        public void Open()
        {
            serviceHost = new ServiceHost<SellerService>();
            serviceHost.Open();
        }

        public void Close()
        {
            serviceHost.Close();
        }
    }
}

The buyer-side service implementation looks almost identical. The one significant difference is that the buyer is (in the self-hosted scenario I have here: must be) a singleton within the scope of the conversation. That means that the initiator of the conversation (what we usually call the “client”) has to create a service instance and hand it down into the infrastructure. Because I want to know when the conversation is over so that I can shut down my test program, I hand a ManualResetEvent to the service instance and have it set the event to signaled when the buyer’s last expected message in the purchasing process (the shipping notification) arrives. Otherwise the service implementation doesn’t hold any more surprises.

More interesting is the InitiatePurchase method. It predictably creates a service host instance for the buyer service and a channel factory that we need to send the first message (purchase order) to the seller. From there onwards, things are a little different than in the previous examples.

As the next step, I create a “service site”, which acts as the manager for the duplex conversation we’re setting up. The ServiceSite is initialized with the service host and a newly created service instance. As I indicated in the previous paragraph, that instance is a singleton for the conversation; it’s not a singleton per-se.

Using the service site as an argument, I can now create a duplex channel with a call to CreateDuplexChannel on the channel factory. The resulting channel is set up to do everything necessary to listen for answers in the scope of the conversation and to relay the required “send answers here” info to the other side. If you look at the SOAP message above, you’ll see how that back reference is flowing using a WS-Addressing wsa:From header, which is a reasonable thing to do as per WS-Addressing (see: 3. / [reply endpoint] paragraph).

Once I have the channel in hand, I create the custom header instance and a purchase order document (well…) and send it off to the seller side. Once that’s done, I hang out and wait until the conversation is over and subsequently shut down.

using System;
using System.Xml;
using System.ServiceModel;
using System.Threading;

namespace DuplexMessagingConversation
{
    class BuyerService : IBuyer
    {
        Guid instanceId = Guid.NewGuid();
        ManualResetEvent waitHandle;

        public BuyerService(ManualResetEvent waitHandle)
        {
            this.waitHandle = waitHandle;
        }

        public void HandlePurchaseOrderConfirmation(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Buyer: Purchase Order Confirmation Received\n\t{0}\n\tInstance {1}",
                               data.OuterXml, instanceId);
            return;
        }

        public void HandleInvoice(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Buyer: Invoice Received\n\t{0}\n\tInstance {1}",
                               data.OuterXml, instanceId);
            ISeller seller = OperationContext.Current.GetCallbackChannel<ISeller>();

            XmlDocument paymentNotification = new XmlDocument();
            paymentNotification.LoadXml("<Payment>...</Payment>");
            seller.HandlePaymentNotification(process, paymentNotification);
        }

        public void HandleShippingNotification(PurchaseProcessHeader process, XmlNode data)
        {
            Console.WriteLine("Buyer: Shipping Notification Received\n\t{0}\n\tInstance {1}",
                               data.OuterXml, instanceId);
            ISeller seller = OperationContext.Current.GetCallbackChannel<ISeller>();

            XmlDocument shippingConfirmation = new XmlDocument();
            shippingConfirmation.LoadXml("<ShipmentReceived>...</ShipmentReceived>");
            seller.HandleShippingConfirmation(process, shippingConfirmation);
            waitHandle.Set();
        }
    }

    class Buyer
    {
        public void InitiatePurchase()
        {
            ServiceHost<BuyerService> buyerHost = new ServiceHost<BuyerService>();
            using (ChannelFactory<ISeller> channelFactory = new ChannelFactory<ISeller>("clientChannel"))
            {
                ManualResetEvent conversationDone = new ManualResetEvent(false);
                using (ServiceSite replyTarget = new ServiceSite(buyerHost, new BuyerService(conversationDone)))
                {
                    ISeller channel = channelFactory.CreateDuplexChannel(replyTarget);

                    PurchaseProcessHeader header = new PurchaseProcessHeader();
                    header.OrderIdentifier = "1234567890";

                    XmlDocument purchaseOrderDocument = new XmlDocument();
                    purchaseOrderDocument.LoadXml("<Order>...</Order>");
                    channel.HandlePurchaseOrder(header, purchaseOrderDocument);

                    conversationDone.WaitOne();
                    replyTarget.Close();
                }
                channelFactory.Close();
            }
            buyerHost.Close();
        }
   }
}
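
The wait-handle handshake between InitiatePurchase and the buyer service can be tried in isolation, without any Indigo types. The following is a minimal sketch, assuming that a background thread stands in for the infrastructure thread that delivers the last message; the class name ConversationDoneSketch and the 100 ms delay are made up for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical stand-alone demo of the ManualResetEvent pattern.
static class ConversationDoneSketch
{
    static void Main()
    {
        ManualResetEvent conversationDone = new ManualResetEvent(false);

        // Stand-in for the thread on which the final message arrives.
        Thread worker = new Thread(delegate()
        {
            Thread.Sleep(100); // pretend the conversation takes a while
            Console.WriteLine("Last expected message handled");
            conversationDone.Set(); // signal the waiting initiator
        });
        worker.Start();

        // The initiator blocks here until Set() has been called.
        conversationDone.WaitOne();
        Console.WriteLine("Conversation is over, shutting down");

        worker.Join();
        conversationDone.Close();
    }
}
```

A ManualResetEvent stays signaled once it is set, so the initiator does not miss the signal even if the last message happens to arrive before WaitOne is reached.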

The Program is simple and predictable; I am just posting it for completeness and because I renamed the classes.

using System;

namespace DuplexMessagingConversation
{
    class Program
    {
        static void Main(string[] args)
        {
            Seller server = new Seller();
            server.Open();

            Buyer client = new Buyer();
            client.InitiatePurchase();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
            server.Close();
        }
    }
}

The configuration file that goes with this example is of course a bit different from the previous ones. The <client> section and the buyerClientBinding binding configuration apply to the buyer side, and the <services> section and the sellerBinding are for the seller side. These sections would be split across two configuration files if we hosted the sample in two processes.

Of course, the buyer’s <client>/<endpoint> definition for the channel refers to the buyerClientBinding. That binding defines three required binding elements: <reliableSession> configures the channel to use a reliable messaging session with default values, <compositeDuplex/> enables duplex support and <tcpTransport/> selects the TCP transport. The order of these elements is significant and defines how these “behaviors” are stacked in the channel. Quite special is the clientBaseAddress attribute of the <compositeDuplex/> element; this value is used as the base URI to dynamically construct the endpoint on which replies shall be received by the buyer instance for this conversation. The result of that composition can be seen in the wsa:From element in the SOAP message above.

The seller-side configuration for the <service> and its <endpoint> is largely equivalent to what I’ve explained in the previous examples. The only real difference is that the sellerBinding binding now also defines the required binding elements and behaviors I just pointed out.

<?xml version="1.0" encoding="utf-8" ?>
<configuration xmlns="http://schemas.microsoft.com/.NetConfiguration/v2.0">
    <system.serviceModel>
        <bindings>
            <customBinding>
                <binding configurationName="sellerBinding">
                    <reliableSession/>
                    <compositeDuplex/>
                    <tcpTransport/>
                </binding>
                <binding configurationName="buyerClientBinding">
                    <reliableSession/>
                    <compositeDuplex clientBaseAddress="net.tcp://localhost/buyer/reply"/>
                    <tcpTransport/>
                </binding>
            </customBinding>
        </bindings>
        <client>
            <endpoint address="net.tcp://localhost/seller"
                      bindingConfiguration="buyerClientBinding"
                      bindingType="customBinding"
                      configurationName="clientChannel"
                      contractType="DuplexMessagingConversation.ISeller, DuplexMessagingConversation"/>
        </client>
        <services>
            <service serviceType="DuplexMessagingConversation.SellerService, DuplexMessagingConversation">
                <endpoint contractType="DuplexMessagingConversation.ISeller, DuplexMessagingConversation"
                          address="net.tcp://localhost/seller"
                          bindingType="customBinding"
                          bindingConfiguration="sellerBinding" />
            </service>
        </services>
    </system.serviceModel>
</configuration>

And, lastly, here’s the output:

Seller: Purchase Order Received
        <Order xmlns="">...</Order>
        Instance eb628fce-ac56-43af-9326-5bfc62a101dc
Buyer: Purchase Order Confirmation Received
        <OrderConfirmation xmlns="">...</OrderConfirmation>
        Instance c1ce0c0f-fb98-4432-86fb-c81ac7243295
Buyer: Invoice Received
        <Invoice xmlns="">...</Invoice>
        Instance c1ce0c0f-fb98-4432-86fb-c81ac7243295
Seller: Payment Notification Received
        <Payment xmlns="">...</Payment>
        Instance eb628fce-ac56-43af-9326-5bfc62a101dc
Buyer: Shipping Notification Received
        <Shipped xmlns="">...</Shipped>
        Instance c1ce0c0f-fb98-4432-86fb-c81ac7243295
Seller: Shipping Confirmation Received
        <ShipmentReceived xmlns="">...</ShipmentReceived>
        Instance eb628fce-ac56-43af-9326-5bfc62a101dc
Press ENTER to quit
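
To see why the six steps appear in this order, the conversation can be modeled in-process with two mutually linked interfaces and no messaging at all. This is a deliberately simplified sketch, not how Indigo works: the interfaces are cut down to string arguments, the InProcessSeller and InProcessBuyer classes and their public fields are hypothetical stand-ins for the callback channels, and direct method calls are synchronous and nested, whereas the real one-way messages are not.

```csharp
using System;

interface ISeller
{
    void HandlePurchaseOrder(string orderId, string body);
    void HandlePaymentNotification(string orderId, string body);
    void HandleShippingConfirmation(string orderId, string body);
}

interface IBuyer
{
    void HandlePurchaseOrderConfirmation(string orderId, string body);
    void HandleInvoice(string orderId, string body);
    void HandleShippingNotification(string orderId, string body);
}

class InProcessSeller : ISeller
{
    public IBuyer Buyer; // stand-in for the callback channel to the buyer

    public void HandlePurchaseOrder(string orderId, string body)
    {
        Console.WriteLine("Seller: Purchase Order Received ({0})", orderId);
        // Two answers to one incoming message, as in the sample.
        Buyer.HandlePurchaseOrderConfirmation(orderId, "<OrderConfirmation/>");
        Buyer.HandleInvoice(orderId, "<Invoice/>");
    }

    public void HandlePaymentNotification(string orderId, string body)
    {
        Console.WriteLine("Seller: Payment Notification Received ({0})", orderId);
        Buyer.HandleShippingNotification(orderId, "<Shipped/>");
    }

    public void HandleShippingConfirmation(string orderId, string body)
    {
        Console.WriteLine("Seller: Shipping Confirmation Received ({0})", orderId);
    }
}

class InProcessBuyer : IBuyer
{
    public ISeller Seller; // stand-in for the callback channel to the seller

    public void HandlePurchaseOrderConfirmation(string orderId, string body)
    {
        // No answer is sent for this message.
        Console.WriteLine("Buyer: Purchase Order Confirmation Received ({0})", orderId);
    }

    public void HandleInvoice(string orderId, string body)
    {
        Console.WriteLine("Buyer: Invoice Received ({0})", orderId);
        Seller.HandlePaymentNotification(orderId, "<Payment/>");
    }

    public void HandleShippingNotification(string orderId, string body)
    {
        Console.WriteLine("Buyer: Shipping Notification Received ({0})", orderId);
        Seller.HandleShippingConfirmation(orderId, "<ShipmentReceived/>");
    }
}

static class ConversationSketch
{
    static void Main()
    {
        InProcessSeller seller = new InProcessSeller();
        InProcessBuyer buyer = new InProcessBuyer();
        seller.Buyer = buyer;   // link both sides to each other,
        buyer.Seller = seller;  // mirroring the two CallbackContract settings

        // The buyer side starts the conversation with the purchase order.
        seller.HandlePurchaseOrder("1234567890", "<Order/>");
    }
}
```

The order identifier passed along with every call plays the role of the PurchaseProcessHeader: it is the one piece of state both sides can use to correlate the steps of a single purchasing process.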

Again, the messages are free-form XML, so I am using Indigo strictly as a raw messaging platform. It’s just a bit more powerful. ;-) If I showed you a functionally equivalent application based on System.Messaging and MSMQ, you wouldn’t be done reading yet.
