RabbitMQ for Windows: Headers Exchanges

On May 29, 2012, in Uncategorized, by derekgreer

This is the eighth and final installment to the series: RabbitMQ for Windows.  In the last installment, we walked through creating a topic exchange example.  As the last installment, we’ll walk through a headers exchange example.

Headers exchanges examine the message headers to determine which queues a message should be routed to.  As discussed earlier in this series, headers exchanges are similar to topic exchanges in that they allow you to specify multiple criteria, but offer a bit more flexibility in that the headers can be constructed using a wider range of data types (1).

To subscribe to receive messages from a headers exchange, a dictionary of headers is specified as part of the binding arguments.  In addition to the headers, a key of “x-match” is also included in the dictionary with a value of “all”, specifying that messages must be published with all the specified headers in order to match, or “any”, specifying that the message needs to only have one of the specified headers specified.

As our final example, we’ll create a Producer application which publishes the message “Hello, World!” using a headers exchange.  Here’s our Producer code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.v0_9_1;

namespace Producer
{
  class Program
  {
    const string ExchangeName = "header-exchange-example";

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      connectionFactory.HostName = "localhost";

      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
      byte[] message = Encoding.UTF8.GetBytes("Hello, World!");

      var properties = new BasicProperties();
      properties.Headers = new Dictionary<string, object>();
      properties.Headers.Add("key1", "12345");
      
      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();
      var messageCount = 0;

      while (stopwatch.Elapsed < time)
      {
        channel.BasicPublish(ExchangeName, "", properties, message);
        messageCount++;
        Console.Write("Time to complete: {0} seconds - Messages published: {1}\r", (time - stopwatch.Elapsed).ToString("ss"), messageCount);
        Thread.Sleep(1000);
      }

      Console.Write(new string(' ', 70) + "\r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();
      message = Encoding.UTF8.GetBytes("quit");
      channel.BasicPublish(ExchangeName, "", properties, message);
      connection.Close();
    }
  }
}

In the Producer, we’ve used a generic dictionary of type Dictionary<string, object> and added a single key “key1” with a value of “12345”.  As with our previous example, we’re using a stopwatch as a way to publish messages continually for 10 seconds.

For our Consumer application, we can use an “x-match” argument of “all” with the single key/value pair specified by the Producer, or we can use an “x-match” argument of “any” which includes the key/value pair specified by the Producer along with other potential matches.  We’ll use the latter for our example.   Here’s our Consumer code:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    const string QueueName = "header-exchange-example";
    const string ExchangeName = "header-exchange-example";

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      connectionFactory.HostName = "localhost";

      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
      channel.QueueDeclare(QueueName, false, false, true, null);

      IDictionary specs = new Dictionary();
      specs.Add("x-match", "any");
      specs.Add("key1", "12345");
      specs.Add("key2", "123455");
      channel.QueueBind(QueueName, ExchangeName, string.Empty, specs);

      channel.StartConsume(QueueName, MessageHandler);
      connection.Close();
    }

    public static void MessageHandler(IModel channel, DefaultBasicConsumer consumer, BasicDeliverEventArgs eventArgs)
    {
      string message = Encoding.UTF8.GetString(eventArgs.Body);
      Console.WriteLine("Message received: " + message);
      foreach (object headerKey in eventArgs.BasicProperties.Headers.Keys)
      {
        Console.WriteLine(headerKey + ": " + eventArgs.BasicProperties.Headers[headerKey]);
      }

      if (message == "quit")
        channel.BasicCancel(consumer.ConsumerTag);
    }
  }
}

Rather than handling our messages inline as we’ve done in previous examples, this example uses an extension method named StartConsume() which accepts a callback to be invoked each time a message is received.  Here’s the extension method used by our example:

using System;
using System.IO;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  public static class ChannelExtensions
  {
    public static void StartConsume(this IModel channel, string queueName,  Action<IModel, DefaultBasicConsumer, BasicDeliverEventArgs> callback)
    {
      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume(queueName, true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
          new Thread(() => callback(channel, consumer, eventArgs)).Start();
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }
    }
  }
}

Setting our solution to run both the Producer and Consumer applications upon startup, running our example produces output similar to the following:

Producer

Running for 10 seconds
Time to complete: 08 seconds - Messages published: 2

Consumer

Message received: Hello, World!
key1: 12345
Message received: Hello, World!
key1: 12345

