Getting Started With RabbitMQ in .net – Part 3

Posted on May 19, 2011

19


In the previous two examples I built a simple .net application to demonstrate first two sections the RabbitMQ getting started guide in .net. In this post I’ll be looking at the third. Download the Source

3.) Publish/Subscribe

The original article (in Java) is here:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

I’m going to take a slightly different approach to my previous two examples and split the Producer and Consumer into two different Windows Forms. This will allow us to run as many Consumers as we like  and so demonstrate Pub/Sub effectively.

First up is the Producer.

Create a new Form and add the Input TextBox and Button as in the first two examples. Also and a new Button “Start New Consumer” .

Next create the consumer. We only need to output messages so we only need one RichTextBox

In the previous two examples we had pretty much duplicate constructors for both Consumers and Producers. We will now fix this by creating a base class that these can both inherit from. Create a new class called IConnectToRabbitMQ.

public abstract class IConnectToRabbitMQ : IDisposable
    {
        protected IModel Model { get; set; }
        protected IConnection Connection { get; set; }
        public string Server { get; set; }
        public string ExchangeName{ get; set; }
        public string ExchangeTypeName { get; set; }

        public IConnectToRabbitMQ(string server, string exchange, string exchangeType)
        {
            Server = server;
            Exchange = exchange;
            ExchangeTypeName = exchangeType;
        }
        //Create the connection, Model and Exchange(if one is required)
        public virtual bool ConnectToRabbitMQ()
        {
            try
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = Server;
                Connection = connectionFactory.CreateConnection();
                Model = Connection.CreateModel();
                bool durable = true;
                if (!String.IsNullOrEmpty(Exchange))
                    Model.ExchangeDeclare(Exchange, ExchangeTypeName, durable);
                return true;
            }
            catch (BrokerUnreachableException e)
            {
                return false;
            }
        }

        public void Dispose()
        {
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }
    }

The class name may look a little odd to most as it begins with an “I”, this is usually the naming convention for an Interface but I’m using what I like to call Simon Says naming convention. I’ll be writing a post about this in the near future. The main gist of it, is I like to have classes tell me what they do. For example, a class which calls a remote service might inherit from a class(or interface) called ICallRemoteServices. So the full class name definition would be FooService : ICallRemoteServices. There would also be a abstract method defined that implements the action e.g CallRemoteService.  Other example are  IAmAnOrder(for a value object), ICalculateShipping, IDeliverEmail etc. This may seem a little weird but I like it :).

So enough of that for now, lets go through the class. First we declare fields to hold the familiar IModel and Connection instances. Next up are fields to store the details of the Server, Exchange and ExchangeTypeName.  Exchange is the name of the exchange we want to publish/consume messages from and ExchangeTypeName  holds the type of exchange we want to use(in this example it will be “fanout”).  ExchangeType is set from a constant declared in the RabbitMQ.Client.ExchangeType  class, so for us it will be ExchangeType .Fanout(More on this later.) Next we have the  ConnectToRabbitMQ() method, this is almost exactly the same as the Constructor methods of the Producer/Consumer methods  in my  previous two examples. We have this additional block which declares the Exchange.

                bool durable = true;
                if (!String.IsNullOrEmpty(ExchangeName))
                    Model.ExchangeDeclare(ExchangeName, ExchangeTypeName, durable);

We are declaring a durable exchange of the type ExchangeTypeName with the name ExchangeName.  If this exchange had already been declared by another Producer or Consumer  a new one is not created, the existing one will be used.

Now we’ll write our Producer.

public class Producer : IConnectToRabbitMQ
    {
        public Producer(string server, string exchange, string exchangeType) : base(server, exchange, exchangeType)
        {
        }

        public void SendMessage(byte[] message)
        {
            IBasicProperties basicProperties = Model.CreateBasicProperties();
            basicProperties.SetPersistent(true);
            Model.BasicPublish(ExchangeName, "", basicProperties, message);
        }
    }
}

Here we a have a nice lightweight publisher, the only difference from our previous examples is we are publishing to a named exchange called ExchangeName. We do not know about or use a Queue.

Then it’s our consumer, this is slightly more complicated.

public class Consumer : IConnectToRabbitMQ
    {
        protected bool isConsuming;
        protected string QueueName;

        // used to pass messages back to UI for processing
        public delegate void onReceiveMessage(byte[] message);
        public event onReceiveMessage onMessageReceived;

        public Consumer(string server, string exchange, string exchangeType) : base(server, exchange, exchangeType)
        {
        }

        //internal delegate to run the consuming queue on a seperate thread
        private delegate void ConsumeDelegate();

        public void StartConsuming()
        {
                Model.BasicQos(0, 1, false);
                QueueName = Model.QueueDeclare();
                Model.QueueBind(QueueName, ExchangeName, "");
                isConsuming = true;
                ConsumeDelegate c = new ConsumeDelegate(Consume);
                c.BeginInvoke(null, null);
        }

        protected Subscription mSubscription { get; set; }

        private void Consume()
        {
            bool autoAck = false;

            //create a subscription
            mSubscription = new Subscription(Model, QueueName, autoAck);

            while (isConsuming)
            {
                BasicDeliverEventArgs e = mSubscription.Next();
                byte[] body = e.Body;
                onMessageReceived(body);
                mSubscription.Ack(e);

            }
        }

        public void Dispose()
        {
            isConsuming = false;
            base.Dispose();
        }
    }

