.NET Services March 2009 CTP - Service Bus Routers and Queues - Part 4: The REST Queue Protocol in Code Snippets
Now that I’ve gone through the dry facts of the REST Queue Protocol in Part 3 of this series, here’s some code to look at. The code snippets are ripped from the HTTP Queue sample we’ve got in the .NET Services SDK and you can get the copy/paste ready code from there.
The most straightforward way to acquire the required hashed security token (see my comments to Stefan Tilkov) using HttpWebRequest looks like this:
    static string HttpGetAuthenticationToken(string username, string password)
    {
        // Escape both values so reserved characters survive the query string.
        string tokenUri = string.Format("https://{0}/issuetoken.aspx?u={1}&p={2}",
                                        ServiceBusEnvironment.DefaultIdentityHostName,
                                        Uri.EscapeDataString(username), Uri.EscapeDataString(password));

        HttpWebRequest tokenRequest = (HttpWebRequest)WebRequest.Create(tokenUri);
        using (var tokenResponse = tokenRequest.GetResponse())
        {
            using (var tokenResponseStream = tokenResponse.GetResponseStream())
            {
                // Read the whole body; a single Stream.Read call may return only part of it.
                using (var tokenReader = new StreamReader(tokenResponseStream, Encoding.UTF8))
                {
                    return tokenReader.ReadToEnd();
                }
            }
        }
    }
Once you’ve got the token, you need to pick a place in the Service Bus namespace. We’ve got a built-in helper function in the SDK that sits on ServiceBusEnvironment and which knows all the right incantations:
    Uri queueUri = ServiceBusEnvironment.CreateServiceUri(
        Uri.UriSchemeHttps,
        solutionName,
        "/MyHttpQueue/");
Invoked in this way, the method yields the URI https://yourproject.servicebus.windows.net/MyHttpQueue/ so there’s spectacularly little magic to it. The reason why we recommend that you use the method in .NET applications is that we’ve broken everyone’s apps going from the previous CTP to the current CTP due to the namespace restructuring and we’d like to avoid doing that again as we improve the namespace story. I don’t foresee any further change of the magnitude we had this time, however.
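If you are curious what the helper boils down to today, the URI can be approximated by hand. This is only a sketch under stated assumptions: QueueUriSketch and BuildQueueUri are hypothetical names and not SDK members, and the servicebus.windows.net suffix is simply copied from the URI quoted above, so it is exposed to exactly the kind of namespace change the helper is meant to shield you from.

```csharp
using System;

public static class QueueUriSketch
{
    // Hypothetical stand-in for ServiceBusEnvironment.CreateServiceUri. The host
    // suffix is an assumption taken from the example URI in the text.
    public static Uri BuildQueueUri(string solutionName, string path)
    {
        var builder = new UriBuilder(Uri.UriSchemeHttps, solutionName + ".servicebus.windows.net")
        {
            Path = path
        };
        return builder.Uri;
    }
}
```

Calling QueueUriSketch.BuildQueueUri("yourproject", "/MyHttpQueue/") should give you the same URI as the SDK call above. In real .NET code, stick with the SDK helper.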
To create a new Queue, you first create a Queue policy and then apply it to the Service Bus namespace name. We’ve got a Queue Policy object in the SDK that I’ll use here to keep things short. I’m accepting all the defaults that are spelled out in Part 2 of the series, but I’m explicitly setting the expiration to 1 hour. The Queue Policy XSD schema is in the HTTP Queue sample’s readme document, by the way.
    QueuePolicy queuePolicy = new QueuePolicy();
    queuePolicy.ExpirationInstant = DateTime.UtcNow + TimeSpan.FromHours(1);
    queueManageUri = HttpCreateQueue(token, queueUri, queuePolicy);
The code for creating the Queue shouldn’t be very surprising. It’s a POST of an Atom 1.0 entry for which I’m using HttpWebRequest and the WCF 3.5 Syndication API. The SDK sample is using a rather terse version of the ‘correct’ way of doing things. I’m quoting the expanded variant from the Text140 sample that I showed at MIX’09:
  1: public static QueuePolicy HttpCreateQueue(string token, Uri queueUri, QueuePolicy policy,
  2:                                           out Uri queueSelfUri, out Uri queueHeadUri)
  3: {
  4:     HttpWebRequest webRequest;
  5:     HttpWebResponse webResponse;
  6:
  7:     queueSelfUri = null;
  8:     queueHeadUri = null;
  9:
 10:     // Create a new syndication item and add the queue policy as an extension
 11:     SyndicationItem syndicationItem = new SyndicationItem();
 12:     syndicationItem.ElementExtensions.Add(
 13:         new SyndicationElementExtension("QueuePolicy",
 14:             "http://schemas.microsoft.com/ws/2007/08/connect",
 15:             policy));
 16:
 17:     // create a new POST request and set the Content-Type to represent an Atom 1.0 entry
 18:     webRequest = HttpWebRequest.Create(queueUri) as HttpWebRequest;
 19:     webRequest.ConnectionGroupName = "queueclient";
 20:     webRequest.KeepAlive = false;
 21:     webRequest.Method = "POST";
 22:     webRequest.Headers.Add("X-MS-Identity-Token", token);
 23:     webRequest.ContentType = "application/atom+xml;type=entry;charset=utf-8";
 24:
 25:     // write the item to the request stream
 26:     using (var requestStream = webRequest.GetRequestStream())
 27:     {
 28:         using (var textWriter = new XmlTextWriter(requestStream, Encoding.UTF8))
 29:         {
 30:             syndicationItem.GetAtom10Formatter().WriteTo(textWriter);
 31:             textWriter.Flush();
 32:         }
 33:     }
 34:
 35:     // get the response
 36:     using (webResponse = webRequest.GetResponse() as HttpWebResponse)
 37:     {
 38:         return GetQueueInfoFromResponse(webResponse, queueUri, out queueSelfUri, out queueHeadUri);
 39:     }
 40: }
 41:
 42: static QueuePolicy GetQueueInfoFromResponse(HttpWebResponse webResponse, Uri queueUri,
 43:                                             out Uri queueSelfUri, out Uri queueHeadUri)
 44: {
 45:     queueHeadUri = null;
 46:     queueSelfUri = null;
 47:
 48:     if (webResponse.ContentType.StartsWith("application/atom+xml;type=entry", StringComparison.OrdinalIgnoreCase))
 49:     {
 50:         Atom10ItemFormatter atomItemFormatter = new Atom10ItemFormatter();
 51:         using (var responseReader = new XmlTextReader(webResponse.GetResponseStream()))
 52:         {
 53:             atomItemFormatter.ReadFrom(responseReader);
 54:             // we found it. let's get the policy and the queue head URI
 55:             foreach (SyndicationLink queueSelfLink in
 56:                 from link in atomItemFormatter.Item.Links
 57:                 where link.RelationshipType.Equals("self", StringComparison.OrdinalIgnoreCase)
 58:                 select link)
 59:             {
 60:                 queueSelfUri = queueSelfLink.Uri;
 61:                 break;
 62:             }
 63:
 64:             foreach (SyndicationLink queueHeadLink in
 65:                 from link in atomItemFormatter.Item.Links
 66:                 where link.RelationshipType.Equals("queuehead", StringComparison.OrdinalIgnoreCase)
 67:                 select link)
 68:             {
 69:                 queueHeadUri = queueHeadLink.Uri;
 70:                 break;
 71:             }
 72:
 73:             return atomItemFormatter.Item.ElementExtensions.ReadElementExtensions<QueuePolicy>(
 74:                 "QueuePolicy",
 75:                 "http://schemas.microsoft.com/ws/2007/08/connect").FirstOrDefault();
 76:         }
 77:     }
 78:     else
 79:     {
 80:         Atom10FeedFormatter atomFeedFormatter = new Atom10FeedFormatter();
 81:         using (var responseReader = new XmlTextReader(webResponse.GetResponseStream()))
 82:         {
 83:             atomFeedFormatter.ReadFrom(responseReader);
 84:
 85:             // look for an item whose alternate-link equals the desired queue URI. The alternate
 86:             // URI is the 'tail' endpoint by which the queue is mapped into the namespace. We are
 87:             // comparing Authority and Path since the scheme might be http or https and we want to match
 88:             // either.
 89:             foreach (SyndicationItem requestedItem in
 90:                 from item in atomFeedFormatter.Feed.Items
 91:                 from link in item.Links
 92:                 where
 93:                     (link.RelationshipType.Equals("alternate", StringComparison.OrdinalIgnoreCase) &&
 94:                      link.Uri.Authority.Equals(queueUri.Authority, StringComparison.OrdinalIgnoreCase) &&
 95:                      link.Uri.AbsolutePath.Equals(queueUri.AbsolutePath, StringComparison.OrdinalIgnoreCase)) ||
 96:                     (link.RelationshipType.Equals("self", StringComparison.OrdinalIgnoreCase) &&
 97:                      link.Uri.Equals(queueUri))
 98:                 select item)
 99:             {
100:                 // we found it. let's get the policy and the queue head URI
101:                 foreach (SyndicationLink queueSelfLink in
102:                     from link in requestedItem.Links
103:                     where link.RelationshipType.Equals("self", StringComparison.OrdinalIgnoreCase)
104:                     select link)
105:                 {
106:                     queueSelfUri = queueSelfLink.Uri;
107:                     break;
108:                 }
109:
110:                 foreach (SyndicationLink queueHeadLink in
111:                     from link in requestedItem.Links
112:                     where link.RelationshipType.Equals("queuehead", StringComparison.OrdinalIgnoreCase)
113:                     select link)
114:                 {
115:                     queueHeadUri = queueHeadLink.Uri;
116:                     break;
117:                 }
118:
119:                 return requestedItem.ElementExtensions.ReadElementExtensions<QueuePolicy>(
120:                     "QueuePolicy",
121:                     "http://schemas.microsoft.com/ws/2007/08/connect").FirstOrDefault();
122:             }
123:         }
124:     }
125:     return null;
126: }
The “magic” that isn’t straight up Atom Pub is in lines 10-15 above. I’m attaching the policy object to the SyndicationItem abstraction that will be written out as an Atom 1.0 entry. [I know that copy/paste is difficult using this format – I need to clean up that expanded utility class and will post it in downloadable form within the next few days]
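To see what that extension looks like on the wire without talking to the Service Bus, you can serialize the entry into a string. The following is a rough sketch, not SDK code: SketchQueuePolicy is a hypothetical stand-in for the SDK’s QueuePolicy type, reduced to a single property, and AtomEntrySketch.BuildQueueEntry is a name I made up for illustration. The element layout it produces is therefore only indicative; the authoritative schema is the XSD in the sample’s readme.

```csharp
using System;
using System.Runtime.Serialization;
using System.ServiceModel.Syndication;
using System.Text;
using System.Xml;

// Hypothetical stand-in for the SDK's QueuePolicy type (assumption: one property only).
[DataContract(Name = "QueuePolicy", Namespace = "http://schemas.microsoft.com/ws/2007/08/connect")]
public class SketchQueuePolicy
{
    [DataMember]
    public DateTime ExpirationInstant { get; set; }
}

public static class AtomEntrySketch
{
    const string PolicyNamespace = "http://schemas.microsoft.com/ws/2007/08/connect";

    // Serializes an Atom 1.0 entry that carries the policy as an element extension
    // and returns the XML as a string instead of writing it to a request stream.
    public static string BuildQueueEntry(SketchQueuePolicy policy)
    {
        var item = new SyndicationItem();
        item.ElementExtensions.Add(
            new SyndicationElementExtension("QueuePolicy", PolicyNamespace, policy));

        var buffer = new StringBuilder();
        using (var writer = XmlWriter.Create(buffer))
        {
            item.GetAtom10Formatter().WriteTo(writer);
        }
        return buffer.ToString();
    }
}
```

Printing the result of AtomEntrySketch.BuildQueueEntry(new SketchQueuePolicy { ExpirationInstant = DateTime.UtcNow.AddHours(1) }) shows a plain Atom entry with one extra QueuePolicy element in the connect namespace, which is all there is to the non-Atom part.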
The method returns the effective policy that the Service Bus is using, and yields the management URI for the queue where the Atom entry resides (self-link) as well as the head of the queue as output arguments. The queueUri to which you applied the entry has morphed into the Queue’s tail when the method returns successfully. Having the Queue by its tail, submitting messages into it is very simple. Just do a plain POST. Here we’re just writing a simple string from the local variable input as the entity body and sending it.
    // send
    HttpWebRequest sendRequest = HttpWebRequest.Create(queueUri) as HttpWebRequest;
    sendRequest.Method = "POST";
    sendRequest.Headers.Add("X-MS-Identity-Token", token);
    sendRequest.ContentType = "text/plain;charset=utf-8";
    using (var sendStream = sendRequest.GetRequestStream())
    {
        using (var writer = new StreamWriter(sendStream, Encoding.UTF8))
        {
            writer.Write(input);
            writer.Flush();
        }
    }
    sendRequest.GetResponse().Close();
For a simple destructive read, you use the ‘queue head’ URI and do a DELETE:
    HttpWebRequest dequeueRequest =
        HttpWebRequest.Create(queueHeadUri.AbsoluteUri +
            "?encoding=asreply&maxmessages=1&timeout=60") as HttpWebRequest;
    dequeueRequest.ConnectionGroupName = "dequeue";
    dequeueRequest.Method = "DELETE";
    dequeueRequest.ContentLength = 0;
    dequeueRequest.Headers.Add("X-MS-Identity-Token", token);
    using (HttpWebResponse response = dequeueRequest.GetResponse() as HttpWebResponse)
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            using (var responseBody = response.GetResponseStream())
            {
                using (var responseReader = new StreamReader(responseBody, Encoding.UTF8))
                {
                    string data = responseReader.ReadToEnd();
                    Console.WriteLine(data);
                }
            }
        }
    }
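The query string glued onto the head URI above is the only part of the read request you’d typically vary. If you want it in one place, a small helper along these lines might do. It’s a sketch: DequeueUriSketch and BuildDequeueUri are hypothetical names, the head URI in the usage note is made up, and I’m assuming the head URI that the Service Bus hands back carries no query string of its own (UriBuilder would replace it). The parameters themselves are the ones described in Part 3.

```csharp
using System;
using System.Globalization;

public static class DequeueUriSketch
{
    // Appends the three query parameters used in the snippets to the queue head URI.
    // Assumption: queueHeadUri has no query string of its own; any existing one is replaced.
    public static Uri BuildDequeueUri(Uri queueHeadUri, int maxMessages, int timeout)
    {
        var builder = new UriBuilder(queueHeadUri)
        {
            Query = string.Format(CultureInfo.InvariantCulture,
                "encoding=asreply&maxmessages={0}&timeout={1}", maxMessages, timeout)
        };
        return builder.Uri;
    }
}
```

With a head URI of, say, https://yourproject.servicebus.windows.net/MyHttpQueue/head, calling DequeueUriSketch.BuildDequeueUri(queueHeadUri, 1, 60) gives you the same request URI that the string concatenation in the snippet produces.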
For a peek/lock read where you first lock the message and then delete the lock when you want to keep the message, you do a POST on the head and remember the X-MS-Message-Lock header value:
    HttpWebRequest dequeueRequest =
        HttpWebRequest.Create(queueHeadUri.AbsoluteUri +
            "?encoding=asreply&maxmessages=1&timeout=60") as HttpWebRequest;
    dequeueRequest.ConnectionGroupName = "dequeue";
    dequeueRequest.Method = "POST";
    dequeueRequest.ContentLength = 0;
    dequeueRequest.Headers.Add("X-MS-Identity-Token", token);
    using (HttpWebResponse response = dequeueRequest.GetResponse() as HttpWebResponse)
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            lockUri = new Uri(response.Headers["X-MS-Message-Lock"]);
            using (var responseBody = response.GetResponseStream())
            {
                using (var responseReader = new StreamReader(responseBody, Encoding.UTF8))
                {
                    string data = responseReader.ReadToEnd();
                    Console.WriteLine(data);
                }
            }
        }
    }
If you decide to keep the message (i.e. your app didn’t puke processing it), you DELETE the lock, otherwise you do a PUT. The code is practically the same except for the method so I’ll just quote DELETE:
    HttpWebRequest deleteLockedMessageRequest = HttpWebRequest.Create(lockUri) as HttpWebRequest;
    deleteLockedMessageRequest.ConnectionGroupName = "lock";
    deleteLockedMessageRequest.Method = "DELETE";
    deleteLockedMessageRequest.KeepAlive = false;
    deleteLockedMessageRequest.ContentLength = 0;
    deleteLockedMessageRequest.Headers.Add("X-MS-Identity-Token", token);
    deleteLockedMessageRequest.GetResponse().Close();
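Since the DELETE and PUT variants differ only in the method, you could fold both into one helper. Here’s a minimal sketch of that idea; LockRequestSketch and CreateLockRequest are hypothetical names and not part of the SDK, and the helper only builds the request, it doesn’t send anything.

```csharp
using System;
using System.Net;

public static class LockRequestSketch
{
    // Builds (but does not send) the body-less request that resolves a message lock:
    // DELETE when you keep the message, PUT otherwise, as described in the text.
    public static HttpWebRequest CreateLockRequest(Uri lockUri, string token, bool keepMessage)
    {
        var request = (HttpWebRequest)WebRequest.Create(lockUri);
        request.ConnectionGroupName = "lock";
        request.Method = keepMessage ? "DELETE" : "PUT";
        request.KeepAlive = false;
        request.ContentLength = 0;
        request.Headers.Add("X-MS-Identity-Token", token);
        return request;
    }
}
```

After processing a locked message you would then call LockRequestSketch.CreateLockRequest(lockUri, token, processedOk).GetResponse().Close(), where processedOk is whatever your app uses to signal success.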
That’s it. Create, Enqueue, Destructive Read, Peek/Lock Read, and .. oh .. yes.. forgot Delete the Queue. Take the snippet above and swap out the lockUri for the queueSelfUri. Done.
Ah, yes, and please … don’t blame me for HTTP or HttpWebRequest or Clemens’ demo coding style requiring too many lines of code. I could have applied a bit of code-compression here, but I’m intentionally trying not to abstract away too much of the protocol here. I’m generally with you, though. The guys putting together the WCF REST Starter Kit are working on making code like that shrink.