That concludes our headers exchange example as well as the RabbitMQ for Windows series.  For more information on working with RabbitMQ, see the documentation at http://www.rabbitmq.com or the purchase the book RabbitMQ in Action by Alvaro Videla and Jason Williams.  I hope you enjoyed the series.

 

Footnotes:

1 – See http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3 and http://hg.rabbitmq.com/rabbitmq-dotnet-client/diff/4def852523e2/projects/client/RabbitMQ.Client/src/client/impl/WireFormatting.cs for supported field types.

Tagged with:  

RabbitMQ for Windows: Topic Exchanges

On May 18, 2012, in Uncategorized, by derekgreer

This is the seventh installment to the series: RabbitMQ for Windows.  In the last installment, we walked through creating a fanout exchange example.  In this installment, we’ll be walking through a topic exchange example.

Topic exchanges are similar to direct exchanges in that they use a routing key to determine which queue a message should be delivered to, but they differ in that they provide the ability to match on portions of a routing key.  When publishing to a topic exchange, a routing key consisting of multiple words separated by periods (e.g. “word1.word2.word3”) will be matched against a pattern supplied by the binding queue.  Patterns may contain an asterisk (“*”) to match a word in a specific segment or a hash (“#”) to match zero or more words.  As discussed earlier in the series, the topic exchange type can be useful for directing messages based on multiple categories or for routing messages originating from multiple sources.

To demonstrate topic exchanges, we’ll return to our logging example, but this time we’ll subscribe to a subset of the messages being published to demonstrate the flexibility of how routing keys are used by topic exchanges.  For this example, we’ll be modeling a scenario where a company may have multiple client installations, each of which may be used to service different sectors of a company’s business model (e.g. Business or Personal sectors).  We’ll use a routing key that specifies the sector and subscribe to messages published for the Personal sector only.

As with our previous examples, we’ll keep things simple by creating console applications for a Producer and a Consumer.  Let’s start by creating the Producer app and establishing a connection using the default settings:

using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    const long ClientId = 10843;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
    }
  }
}

 

Rather than just publishing messages directly from the Main() method as with our first logging example, let’s create a separate logger object this time.  Here the logger interface and implementation we’ll be using:

  interface ILogger
  {
    void Write(Sector sector, string entry, TraceEventType traceEventType);
  }

  class RabbitLogger : ILogger, IDisposable
  {
    readonly long _clientId;
    readonly IModel _channel;
    bool _disposed;

    public RabbitLogger(IConnection connection, long clientId)
    {
      _clientId = clientId;
      _channel = connection.CreateModel();
      _channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Topic, false, true, null);
    }

    public void Dispose()
    {
      if (!_disposed)
      {
        if (_channel != null && _channel.IsOpen)
        {
          _channel.Close();
        }
      }
      GC.SuppressFinalize(this);
    }

    public void Write(Sector sector, string entry, TraceEventType traceEventType)
    {
      byte[] message = Encoding.UTF8.GetBytes(entry);
      string routingKey = string.Format("{0}.{1}.{2}", _clientId, sector.ToString(), traceEventType.ToString());
      _channel.BasicPublish("topic-exchange-example", routingKey, null, message);
    }

    ~RabbitLogger()
    {
      Dispose();
    }
  }

In addition to an open IConnection, our RabbitLogger class is instantiated with a client Id.  We use this as part of the routing key.  Since each log can vary by sector, we pass a Sector enum as part of the Write() method.  Here’s our Sector enum:

  public enum Sector
  {
    Personal,
    Business
  }

Returning to our Main() method, we now need to instantiate our RabbitLogger and log messages with differing sectors.  As as way to ensure our client has an opportunity to subscribe to our messages and to help emulate a continual stream of log messages being published, let’s use the logger to publish a series of log messages every second for 10 seconds:

      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();

      while (stopwatch.Elapsed < time)
      {
        using (var logger = new RabbitLogger(connection, ClientId))
        {
          Console.Write("Time to complete: {0} seconds\r", (time - stopwatch.Elapsed).ToString("ss"));
          logger.Write(Sector.Personal, "This is an information message", TraceEventType.Information);
          logger.Write(Sector.Business, "This is an warning message", TraceEventType.Warning);
          logger.Write(Sector.Business, "This is an error message", TraceEventType.Error);
          Thread.Sleep(1000);
        }
      }