We need to store the name of our Queue that we will be binding to the exchange so we have a field QueueName for this purpose. The next code of interest is the StartConsuming() method. Most of this is familiar with this additional block:

 QueueName = Model.QueueDeclare();
 Model.QueueBind(QueueName, ExchangeName, "");
 

What we are doing here is asking the model to declare a temporary queue for us and give it a random unique name(stored in QueueName), we then bind this queue to the Exchange called ExchangeName. 

This a key concept to exchanges in RabbitMQ, a publisher/producer only knows about the Exchange, it will publish messages directly to the Exchange and has no concept of a queue. Each consumer knows about the Exchange but they will also have a queue that is bound to the Exchange.  The way I look at it is the one or more Producers own an Exchange(and publish to it) and each Consumer owns a Queue(which is bound to an Exchange.)

The Consume() method is very different to what we have seen before(and the Java Example). Instead of using a  QueueingBasicConsumer we are using a Subscription. Subscription is part of the RabbitMQ.Client.MessagePatterns package in the .net client Library. It give us a nice wrapper to the boilerplate message de-queuing code. More info is here.

        mSubscription = new Subscription(Model, QueueName, autoAck);
          .....
          BasicDeliverEventArgs e = mSubscription.Next();
          .....
          mSubscription.Ack(e);

Now we need to add the code for our Producer Form

       public string HOST_NAME = "localhost";
        public string EXCHANGE_NAME = "logs";

        private Producer producer;

        //delegate to show messages on the UI thread
        private delegate void showMessageDelegate(string message);

        public PubSub_Producer()
        {
            InitializeComponent();

            //Declare the producer
            producer = new Producer(HOST_NAME, EXCHANGE_NAME, ExchangeType.Fanout);

            //connect to RabbitMQ
            if(!producer.ConnectToRabbitMQ())
            {
                //Show a basic error if we fail
                MessageBox.Show("Could not connect to Broker");
            }

        }

        private int count = 0;
        private void button1_Click(object sender, EventArgs e)
        {
            string message = String.Format("{0} - {1}", count++, textBox1.Text);
            producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message));
        }

        private void button2_Click(object sender, EventArgs e)
        {
            //Open a new Consumer Form
            PubSub_Consumer consumer = new PubSub_Consumer();
            consumer.Show();
        }

This should be fairly self explanatory. The producer.ConnectToRabbitMQ() call is handled in the base IConnectToRabbitMQ class. We’ve added little error handling code just in case the broker is unavailable(if it is run rabbitmq-server -detached from the command line .) There’s also a method to handle clicks on the “Start New Consumer” Button which spawns a new Consumer Form.

Then we have our Consumer Form.

public partial class PubSub_Consumer : Form
    {
        public string HOST_NAME = "localhost";
        public string EXCHANGE_NAME = "logs";

        private Consumer consumer;
        public PubSub_Consumer()
        {
            InitializeComponent();
            //create the consumer
            consumer = new Consumer(HOST_NAME, EXCHANGE_NAME, ExchangeType.Fanout);

            //connect to RabbitMQ
            if (!consumer.ConnectToRabbitMQ())
            {
                //Show a basic error if we fail
                MessageBox.Show("Could not connect to Broker");
            }

            //Register for message event
            consumer.onMessageReceived += handleMessage;

            //Start consuming
            consumer.StartConsuming();
        }

        //delegate to post to UI thread
        private delegate void showMessageDelegate(string message);

        //Callback for message receive
        public void handleMessage(byte[] message)
        {
            showMessageDelegate s = new showMessageDelegate(richTextBox1.AppendText);
            this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
        }
    }

This is exactly the same as previous Consumer examples with the additional call to the base class.

Now we can run the project after making sure the correct Form is opened on starup

   [STAThread]
        static void Main()
        {
            Application.EnableVisualStyles();
            Application.SetCompatibleTextRenderingDefault(false);
            Application.Run(new PubSub_Producer());
        }

Click the “Start New Consumer” Button a couple of time to get a few consumers running, then put  your message in the “Producer Input”  TextBox and hit send. You should see the message appear in all the Consumer output windows. Good stuff 🙂

Summary

What we have done here is create a Fanout Exchange named “logs”,  we’ve created some Consumers(three in my example above) each with their own unique temporary queue bound to the exchange. We have then published a message to the exchange using our Producer, the exchange then routes the message to all bound queues which in turn delivers it to the Consumers. Download the Source

Posted in: .net, RabbitMQ