Being Agile

On March 5, 2014, in Uncategorized, by derekgreer

When the term “agile” is used in reference to one’s development processes, it more often than not seems to be used in a monolithic way.  Most people are aware that teams tend to use a subset, combination, or modified form of the main agile processes marketed today, but even in this recognition there seems to be a tendency to think about each variation in monolithic terms.

Which Process Do You Use?

If you’ve ever attended a local agile user group to hear various processes compared, you may have encountered the speaker asking for a show of hands to find out what various processes people are using.  If you’ve been to one of these meetings, it might have gone a little like this:

Let’s see a show of hands of those in the room who are using Scrum.  That’s quite a few of you.  How about Scrum-ban?  Nice.  Now let’s see a show of hands for people using Extreme Programming or XP.  Not as many of you guys here.  Who’s using the Rational Unified Process or RUP?  Anyone?  Who isn’t currently using an agile process, but came here today to learn more about agile?  Glad to have you guys here!  I hope this session will be informative.  So, is anyone using any other process we haven’t mentioned?  Yes, you sir.  What is your group using?

Um, well, we use a process we call ‘Scrum-but’.  We use Scrum, but we leave out some things. [a chuckle is heard from a few individuals].

While such a meeting generally turns to discussing the various attributes of different processes, usually hitting major highlights of Scrum such as iterations, stand-ups, planning meetings, user stories, retrospectives, etc., it still does so in terms of the attributes of different monolithic processes.  The deficiency in this line of thinking is that it trains people to think in terms of what percentage of a name-brand agile process their team is adhering to, or should seek to adopt, rather than thinking about what problems these processes individually have evolved to solve.  “Which Process Do You Use” is the wrong question.

Are you Agile?

Many teams like to say “We’re an agile development shop”.  Now to be fair, when we want to convey to someone which common group of practices we follow, it can be useful to use labels such as Scrum, XP, Scrum-ban, etc.  That said, there seem to be an awful lot of shops that say they are agile when what they mean is that they do “stand-ups” (i.e. daily status meetings for keeping their managers informed about what they’re up to) and “iterate” (i.e. chunky waterfall) their way to a deadline that’s been handed down by upper management or a sales department.

What is Agile?

If you ask what agile is in a typical development shop today, you’re more likely to find yourself in a conversation about Scrum or some other process than one about the actual meaning of the word.  Let’s actually go back and look at the definition:

ag·ile -  adjective \ˈa-jəl, -ˌjī(-ə)l\

1:  marked by ready ability to move with quick easy grace <an agile dancer>
2:  having a quick resourceful and adaptable character <an agile mind>

Based on this definition from Merriam-Webster’s dictionary, being agile is “an ability to change, or adapt to change, quickly”.  While communicating that you adhere to a given agile process may have its usefulness at times, thinking of your process in this monolithic way doesn’t promote the kind of thinking that leads to continuous improvement.  Rather than thinking in terms of which process we use, we should think in terms of what aspects of change our processes help us adapt to.

Toward An Agile View of Process

It seems that many teams’ first foray into agile processes is the selection of Scrum by their management.  They’ve heard about this Scrum and how it can save them money, so they’ve sent the managers and Business Analysts off to Scrum Master training to outfit them with their Scrum-capes and Scrum-tights.


Introducing a process like Scrum (or whatever portions of Scrum a company’s existing process will tolerate) will sometimes improve matters, but only insofar as one’s cargo-cult emulation of the prescribed practices happens to match up with the problems for which they were conceived.  Unfortunately, this approach to adopting agile processes often seems to lead to a bunch of people going through the motions without really understanding what the purpose is.  Worse, when the local Scrum training consultants sell them on the idea that they don’t really have to give up things like deadlines or using business analysts to gather all the requirements, and don’t have to restructure their organization, they generally end up with some empty shell of a process which is really nothing more than their old waterfall process with more micro-management.

A better approach is to first learn about the types of issues different agile practices seek to address and then consider how your team’s existing process can improve if each practice were applied individually.  Rather than thinking of your team as “agile” or “not agile”, consider asking the following types of questions:

Is my team agile WITH RESPECT TO …

               changes to the product’s desired features? 
               changes to the product’s code base? 
               changes to the team’s understanding of the domain? 
               changes to the team’s understanding of the technologies used? 
               changes to team members’ hours of availability? 
               changes to individuals on the team? 
               changes to skillsets within the team? 
               changes to the cost of materials and resources? 
               changes to the compatibility or availability of 3rd-party software? 
               etc.

Different agile practices address different kinds of problems, but to really become an agile team you need to learn how to identify problems and solutions on an ongoing basis, not just implement processes.  Let’s stop thinking of ourselves as agile or not agile and start asking the question “What are we agile at?”


Expected Objects Custom Comparisons

On November 17, 2013, in Uncategorized, by derekgreer

ExpectedObjects is a testing library I developed a few years ago to facilitate using the Expected Objects pattern within my specifications to avoid obscure tests.  You can find the original introduction to the library here.

As of version 1.1.0, the ExpectedObjects library has been updated to include a feature called Custom Comparisons.  The standard behavior of the library is to traverse a strategy chain (which is itself configurable) to determine which comparison strategy is to be used for each type of object encountered within the object graph.  The Custom Comparisons feature allows you to override this behavior for specific properties.

For example, let’s say we’re writing an end-to-end test which validates a Receipt class as follows:

public class Receipt
{
    public string Name { get; set; }
    public DateTime TransactionDate { get; set; }
    public string VerificationCode { get; set; }
}

 

Given the preceding class, the VerificationCode property would probably not be a value you could anticipate.  In such a case, while you can’t verify that the property has a specific value, you may care that it at least has some value.  This is where the Custom Comparisons feature can help.  We can verify that the actual Receipt received matches the expected receipt structure using the following expected object configuration:

var expected = new
{
	Name = "John Doe",
	TransactionDate = DateTime.Today,
	VerificationCode = Expect.NotNull()
}.ToExpectedObject();

var actual = new Receipt
{
	Name = "John Doe",
	TransactionDate = DateTime.Today,
	VerificationCode = "ABC123"
};

expected.ShouldMatch(actual);

In the event that the VerificationCode property is null, the library will raise an exception with the following message:

For Receipt.VerificationCode, expected a non-null value but found [null].

The ExpectedObjects library currently provides a static Expect class which includes convenience methods to check for null, not null, and an Any<T> comparison for checking that an object is of a specific type (e.g. Expect.Any<Receipt>()).  To supply your own comparisons, simply implement the IComparison interface, which defines the custom comparison and the text to include within any exception messages raised (e.g. “For SomeType.SomeProperty, expected [text you supply here] but found ‘42’”).
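As a rough sketch of what a custom comparison might look like, the following defines a comparison that passes only for non-empty strings.  Note that the IComparisonSketch interface below is a hypothetical stand-in declared just for this example; its member names (AreEqual and GetExpectedResult) are assumptions about the general shape of the library’s interface, so check the ExpectedObjects source for the real signatures before relying on it:

```csharp
using System;

// Hypothetical stand-in for the library's comparison interface.
// The real interface's member names and signatures may differ.
public interface IComparisonSketch
{
    // Decides whether the actual value satisfies the comparison.
    bool AreEqual(object actual);

    // Supplies the "expected ..." text used in a failure message.
    object GetExpectedResult();
}

// Passes when the actual value is a string containing non-whitespace text.
public class NonEmptyStringComparison : IComparisonSketch
{
    public bool AreEqual(object actual)
    {
        var text = actual as string;
        return !string.IsNullOrWhiteSpace(text);
    }

    public object GetExpectedResult()
    {
        return "a non-empty string";
    }
}
```

With something like this in place, a property such as VerificationCode could be checked for “some meaningful value” rather than merely for not being null.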

 
 

This is the first article in a new sporadic series I’ll contribute to from time to time wherein I’ll discuss some noteworthy issues I’ve wrestled with. In this installment, I’ll be discussing an NHibernate issue which took me some time to work through. So, let’s dive into the story …

The Context

In an application I was recently working on, a need arose to modify a section of code involving two entities which should have been modeled using a parent/child relationship but which only had a loose association in the database. The primary table in the database schema for what needed to be the parent object in the domain only contained an unenforced foreign key column which matched up with a candidate key on the table used for what needed to be the child object. In the section of code I needed to modify, a View Model was being created by first retrieving data for the parent object and subsequently for the child object. I’m not exactly sure what led to this path, but I think it had something to do with the original developer’s attempt at using a surrogate key strategy for all the tables and later attempts by others to pull the data into a domain model with NHibernate.

At any rate, while I wasn’t in a position to revamp the whole design, I knew there was a way to express many-to-one mappings in NHibernate using non-primary keys, so after a little searching and some trial-and-error I got the parent entity referencing the child entity with a Fluent NHibernate Auto-Mapping configuration similar to the following:

// Parent is a placeholder name for the parent entity type.
return AutoMap.AssemblyOf<Parent>(new AutomappingConfiguration())
  .Override<Parent>(map => map.References(p => p.Child, "ParentColumnChildKeyName")
    .PropertyRef("ChildCandidateKeyColumnName")
    .Fetch.Join());

 

Part of the changes required to make this work was some refactoring of an import job used to populate the database which relied upon the domain model and mappings to populate the parent and child data. After changing the parent entity to reference the child entity instead of just a candidate key to the child entity, I needed to modify the import job to persist the relationship between the parent and the child. To do this, I injected a pre-existing ChildRepository to query for existing instances of the child entities (which had its own separate import process) so I could associate it with the parent entity upon saving. All of the changes worked as expected for the client portion of the application, but the changes broke some acceptance tests for the import job. The error I started receiving in the tests was as follows:

null id in “MyEntityType” entry (don't flush the Session after an exception occurs)

In this case the “MyEntityType” was another entity which had a many-to-one mapping with the aforementioned parent entity. After looking over the code and scratching my head for a bit, I decided to do a search on this particular error and read a few articles which at first didn’t seem to speak to my particular scenario. The advice I read basically boiled down to “Don’t try to do stuff with the session after you receive an error”. That certainly made sense, but upon stepping through the code I couldn’t see anywhere I was catching an error and proceeding to do something further with the session. I then decided to add a try/catch around the offending code and suddenly I saw the issue: trying to save an entity associated with one open session with an entity from another open session.

The Solution

Ultimately, the reason I couldn’t see the error was the behavior of some common infrastructure code my team uses when working with NHibernate. We use Autofac for dependency injection, and to facilitate transactions we use Autofac’s OnActivating() and OnRelease() methods to begin an NHibernate transaction and to handle the rollback or commit of the transaction when complete. Here was the offending code:

builder.Register(c => c.ResolveNamed<ISessionFactory>(RegistrationKey).OpenSession())
	.As<ISession>()
	.OnActivating(x => x.Instance.BeginTransaction())
	.OnRelease(session =>
		{
			try
			{
				if (!session.Transaction.WasRolledBack && session.Transaction.IsActive)
				{				
					session.Transaction.Commit();
				}
			}
			finally
			{
				session.Close();
				session.Dispose();
			}
		});

When used within the context of our Web applications, this code would contain a call to register the ISession with an HTTP Request lifetime scope, but this import job didn’t require a shared ISession prior to my changes. To fix the problem, I added a call to register the ISession as InstancePerLifetimeScope() which causes the same lifetime scope used to resolve the job to be used for resolving any instances of ISession. Additionally, I added a try/catch/throw around the session to at least provide some logging of similar issues should this ever come up again.

Introducing RabbitBus

On June 1, 2012, in Uncategorized, by derekgreer

What Is It?

RabbitBus is a .NET client API for use with RabbitMQ.  RabbitBus was designed to make working with RabbitMQ easy by providing a fluent interface which places a focus on discoverability and by providing commonly needed constructs not provided through the official RabbitMQ .NET client API.

 

How Do I Use It?

The RabbitBus library was designed to allow for the centralization of all RabbitMQ configuration at application startup, separating the concerns of routing, serialization, and error handling from the central concerns of publishing and consuming messages.

RabbitBus works with object-based messages.  For example, if you have an application from which you would like to publish status update messages, you might model your message using the following class:

[Serializable]
public class StatusUpdate
{
  public StatusUpdate(string status)
  {
    Status = status;
  }

  public string Status { get; set; }
}

After configuring how messages are to be handled, you’ll then use an instance of a Bus type to publish or subscribe to each message.

Configuration of the Bus is handled through a BusBuilder.  The BusBuilder type provides an API for specifying how serialization, publication, consumption, and other concerns will be handled by the Bus.

If you’re already familiar with RabbitMQ concepts then you should find working with RabbitBus to be fairly easy.  The following demonstrates some of the basic usage scenarios:

Message Publication

To configure a producer application to publish messages of type StatusUpdate to a direct exchange named “status-update-exchange” on localhost, you would use the following configuration:

 
Bus bus = new BusBuilder()
  .Configure(ctx => ctx.Publish<StatusUpdate>()
                         .WithExchange("status-update-exchange"))
  .Build();
bus.Connect();

To publish a StatusUpdate message, you would then make the following invocation:

bus.Publish(new StatusUpdate("OK"));

Message Subscription

To configure a consumer application to subscribe to StatusUpdate messages on localhost, you would use the following configuration:

Bus bus = new BusBuilder()
  .Configure(ctx => ctx.Consume<StatusUpdate>()
                         .WithExchange("status-update-exchange")
                         .WithQueue("status-update-queue"))
  .Build();

To subscribe to StatusUpdate messages, you would then make the following invocation:

bus.Subscribe<StatusUpdate>(messageContext => { /* handle message */ });

 

What Other Features Are Provided?

RabbitBus provides the following features:

  • support of all AMQP 0.9.1 exchange types (i.e. direct, fanout, topic, and headers)
  • remote procedure calls (RPC)
  • deadletter queue support
  • convention based auto-subscription
  • RabbitMQ push and pull API support
  • extensible serialization (Binary serialization by default, Json serialization provided by RabbitBus.Serialization.Json)
  • customizable error handling
  • RabbitMQ server restart recovery
  • configurable offline queuing support
  • logging

 

Where Can I Learn More?

You can find more information about how to use RabbitBus on the RabbitBus Wiki.  Additionally, RabbitBus was developed using Test-Driven Development and care was taken in the implementation of its executable specification suite to maximize demonstration of the API’s intended use.

 

Where Do I Get It?

RabbitBus is available as a NuGet package and the source is available on GitHub.


RabbitMQ for Windows: Headers Exchanges

On May 29, 2012, in Uncategorized, by derekgreer

This is the eighth and final installment in the series: RabbitMQ for Windows.  In the last installment, we walked through creating a topic exchange example.  In this final installment, we’ll walk through a headers exchange example.

Headers exchanges examine the message headers to determine which queues a message should be routed to.  As discussed earlier in this series, headers exchanges are similar to topic exchanges in that they allow you to specify multiple criteria, but offer a bit more flexibility in that the headers can be constructed using a wider range of data types (1).

To receive messages from a headers exchange, a dictionary of headers is specified as part of the binding arguments.  In addition to the headers, a key of “x-match” is also included in the dictionary with a value of either “all”, specifying that messages must be published with all the specified headers in order to match, or “any”, specifying that the message needs to have at least one of the specified headers.
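To make the “all” versus “any” distinction concrete, here is a small, self-contained sketch of the matching rule.  This is only a simplified model for illustration and not the broker’s actual implementation; the HeadersMatchSketch class and its Matches() method are names made up for this example, and it assumes that “x-match” defaults to “all” and that binding keys beginning with “x-” are treated as options rather than as match criteria:

```csharp
using System;
using System.Collections.Generic;

// Illustrative model of headers exchange matching (not RabbitMQ's own code).
public static class HeadersMatchSketch
{
    // Returns true when the message headers satisfy the binding arguments.
    public static bool Matches(IDictionary<string, object> bindingArgs,
                               IDictionary<string, object> messageHeaders)
    {
        // Assumption: "x-match" defaults to "all" when it isn't supplied.
        object mode;
        bool matchAny = bindingArgs.TryGetValue("x-match", out mode)
                        && object.Equals(mode, "any");

        int criteria = 0;
        int hits = 0;
        foreach (var pair in bindingArgs)
        {
            // Assumption: keys starting with "x-" are options, not criteria.
            if (pair.Key.StartsWith("x-", StringComparison.Ordinal))
                continue;

            criteria++;
            object value;
            if (messageHeaders.TryGetValue(pair.Key, out value)
                && object.Equals(value, pair.Value))
            {
                hits++;
            }
        }

        // "any": at least one header matched; "all": every header matched.
        return matchAny ? hits > 0 : hits == criteria;
    }
}
```

Under this model, a binding of “key1” and “key2” with “x-match” set to “any” accepts a message carrying only a matching “key1” header, while the same binding with “all” would reject it.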

As our final example, we’ll create a Producer application which publishes the message “Hello, World!” using a headers exchange.  Here’s our Producer code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.v0_9_1;

namespace Producer
{
  class Program
  {
    const string ExchangeName = "header-exchange-example";

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      connectionFactory.HostName = "localhost";

      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
      byte[] message = Encoding.UTF8.GetBytes("Hello, World!");

      var properties = new BasicProperties();
      properties.Headers = new Dictionary<string, object>();
      properties.Headers.Add("key1", "12345");
      
      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();
      var messageCount = 0;

      while (stopwatch.Elapsed < time)
      {
        channel.BasicPublish(ExchangeName, "", properties, message);
        messageCount++;
        Console.Write("Time to complete: {0} seconds - Messages published: {1}\r", (time - stopwatch.Elapsed).ToString("ss"), messageCount);
        Thread.Sleep(1000);
      }

      Console.Write(new string(' ', 70) + "\r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();
      message = Encoding.UTF8.GetBytes("quit");
      channel.BasicPublish(ExchangeName, "", properties, message);
      connection.Close();
    }
  }
}

In the Producer, we’ve used a generic dictionary of type Dictionary<string, object> and added a single key “key1” with a value of “12345”.  As with our previous example, we’re using a stopwatch as a way to publish messages continually for 10 seconds.

For our Consumer application, we can use an “x-match” argument of “all” with the single key/value pair specified by the Producer, or we can use an “x-match” argument of “any” which includes the key/value pair specified by the Producer along with other potential matches.  We’ll use the latter for our example.   Here’s our Consumer code:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    const string QueueName = "header-exchange-example";
    const string ExchangeName = "header-exchange-example";

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      connectionFactory.HostName = "localhost";

      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
      channel.QueueDeclare(QueueName, false, false, true, null);

      IDictionary specs = new Dictionary<string, object>();
      specs.Add("x-match", "any");
      specs.Add("key1", "12345");
      specs.Add("key2", "123455");
      channel.QueueBind(QueueName, ExchangeName, string.Empty, specs);

      channel.StartConsume(QueueName, MessageHandler);
      connection.Close();
    }

    public static void MessageHandler(IModel channel, DefaultBasicConsumer consumer, BasicDeliverEventArgs eventArgs)
    {
      string message = Encoding.UTF8.GetString(eventArgs.Body);
      Console.WriteLine("Message received: " + message);
      foreach (object headerKey in eventArgs.BasicProperties.Headers.Keys)
      {
        Console.WriteLine(headerKey + ": " + eventArgs.BasicProperties.Headers[headerKey]);
      }

      if (message == "quit")
        channel.BasicCancel(consumer.ConsumerTag);
    }
  }
}

Rather than handling our messages inline as we’ve done in previous examples, this example uses an extension method named StartConsume() which accepts a callback to be invoked each time a message is received.  Here’s the extension method used by our example:

using System;
using System.IO;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  public static class ChannelExtensions
  {
    public static void StartConsume(this IModel channel, string queueName,  Action<IModel, DefaultBasicConsumer, BasicDeliverEventArgs> callback)
    {
      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume(queueName, true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
          new Thread(() => callback(channel, consumer, eventArgs)).Start();
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }
    }
  }
}

Setting our solution to run both the Producer and Consumer applications upon startup, running our example produces output similar to the following:

Producer

Running for 10 seconds
Time to complete: 08 seconds - Messages published: 2

Consumer

Message received: Hello, World!
key1: 12345
Message received: Hello, World!
key1: 12345

That concludes our headers exchange example as well as the RabbitMQ for Windows series.  For more information on working with RabbitMQ, see the documentation at http://www.rabbitmq.com or purchase the book RabbitMQ in Action by Alvaro Videla and Jason Williams.  I hope you enjoyed the series.

 

Footnotes:

1 – See http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3 and http://hg.rabbitmq.com/rabbitmq-dotnet-client/diff/4def852523e2/projects/client/RabbitMQ.Client/src/client/impl/WireFormatting.cs for supported field types.


RabbitMQ for Windows: Topic Exchanges

On May 18, 2012, in Uncategorized, by derekgreer

This is the seventh installment to the series: RabbitMQ for Windows.  In the last installment, we walked through creating a fanout exchange example.  In this installment, we’ll be walking through a topic exchange example.

Topic exchanges are similar to direct exchanges in that they use a routing key to determine which queue a message should be delivered to, but they differ in that they provide the ability to match on portions of a routing key.  When publishing to a topic exchange, a routing key consisting of multiple words separated by periods (e.g. “word1.word2.word3”) will be matched against a pattern supplied by the binding queue.  Patterns may contain an asterisk (“*”) to match a word in a specific segment or a hash (“#”) to match zero or more words.  As discussed earlier in the series, the topic exchange type can be useful for directing messages based on multiple categories or for routing messages originating from multiple sources.
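To illustrate how these wildcards behave, here is a small, self-contained sketch of the matching rule.  It is only a model for experimenting with patterns and not how the broker itself is implemented; the TopicMatchSketch class and its Matches() method are names made up for this example:

```csharp
using System;

// Illustrative model of topic pattern matching (not RabbitMQ's own code).
public static class TopicMatchSketch
{
    // Returns true when the routing key satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Matches(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    static bool Matches(string[] pattern, int p, string[] words, int w)
    {
        // The pattern is exhausted: succeed only if the key is too.
        if (p == pattern.Length)
            return w == words.Length;

        if (pattern[p] == "#")
        {
            // "#" may absorb zero or more words, so try every split point.
            for (int next = w; next <= words.Length; next++)
            {
                if (Matches(pattern, p + 1, words, next))
                    return true;
            }
            return false;
        }

        // Every other pattern segment needs a word to consume.
        if (w == words.Length)
            return false;

        // "*" matches exactly one word; anything else must match literally.
        return (pattern[p] == "*" || pattern[p] == words[w])
               && Matches(pattern, p + 1, words, w + 1);
    }
}
```

For example, TopicMatchSketch.Matches("*.Personal.*", "10843.Personal.Information") returns true, while the same pattern rejects “10843.Business.Warning” because the second word differs.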

To demonstrate topic exchanges, we’ll return to our logging example, but this time we’ll subscribe to a subset of the messages being published to demonstrate the flexibility of how routing keys are used by topic exchanges.  For this example, we’ll be modeling a scenario where a company may have multiple client installations, each of which may be used to service different sectors of a company’s business model (e.g. Business or Personal sectors).  We’ll use a routing key that specifies the sector and subscribe to messages published for the Personal sector only.

As with our previous examples, we’ll keep things simple by creating console applications for a Producer and a Consumer.  Let’s start by creating the Producer app and establishing a connection using the default settings:

using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    const long ClientId = 10843;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
    }
  }
}

 

Rather than just publishing messages directly from the Main() method as with our first logging example, let’s create a separate logger object this time.  Here’s the logger interface and implementation we’ll be using:

  interface ILogger
  {
    void Write(Sector sector, string entry, TraceEventType traceEventType);
  }

  class RabbitLogger : ILogger, IDisposable
  {
    readonly long _clientId;
    readonly IModel _channel;
    bool _disposed;

    public RabbitLogger(IConnection connection, long clientId)
    {
      _clientId = clientId;
      _channel = connection.CreateModel();
      // Declare the same exchange that Write() publishes to.
      _channel.ExchangeDeclare("topic-exchange-example", ExchangeType.Topic, false, true, null);
    }

    public void Dispose()
    {
      if (!_disposed)
      {
        if (_channel != null && _channel.IsOpen)
        {
          _channel.Close();
        }
        _disposed = true;
      }
      GC.SuppressFinalize(this);
    }

    public void Write(Sector sector, string entry, TraceEventType traceEventType)
    {
      byte[] message = Encoding.UTF8.GetBytes(entry);
      string routingKey = string.Format("{0}.{1}.{2}", _clientId, sector.ToString(), traceEventType.ToString());
      _channel.BasicPublish("topic-exchange-example", routingKey, null, message);
    }

    ~RabbitLogger()
    {
      Dispose();
    }
  }

In addition to an open IConnection, our RabbitLogger class is instantiated with a client Id.  We use this as part of the routing key.  Since each log can vary by sector, we pass a Sector enum as part of the Write() method.  Here’s our Sector enum:

  public enum Sector
  {
    Personal,
    Business
  }

Returning to our Main() method, we now need to instantiate our RabbitLogger and log messages with differing sectors.  As a way to ensure our client has an opportunity to subscribe to our messages and to help emulate a continual stream of log messages being published, let’s use the logger to publish a series of log messages every second for 10 seconds:

      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();

      while (stopwatch.Elapsed < time)
      {
        using (var logger = new RabbitLogger(connection, ClientId))
        {
          Console.Write("Time to complete: {0} seconds\r", (time - stopwatch.Elapsed).ToString("ss"));
          logger.Write(Sector.Personal, "This is an information message", TraceEventType.Information);
          logger.Write(Sector.Business, "This is a warning message", TraceEventType.Warning);
          logger.Write(Sector.Business, "This is an error message", TraceEventType.Error);
          Thread.Sleep(1000);
        }
      }

This code prints out the time remaining just to give us a little feedback on the publishing progress.  Finally, we’ll close our connection and prompt the user to exit the console application:

      connection.Close();
      Console.Write("                             \r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();

 

Here’s the full Producer listing:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
  public enum Sector
  {
    Personal,
    Business
  }

  interface ILogger
  {
    void Write(Sector sector, string entry, TraceEventType traceEventType);
  }

  class RabbitLogger : ILogger, IDisposable
  {
    readonly long _clientId;
    readonly IModel _channel;
    bool _disposed;

    public RabbitLogger(IConnection connection, long clientId)
    {
      _clientId = clientId;
      _channel = connection.CreateModel();
      // Declare the same exchange that Write() publishes to.
      _channel.ExchangeDeclare("topic-exchange-example", ExchangeType.Topic, false, true, null);
    }

    public void Dispose()
    {
      if (!_disposed)
      {
        if (_channel != null && _channel.IsOpen)
        {
          _channel.Close();
        }
        _disposed = true;
      }
      GC.SuppressFinalize(this);
    }

    public void Write(Sector sector, string entry, TraceEventType traceEventType)
    {
      byte[] message = Encoding.UTF8.GetBytes(entry);
      string routingKey = string.Format("{0}.{1}.{2}", _clientId, sector.ToString(), traceEventType.ToString());
      _channel.BasicPublish("topic-exchange-example", routingKey, null, message);
    }

    ~RabbitLogger()
    {
      Dispose();
    }
  }

  class Program
  {
    const long ClientId = 10843;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();

      TimeSpan time = TimeSpan.FromSeconds(10);
      var stopwatch = new Stopwatch();
      Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
      stopwatch.Start();

      while (stopwatch.Elapsed < time)
      {
        using (var logger = new RabbitLogger(connection, ClientId))
        {
          Console.Write("Time to complete: {0} seconds\r", (time - stopwatch.Elapsed).ToString("ss"));
          logger.Write(Sector.Personal, "This is an information message", TraceEventType.Information);
          logger.Write(Sector.Business, "This is a warning message", TraceEventType.Warning);
          logger.Write(Sector.Business, "This is an error message", TraceEventType.Error);
          Thread.Sleep(1000);
        }
      }

      connection.Close();
      Console.Write("                             \r");
      Console.WriteLine("Press any key to exit");
      Console.ReadKey();
    }
  }
}

 

For our Consumer app, we’ll pretty much be using the same code as with our fanout exchange example, but we’ll need to change the exchange type along with the exchange and queue names.  Additionally, we also need to provide a routing key that registers for logs in the Personal sector only.  The messages published by the Producer will be in the form: [client Id].[sector].[log severity], so we can use a routing key of “*.Personal.*” (or alternately “*.Personal.#”).  Here’s the full Consumer listing:

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("topic-exchange-example", ExchangeType.Topic, false, true, null);
      channel.QueueDeclare("log", false, false, true, null);
      channel.QueueBind("log", "topic-exchange-example", "*.Personal.*");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("log", true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
          string message = Encoding.UTF8.GetString(eventArgs.Body);
          Console.WriteLine(string.Format("{0} - {1}", eventArgs.RoutingKey, message));
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }

      channel.Close();
      connection.Close();
    }
  }
}

 

Setting the solution to run both the Producer and Consumer on startup, we should see similar output to the following listings:

 

Producer

Running for 10 seconds
Time to complete: 06 seconds

 

Consumer

10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
10843.Personal.Information - This is an information message
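
To make the binding pattern concrete, here is a minimal sketch of how topic matching can be reasoned about: ‘*’ stands for exactly one dot-delimited word, while ‘#’ stands for zero or more words.  The TopicMatcher class below is a hypothetical helper written for illustration only.  It is not part of the RabbitMQ client, and the broker’s actual matching implementation will differ:

```csharp
using System;

static class TopicMatcher
{
  // Returns true when the routing key satisfies the binding pattern.
  public static bool Matches(string pattern, string routingKey)
  {
    return Matches(pattern.Split('.'), 0, routingKey.Split('.'), 0);
  }

  static bool Matches(string[] pattern, int p, string[] key, int k)
  {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (p == pattern.Length) return k == key.Length;

    if (pattern[p] == "#")
    {
      // '#' may absorb zero or more words, so try every possible length.
      for (int next = k; next <= key.Length; next++)
      {
        if (Matches(pattern, p + 1, key, next)) return true;
      }
      return false;
    }

    // Any other pattern word needs a key word to compare against.
    if (k == key.Length) return false;

    // '*' accepts any single word; a literal word must match exactly.
    if (pattern[p] == "*" || pattern[p] == key[k])
    {
      return Matches(pattern, p + 1, key, k + 1);
    }
    return false;
  }
}

class TopicMatcherDemo
{
  static void Main()
  {
    Console.WriteLine(TopicMatcher.Matches("*.Personal.*", "10843.Personal.Information")); // True
    Console.WriteLine(TopicMatcher.Matches("*.Personal.*", "10843.Business.Warning"));     // False
    Console.WriteLine(TopicMatcher.Matches("*.Personal.*", "10843.Personal"));             // False
    Console.WriteLine(TopicMatcher.Matches("*.Personal.#", "10843.Personal"));             // True
  }
}
```

The last two lines show where the two patterns differ: “*.Personal.*” requires exactly three words, whereas “*.Personal.#” also accepts keys with nothing, or with several words, after the sector.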

 

This concludes our topic exchange example.  Next time, we’ll walk through an example using the final exchange type: the headers exchange.


RabbitMQ for Windows: Fanout Exchanges

On May 16, 2012, in Uncategorized, by derekgreer

This is the sixth installment to the series: RabbitMQ for Windows. In the last installment, we walked through creating a direct exchange example and introduced the push API. In this installment, we’ll walk through a fanout exchange example.

As discussed earlier in the series, the fanout exchange type is useful for facilitating the publish-subscribe pattern. When we publish a message to a fanout exchange, the message is delivered indiscriminately to all bound queues. With the Direct, Topic, and Headers exchange types, the routing algorithm applies a criterion in the form of a routing key or a collection of message headers, depending on the exchange type in question. A routing key or a collection of message headers may also be specified when publishing to a fanout exchange, and these will be delivered as part of the message’s metadata, but they will not be used as a filter in determining which queues receive a published message.

To demonstrate the fanout exchange, we’ll use a stock ticker example. In the previous example, logs were routed to queues based upon a matching routing key (an empty string in the logging example’s case). In this example, we’d like our messages to be delivered to all bound queues regardless of qualification.

Similar to the previous example, we’ll create a Producer console application which periodically publishes stock quote messages and a Consumer console application which displays the message to the console.

We’ll start our Producer app as before by creating a connection factory using the default settings, creating the connection, and creating a channel:

namespace Producer
{
  class Program
  {
    static volatile bool _cancelling;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
    }
  }
}

Next, we need to declare an exchange of type “fanout”. We’ll name our new exchange “fanout-exchange-example”:

channel.ExchangeDeclare("fanout-exchange-example", ExchangeType.Fanout, false, true, null);

To publish the stock messages periodically, we’ll call a PublishQuotes() method with the provided channel and run it on a background thread:

var thread = new Thread(() => PublishQuotes(channel));
thread.Start();

Next, we’ll provide a way to exit the application by prompting the user to enter ‘x’ and use a simple Boolean to signal the background thread when to exit:

Console.WriteLine("Press 'x' to exit");
var input = (char) Console.Read();
_cancelling = true;

Lastly, we need to close the channel and connection:

channel.Close();
connection.Close();

For our PublishQuotes() method, we’ll iterate over a set of stock symbols, retrieve the stock information for each symbol, and publish a simple string-based message in the form [symbol]:[price]:

static void PublishQuotes(IModel channel)
{
  while (true)
  {
    if (_cancelling) return;
    IEnumerable<string> quotes = FetchStockQuotes(new[] {"GOOG", "HD", "MCD"});
    foreach (string quote in quotes)
    {
      byte[] message = Encoding.UTF8.GetBytes(quote);
      channel.BasicPublish("fanout-exchange-example", "", null, message);
    }
    Thread.Sleep(5000);
  }
}

To implement the FetchStockQuotes() method, we’ll use the Yahoo Finance API which entails retrieving an XML-based list of stock quotes and parsing out the bit of information we’re interested in for our example:

static IEnumerable<string> FetchStockQuotes(string[] symbols)
{
  var quotes = new List<string>();

  string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
      String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
  var wc = new WebClient {Proxy = WebRequest.DefaultWebProxy};
  var ms = new MemoryStream(wc.DownloadData(url));
  var reader = new XmlTextReader(ms);
  XDocument doc = XDocument.Load(reader);
  XElement results = doc.Root.Element("results");

  foreach (string symbol in symbols)
  {
    XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);  
    quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
  }

  return quotes;
}
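
Since the parsing step depends on the shape of the response, it can help to exercise it against a canned document without calling the service. The following sketch assumes a response shaped the way the code above expects (a root element containing a results element with one quote element per symbol). The sample XML and the QuoteParser name are made up for illustration and are not taken from the actual API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

static class QuoteParser
{
  // Extracts "[symbol]:[price]" strings from XML shaped like the assumed response.
  public static List<string> Parse(string xml, string[] symbols)
  {
    XElement results = XDocument.Parse(xml).Root.Element("results");
    var quotes = new List<string>();

    foreach (string symbol in symbols)
    {
      // Find the quote element whose symbol attribute matches.
      XElement q = results.Elements("quote").First(e => (string) e.Attribute("symbol") == symbol);
      quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
    }

    return quotes;
  }
}

class QuoteParserDemo
{
  static void Main()
  {
    // Hand-written sample data, not a captured response.
    const string xml =
      "<query><results>" +
      "<quote symbol='GOOG'><AskRealtime>611.62</AskRealtime></quote>" +
      "<quote symbol='HD'><AskRealtime>48.66</AskRealtime></quote>" +
      "</results></query>";

    foreach (string quote in QuoteParser.Parse(xml, new[] {"GOOG", "HD"}))
    {
      Console.WriteLine(quote); // GOOG:611.62, then HD:48.66
    }
  }
}
```

Keeping the parsing separate from the download in this way also makes it easier to see which part fails if the service changes its response format.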

Here is the complete Producer listing:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Linq;
using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    static volatile bool _cancelling;

    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
      channel.ExchangeDeclare("fanout-exchange-example", ExchangeType.Fanout, false, true, null);

      var thread = new Thread(() => PublishQuotes(channel));
      thread.Start();

      Console.WriteLine("Press 'x' to exit");
      var input = (char) Console.Read();
      _cancelling = true;

      channel.Close();
      connection.Close();
    }

    static void PublishQuotes(IModel channel)
    {
      while (true)
      {
        if (_cancelling) return;
        IEnumerable<string> quotes = FetchStockQuotes(new[] {"GOOG", "HD", "MCD"});
        foreach (string quote in quotes)
        {
          byte[] message = Encoding.UTF8.GetBytes(quote);
          channel.BasicPublish("fanout-exchange-example", "", null, message);
        }
        Thread.Sleep(5000);
      }
    }

    static IEnumerable<string> FetchStockQuotes(string[] symbols)
    {
      var quotes = new List<string>();

      string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
          String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
      var wc = new WebClient {Proxy = WebRequest.DefaultWebProxy};
      var ms = new MemoryStream(wc.DownloadData(url));
      var reader = new XmlTextReader(ms);
      XDocument doc = XDocument.Load(reader);
      XElement results = doc.Root.Element("results");

      foreach (string symbol in symbols)
      {
        XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);
        quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
      }

      return quotes;
    }
  }
}

Our Consumer application will be similar to the one used in our logging example, but we’ll change the exchange name, queue name, and exchange type, and put the processing of the messages within a while loop to continually display any updates to our stock prices. Here’s the full listing for our Consumer app:

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("fanout-exchange-example", ExchangeType.Fanout, false, true, null);
      channel.QueueDeclare("quotes", false, false, true, null);
      channel.QueueBind("quotes", "fanout-exchange-example", "");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("quotes", true, consumer);

      while (true)
      {
        try
        {
          var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
          string message = Encoding.UTF8.GetString(eventArgs.Body);
          Console.WriteLine(message);
        }
        catch (EndOfStreamException)
        {
          // The consumer was cancelled, the model closed, or the connection went away.
          break;
        }
      }

      channel.Close();
      connection.Close();
    }
  }
}

Setting our solution startup projects to run both the Producer and Consumer apps together, we should see messages similar to the following for the Consumer output:

GOOG:611.62
HD:48.66
MCD:91.06
GOOG:611.58
HD:48.66
MCD:91.06

To show that our queue receives messages published to the fanout exchange regardless of the binding key value, we can change the binding key passed to QueueBind() to “anything”:

channel.QueueBind("quotes", "fanout-exchange-example", "anything");

Running the application again shows the same values:

GOOG:611.62
HD:48.66
MCD:91.06
GOOG:611.58
HD:48.66
MCD:91.06 
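
The difference between the behavior shown in the two runs above and what a direct exchange would do can be pictured with a toy in-memory model. This is only a sketch for reasoning about routing: the ToyBinding and ToyExchange types are hypothetical and have nothing to do with the RabbitMQ client API or the broker’s implementation:

```csharp
using System;
using System.Collections.Generic;

// A binding ties a queue (here just a list of messages) to an exchange under a binding key.
class ToyBinding
{
  public string BindingKey;
  public List<string> Queue = new List<string>();
}

static class ToyExchange
{
  // Fanout: every bound queue gets a copy; neither key is consulted.
  public static void PublishFanout(IEnumerable<ToyBinding> bindings, string routingKey, string message)
  {
    foreach (ToyBinding binding in bindings) binding.Queue.Add(message);
  }

  // Direct: only queues whose binding key equals the routing key get a copy.
  public static void PublishDirect(IEnumerable<ToyBinding> bindings, string routingKey, string message)
  {
    foreach (ToyBinding binding in bindings)
    {
      if (binding.BindingKey == routingKey) binding.Queue.Add(message);
    }
  }
}

class ToyExchangeDemo
{
  static void Main()
  {
    var quotes = new ToyBinding {BindingKey = "anything"};
    var audit = new ToyBinding {BindingKey = ""};
    var bindings = new[] {quotes, audit};

    // Published with an empty routing key, as in our Producer.
    ToyExchange.PublishFanout(bindings, "", "GOOG:611.62");
    Console.WriteLine("fanout: quotes={0}, audit={1}", quotes.Queue.Count, audit.Queue.Count); // fanout: quotes=1, audit=1

    ToyExchange.PublishDirect(bindings, "", "GOOG:611.58");
    Console.WriteLine("direct: quotes={0}, audit={1}", quotes.Queue.Count, audit.Queue.Count); // direct: quotes=1, audit=2
  }
}
```

In the fanout case, the queue bound with “anything” still receives the message, which mirrors what we saw when we changed the binding key in our Consumer.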

That concludes our fanout exchange example. Next time, we’ll take a look at the topic exchange type.


RabbitMQ for Windows: Direct Exchanges

On April 2, 2012, in Uncategorized, by derekgreer

This is the fifth installment to the series: RabbitMQ for Windows.  In the last installment, we took a look at the four exchange types provided by RabbitMQ: Direct, Fanout, Topic, and Headers.  In this installment, we’ll walk through an example which uses a direct exchange explicitly, and we’ll take a look at the push API.

In the Hello World example from the second installment of the series, we used a direct exchange type implicitly by taking advantage of the automatic binding of queues to the default exchange using the queue name as the routing key.  The example we’ll work through this time will be similar, but we’ll declare and bind to the exchange explicitly.

This time our example will be a distributed logging application.  We’ll create a Producer console application which publishes a logging message for some noteworthy action and a Consumer console application which displays the message to the console.

Beginning with our Producer app, we’ll start by creating a connection factory using the default settings, creating the connection, and creating a channel:

using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();
    }
  }
}

Next, we need to declare the exchange we’ll be publishing our message to.  We need to give our exchange a name in order to reference it later, so let’s use “direct-exchange-example”:

channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);

The second parameter indicates the exchange type.  For the official RabbitMQ .NET client, this is just a simple string containing one of the values: direct, fanout, topic, or headers.  The type RabbitMQ.Client.ExchangeType defines each of the exchange types as a constant for convenience.

Next, let’s call some method which might produce a value worthy of interest.  We’ll call the method DoSomethingInteresting() and have it return a string value:

string value = DoSomethingInteresting();

For the return value, the implementation of DoSomethingInteresting() can just return the string value of a new Guid:

static string DoSomethingInteresting() 
{ 
  return Guid.NewGuid().ToString(); 
} 

Next, let’s use the returned value to create a log message containing a severity level of Information:

string logMessage = string.Format("{0}: {1}", TraceEventType.Information, value); 

Next, we need to convert our log message to a byte array and publish the message to our new exchange:

byte[] message = Encoding.UTF8.GetBytes(logMessage);
channel.BasicPublish("direct-exchange-example", "", null, message);

Here, we use an empty string as our routing key and null for our message properties.

We end our Producer by closing the channel and connection:

channel.Close(); 
connection.Close(); 

Here’s the full listing:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    static void Main(string[] args)
    {
      Thread.Sleep(1000);
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
      string value = DoSomethingInteresting();
      string logMessage = string.Format("{0}: {1}", TraceEventType.Information, value);

      byte[] message = Encoding.UTF8.GetBytes(logMessage);
      channel.BasicPublish("direct-exchange-example", "", null, message);

      channel.Close();
      connection.Close();
    }

    static string DoSomethingInteresting()
    {
      return Guid.NewGuid().ToString();
    }
  }
}

Note that our logging example’s Producer differs from our Hello World’s Producer in that we didn’t declare a queue this time.  In our Hello World example, we needed to run our Producer before the Consumer since the Consumer simply retrieved a single message and exited.  Had we published to the default exchange without declaring the queue first, our message would simply have been discarded by the server before the Consumer had an opportunity to declare and bind the queue.

Next, we’ll create our Consumer which starts the same way as our Producer code:

using RabbitMQ.Client;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
    }
  }
}

Next, we need to declare a queue to bind to our exchange.  Let’s name our queue “logs”:

 
channel.QueueDeclare("logs", false, false, true, null); 

To associate our logs queue with our exchange, we use the QueueBind() method providing the name of the queue, the name of the exchange, and the binding key to filter messages on:

channel.QueueBind("logs", "direct-exchange-example", "");

At this point we could consume messages using the pull API method BasicGet() as we did in the Hello World example, but this time we’ll use the push API.  To have messages pushed to us rather than us pulling messages, we first need to declare a consumer:

var consumer = new QueueingBasicConsumer(channel);

To start pushing messages to our consumer, we call the channel’s BasicConsume() method and tell it which consumer to start pushing messages to:

channel.BasicConsume("logs", true, consumer);

Here, we specify the queue to consume messages from, a boolean flag instructing messages to be auto-acknowledged (see discussion in the Getting the Message section of Hello World Review), and the consumer to push the messages to.  

Now, any messages placed on the queue will automatically be retrieved and placed in a local in-memory queue.  To dequeue a message from the local queue, we call the Dequeue() method on the consumer’s Queue property:

var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 

This method call blocks until a message is available to be dequeued, or until an EndOfStreamException is thrown indicating that the consumer was cancelled, the channel was closed, or the connection otherwise was terminated.

Once the Dequeue() method returns, the BasicDeliverEventArgs contains the bytes published from the Producer in the Body property, so we can convert this value back into a string and print it to the console:

var message = Encoding.UTF8.GetString(eventArgs.Body);
Console.WriteLine(message); 

We end our Consumer by closing the channel and connection:

channel.Close();
connection.Close(); 

Here’s the full listing:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
      channel.QueueDeclare("logs", false, false, true, null);
      channel.QueueBind("logs", "direct-exchange-example", "");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("logs", true, consumer);

      var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();

      string message = Encoding.UTF8.GetString(eventArgs.Body);
      Console.WriteLine(message);

      channel.Close();
      connection.Close();
      Console.ReadLine();
    }
  }
}

If we run the resulting Consumer.exe at this point, it will block until a message is routed to the queue.  Running the Producer.exe from another shell produces a message on the consumer console similar to the following:

Information: 610fe447-bf31-41d2-ae29-414b2d00087b

Note: For a convenient way to execute both the Consumer and Producer from within Visual Studio, go to the solution properties and choose “Set StartUp Projects …”.  Select the “Multiple startup projects:” option and set both the Consumer and Producer to the Action: Start.  Use the arrows to the right of the projects to ensure the Consumer is started before the Producer.  In some cases, this can still result in the Producer publishing the message before the Consumer has time to declare and bind the queue, so putting a Thread.Sleep(1000) at the start of your Producer should ensure things happen in the required order.  After this, you can run your examples by using Ctrl+F5 (which automatically prompts to exit).
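
The blocking behavior of Dequeue() described earlier can be pictured without a broker.  The following sketch stands in a BlockingCollection<string> for the consumer’s local in-memory queue and a background thread for the client library pushing deliveries into it.  It is an analogy only, and QueueingBasicConsumer is not claimed to be implemented this way.  Here, the InvalidOperationException thrown by Take() once the collection is completed and empty plays the role of EndOfStreamException:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class PushSketch
{
  // Consumes everything a simulated "client library" thread pushes, then returns it.
  public static List<string> ConsumeAll(string[] deliveries)
  {
    var localQueue = new BlockingCollection<string>();
    var received = new List<string>();

    // Stand-in for the library pushing deliveries into the local queue.
    var pusher = new Thread(() =>
    {
      foreach (string delivery in deliveries)
      {
        Thread.Sleep(100); // simulate messages arriving over time
        localQueue.Add(delivery);
      }
      localQueue.CompleteAdding(); // analogous to the consumer being cancelled
    });
    pusher.Start();

    while (true)
    {
      try
      {
        // Blocks until an item is available, like Dequeue().
        received.Add(localQueue.Take());
      }
      catch (InvalidOperationException)
      {
        // No more items will ever arrive.
        break;
      }
    }

    pusher.Join();
    return received;
  }
}

class PushSketchDemo
{
  static void Main()
  {
    foreach (string message in PushSketch.ConsumeAll(new[] {"first", "second"}))
    {
      Console.WriteLine(message); // first, then second
    }
  }
}
```

The consuming loop never polls: it simply waits on the local queue, which is the essential difference from calling BasicGet() repeatedly with the pull API.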

That concludes our direct exchange example.  Next time, we’ll take a look at the Fanout exchange type.


TDD Best Practices: Don’t Mock Others

On April 1, 2012, in Uncategorized, by derekgreer

Test Doubles play an important role in the practice of Test-Driven Development for establishing a controlled context, facilitating outside-in design, verifying component interaction, and providing overall test stability through isolation.  While isolating components from their dependencies is a Good Thing, some of the advantages of using test doubles and Test-Driven Development itself can be thwarted by substituting the wrong components.

One design principle which is particularly relevant to the practice of using test doubles (mocks, stubs, spies, fakes, etc.) is the Dependency Inversion Principle.  Stated simply, the Dependency Inversion Principle pertains to the decoupling of the stuff you most care about (like your domain layer) from the stuff you care least about (like your low-level infrastructure code and third-party libraries).  In relation to the practice of using test doubles, it’s the coupling to other people’s stuff that is most relevant.

When you provide test doubles for types you don’t own, this indicates that you have a Dependency Inversion Principle violation.  If you’re injecting concrete, abstract, or interface types for a third-party dependency then that means your stuff can’t be used without their stuff.  While not ideal, you might ignore this design principle due to your personal stance of: “They’ll pry framework XYZ from my cold, dead hands”.  Fair enough.  Nevertheless, there are a few other reasons why you might want to consider following this principle.

When using test doubles for third-party libraries, you make assumptions about how the third-party library works.  Perhaps the library you’re using has no bugs and you thoroughly understand all of its behavior, so you know that the behavior you are substituting is exactly how the real library will behave in production once you put it all together.  If you aren’t confident of this, however, you might find that your design doesn’t interact with the third-party component in quite the way you imagined.  Let’s say it does though.  What about the next version?  When we design our systems using test doubles for third-party libraries, we create a false sense of security around the soundness and stability of our design.

Another problem caused by the use of test doubles for third-party components is the limitation it places on emergent design.  Coupling to third-party components will often lead to making design concessions to accommodate your dependencies rather than allowing your own design to emerge through the feedback received through the TDD process.

If we shouldn’t couple our design to third-party libraries (thereby removing the need to supply test doubles for third-party libraries), how then do we make use of such libraries and ensure our code works correctly?  The answer is twofold.  First, write component/unit level tests which guide the design of your own code, with that code expressing its needs through its own interfaces (whose behavior is defined through the use of test doubles).  Second, write integration tests which validate the behavior of adaptors which implement your design’s interfaces using the desired third-party components.

For example, you might design your components to rely upon an ILogger, IMapper, or IBus using test doubles to define their expected behavior and have integration tests which validate the implementation of these interfaces with log4net, AutoMapper, or ØMQ respectively.
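
As a minimal sketch of this arrangement, assume a hypothetical OrderProcessor component that needs to log.  All of the names below are invented for illustration, and Console stands in for a third-party library such as log4net so that the example stays self-contained:

```csharp
using System;
using System.Collections.Generic;

// An interface we own, shaped by what our design needs rather than by any library.
public interface ILogger
{
  void Info(string message);
}

// Hypothetical domain component: it depends only on our own abstraction.
public class OrderProcessor
{
  readonly ILogger _logger;

  public OrderProcessor(ILogger logger)
  {
    _logger = logger;
  }

  public void Process(int orderId)
  {
    _logger.Info("Processed order " + orderId);
  }
}

// Hand-rolled test double (a spy) for a type we own.
public class SpyLogger : ILogger
{
  public readonly List<string> Messages = new List<string>();

  public void Info(string message)
  {
    Messages.Add(message);
  }
}

// Adaptor: the only class that would know about the third-party library.
// Console is a stand-in here; this class is what integration tests would cover.
public class ConsoleLoggerAdaptor : ILogger
{
  public void Info(string message)
  {
    Console.WriteLine("INFO " + message);
  }
}

class DontMockOthersDemo
{
  static void Main()
  {
    var spy = new SpyLogger();
    new OrderProcessor(spy).Process(42);
    Console.WriteLine(spy.Messages[0]); // Processed order 42
  }
}
```

The unit tests for OrderProcessor only ever see SpyLogger, while an integration test would exercise the adaptor against the real library, so a change in the library’s behavior surfaces in one place.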

In summary, don’t use test doubles for types you don’t own.  Instead, let any dependencies you take on be an outgrowth of your emergent design and provide integration tests to validate the expected behavior of any third-party libraries you use to facilitate the required behavior.