This code prints out the time remaining just to give us a little feedback on the publishing progress.  Finally, we’ll close our our connection and prompt the user to exit the console application:

      connection.Close();
      Console.Write("                             \r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();

 

Here’s the full Producer listing:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
  public enum Sector
  {
    Personal,
    Business
  }

  interface ILogger
  {
    void Write(Sector sector, string entry, TraceEventType traceEventType);
  }

  class RabbitLogger : ILogger, IDisposable
  {
    readonly long _clientId;
    readonly IModel _channel;
    bool _disposed;

    public RabbitLogger(IConnection connection, long clientId)
    {
      _clientId = clientId;
      _channel = connection.CreateModel();
      _channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Topic, false, true, null);
    }

    public void Dispose()
    {
      if (!_disposed)
      {
        if (_channel != null && _channel.IsOpen)
        {
          _channel.Close();
        }
      }
      GC.SuppressFinalize(this);
    }

    public void Write(Sector sector, string entry, TraceEventType traceEventType)
    {
      byte[] message = Encoding.UTF8.GetBytes(entry);
      string routingKey = string.Format("{0}.{1}.{2}", _clientId, sector.ToString(), traceEventType.ToString());
      _channel.BasicPublish("topic-exchange-example", routingKey, null, message);
    }

    ~RabbitLogger()
    {
      Dispose();
    }
  }

  class Program
  {
    const long ClientId = 10843;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();

      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();

      while (stopwatch.Elapsed < time)
      {
        using (var logger = new RabbitLogger(connection, ClientId))
        {
          Console.Write("Time to complete: {0} seconds\r", (time - stopwatch.Elapsed).ToString("ss"));
          logger.Write(Sector.Personal, "This is an information message", TraceEventType.Information);
          logger.Write(Sector.Business, "This is an warning message", TraceEventType.Warning);
          logger.Write(Sector.Business, "This is an error message", TraceEventType.Error);
          Thread.Sleep(1000);
        }
      }

      connection.Close();
      Console.Write("                             \r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();
    }
  }
}

 

For our Consumer app, we’ll pretty much be using the same code as with our fanout exchange example, but we’ll need to change the exchange type along with the exchange and queue names.  Additionally, we also need to provide a routing key that registers for logs in the Personal sector only.  The messages published by the Producer will be in the form: [client Id].[sector].[log severity], so we can use a routing key of “*.Personal.*” (or alternately “*.Personal.#”).  Here’s the full Consumer listing:

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("topic-exchange-example", ExchangeType.Topic, false, true, null);
      channel.QueueDeclare("log", false, false, true, null);
      channel.QueueBind("log", "topic-exchange-example", "*.Personal.*");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("log", true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
          string message = Encoding.UTF8.GetString(eventArgs.Body);
          Console.WriteLine(string.Format("{0} - {1}", eventArgs.RoutingKey, message));
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }

      channel.Close();
      connection.Close();
    }
  }
}

 

Setting the solution to run both the Producer and Consumer on startup, we should see similar output to the following listings:

 

Producer

Running for 10 seconds
Time to complete: 06 seconds

 

Consumer

10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message

 

This concludes our topic exchange example.  Next time, we’ll walk through an example using the final exchange type: Header Exchanges.

Tagged with:  

RabbitMQ for Windows: Fanout Exchanges

On May 16, 2012, in Uncategorized, by derekgreer

This is the sixth installment to the series: RabbitMQ for Windows. In the last installment, we walked through creating a direct exchange example and introduced the push API. In this installment, we’ll walk through a fanout exchange example.

As discussed earlier in the series, the fanout exchange type is useful for facilitating the publish-subscribe pattern. When we publish a message to a fanout exchange, the message is delivered indiscriminately to all bound queues. With the Direct, Topic, and Headers exchange types, a criteria is used by a routing algorithm taking the form of a routing key or a collection of message headers depending on the exchange type in question. A routing key or a collection of message headers may also be specified with the fanout exchange which will be delivered as part of the message’s metadata, but they will not be used as a filter in determining which queue receives a published message.

To demonstrate the fanout exchange, we’ll use a stock ticker example. In the previous example, logs were routed to queues based upon a matching routing key (an empty string in the logging example’s case). In this example, we’d like our messages to be delivered to all bound queues regardless of qualification.

Similar to the previous example, we’ll create a Producer console application which periodically publishes stock quote messages and a Consumer console application which displays the message to the console.

We’ll start our Producer app as before by establishing a connection using the default settings, creating the connection, and creating a channel:

namespace Producer
{
  class Program
  {
    static volatile bool _cancelling;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
    }
  }
}

Next, we need to declare an exchange of type “fanout”. We’ll name our new exchange “fanout-exchange-example”:

channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Fanout, false, true, null);

To publish the stock messages periodically, we’ll call a PublishQuotes() method with the provided channel and run it on a background thread:

var thread = new Thread(() => PublishQuotes(channel));
thread.Start();

Next, we’ll provide a way to exit the application by prompting the user to enter ‘x’ and use a simple Boolean to signal the background thread when to exit:

Console.WriteLine("Press 'x' to exit");
var input = (char) Console.Read();
_cancelling = true;

Lastly, we need to close the channel and connection:

channel.Close();
connection.Close();

For our PublishQuotes() method, well iterate over a set of stock symbols, retrieve the stock information for each symbol, and publish a simple string-based message in the form [symbol]:[price]:

static void PublishQuotes(IModel channel)
{
  while (true)
  {
    if (_cancelling) return;
    IEnumerable quotes = FetchStockQuotes(new[] {"GOOG", "HD", "MCD"});
    foreach (string quote in quotes)
    {
      byte[] message = Encoding.UTF8.GetBytes(quote);
      channel.BasicPublish("direct-exchange-example", "", null, message);
    }
    Thread.Sleep(5000);
  }
}

To implement the FetchStockQuotes() method, we’ll use the Yahoo Finance API which entails retrieving an XML-based list of stock quotes and parsing out the bit of information we’re interested in for our example:

static IEnumerable<string> FetchStockQuotes(string[] symbols)
{
  var quotes = new List<string>();

  string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
      String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
  var wc = new WebClient {Proxy = WebRequest.DefaultWebProxy};
  var ms = new MemoryStream(wc.DownloadData(url));
  var reader = new XmlTextReader(ms);
  XDocument doc = XDocument.Load(reader);
  XElement results = doc.Root.Element("results");

  foreach (string symbol in symbols)
  {
    XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);  
    quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
  }

  return quotes;
}

Here is the complete Producer listing:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Linq;
using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    static volatile bool _cancelling;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Fanout, false, true, null);

      var thread = new Thread(() => PublishQuotes(channel));
      thread.Start();

      Console.WriteLine("Press 'x' to exit");
      var input = (char) Console.Read();
      _cancelling = true;

      channel.Close();
      connection.Close();
    }

    static void PublishQuotes(IModel channel)
    {
      while (true)
      {
        if (_cancelling) return;
        IEnumerable quotes = FetchStockQuotes(new[] {"GOOG", "HD", "MCD"});
        foreach (string quote in quotes)
        {
          byte[] message = Encoding.UTF8.GetBytes(quote);
          channel.BasicPublish("direct-exchange-example", "", null, message);
        }
        Thread.Sleep(5000);
      }
    }


    static IEnumerable<string> FetchStockQuotes(string[] symbols)
    {
      var quotes = new List<string>();

      string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
          String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
      var wc = new WebClient {Proxy = WebRequest.DefaultWebProxy};
      var ms = new MemoryStream(wc.DownloadData(url));
      var reader = new XmlTextReader(ms);
      XDocument doc = XDocument.Load(reader);
      XElement results = doc.Root.Element("results");

      foreach (string symbol in symbols)
      {
        XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);  
        quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
      }

      return quotes;
    }
  }
}

Our Consumer application will be similar to the one used in our logging example, but we’ll change the exchange name, queue name, and exchange type and put the processing of the messages within a while loop to continually display our any updates to our stock prices. Here’s the full listing for our Consumer app:

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Fanout, false, true, null);
      channel.QueueDeclare("quotes", false, false, true, null);
      channel.QueueBind("quotes", "direct-exchange-example", "");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("quotes", true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
          string message = Encoding.UTF8.GetString(eventArgs.Body);
          Console.WriteLine(message);
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }

      channel.Close();
      connection.Close();
    }
  }
}

Setting our solution startup projects to run both the Producer and Consumer apps together, we should see messages similar to the following for the Consumer output:

GOOG:611.62
HD:48.66
MCD:91.06
GOOG:611.58
HD:48.66
MCD:91.06

To show our queue would receive messages published to the fanout exchange regardless of the routing key value, we can change the value of the routing key to “anything”:

channel.QueueBind("quotes", "direct-exchange-example", "anything");

Running the application again shows the same values:

GOOG:611.62
HD:48.66
MCD:91.06
GOOG:611.58
HD:48.66
MCD:91.06 

That concludes our fanout exchange example. Next time, we’ll take a look at the topic exchange type.

Tagged with: