Getting Started With RabbitMQ in .net

Posted on May 8, 2011

14


I’ve been using RabbitMQ to deliver live prices to my VisualSpreads trading app. The getting started section of the RabbitMQ website is actually very good, it explains the concepts in a very easy to understand format. The only downside for me(being a .net coder) was the lack of .net code in the examples, I thought it would be probably added at a later date but I just checked back and it still doesn’t(as of now) exist.  I’m planing to do a series of articles that mirrors the existing examples but in c# .net and this is the first. I’d recommended reading each article on the RabbitMQ website before reading my .net version as I’m only adding the missing .net code. Download the source.

1. Hello World

The original article (in Java) is here: http://www.rabbitmq.com/tutorials/tutorial-one-java.html I’ve taken a slightly different approach with these examples, in the originals a console app was used to run the code. I’ve decided to use a Windows Form application for no other reason than I prefer using a GUI to the command line(I’m sure I’m not the only windows dev like this :) ). This also allows me to show you a few techniques required when running a RabbitMQ consumer in a client application(such as delegates). So first up, lets create a new form application. And then drop on a TextBox for input, RichTextBox for output and a button to send messages  (labels are nice but optional) Next we need to write our producer. Add a new class with the name Producer. We want to initialise our queue and connection to the RabbitMQ broker in the constructor. We’ll pass in which host we’ll be using and the name of the queue. We will also define one method from where we’ll send a message, this is what we’ll end up with:

public class Producer : IDisposable
    {

        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;

        public Producer(string hostName, string queueName)
        {
            QueueName = queueName;
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            Connection = connectionFactory.CreateConnection();
            Model = Connection.CreateModel();
            Model.QueueDeclare(QueueName, false, false, false, null);
        }

        public void SendMessage(byte[] message)
        {
            IBasicProperties basicProperties = Model.CreateBasicProperties();
            Model.BasicPublish("", QueueName, basicProperties , message);
        }
        public void Dispose()
        {
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }
    }

In comparison to the Java example we can see one main difference, in the .net client library we have a IModel interface instead of a concrete Channel in Java. We store the Model, Connection and QueueName in fields so we can use them in the SendMessage() and Dispose() methods. Next up is the consumer. Add a new class with the name(surprise surprise) Consumer.

public class Consumer
    {
        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;

        protected bool isConsuming;

        // used to pass messages back to UI for processing
        public delegate void onReceiveMessage(byte[] message);
        public event onReceiveMessage onMessageReceived;

        public Consumer(string hostName, string queueName)
        {
            QueueName = queueName;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            Connection = connectionFactory.CreateConnection();
            Model = Connection.CreateModel();
            Model.QueueDeclare(QueueName, false, false, false, null);
        }

        //internal delegate to run the queue consumer on a seperate thread
        private delegate void ConsumeDelegate();

        public void StartConsuming()
        {
            isConsuming = true;
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            c.BeginInvoke(null, null);
        }

        public void Consume()
        {
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);
            String consumerTag = Model.BasicConsume(QueueName, false, consumer);
            while (isConsuming)
            {
                try
                {
                    RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    byte[] body = e.Body;
                    // ... process the message
                    onMessageReceived(body);
                    Model.BasicAck(e.DeliveryTag, false);
                }
                catch (OperationInterruptedException ex)
                {
                    // The consumer was removed, either through
                    // channel or connection closure, or through the
                    // action of IModel.BasicCancel().
                    break;
                }
            }
        }

        public void Dispose()
        {
            isConsuming = false;
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }
    }

The main similarity with the Producer is the constructor, in fact it’s exactly the same(I’ll address this later). After that things are a lot more interesting, first we have:

        // used to pass messages back to UI for processing
        public delegate void onReceiveMessage(byte[] message);
        public event onReceiveMessage onMessageReceived;

We use this delegate/event to pass messages back to the UI. This is required as the message listening code will be running on a separate thread to the Form UI (calling) code. Next we have

        //internal delegate to run the consuming queue on a seperate thread
        private delegate void ConsumeDelegate();

        public void StartConsuming()
        {
            isConsuming = true;
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            c.BeginInvoke(null, null);
        }

This is a very neat way to run a method on a new thread. In this example we invoke the Consume method using ConsumeDelegate. This is required as the Consumemethod contains a blocking while() loop. If we ran this on the same thread as the UI, the UI would become unresponsive and hang the application. We could also pass parameters to the method using a delegate/method with a different signature e.g.

        //delegate with a single string parameter
        private delegate void ConsumeDelegate(string someParameter);

        public void Start()
        {
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            string someParameterValue = "foo"
            //invoke with parameter
            c.BeginInvoke(someParameterValue, null, null);
        }
        //method with a single string paramter
        public void Consume(string someParameter)
        {
         ......
        }

Then we have the consuming code

public void Consume()
        {
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);
            String consumerTag = Model.BasicConsume(QueueName, false, consumer);
            while (isConsuming)
            {
                try
                {
                    RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    byte[] body = e.Body;
                    // ... process the message
                    onMessageReceived(body);
                    Model.BasicAck(e.DeliveryTag, false);
                }
                catch (OperationInterruptedException ex)
                {
                    // The consumer was removed, either through
                    // channel or connection closure, or through the
                    // action of IModel.BasicCancel().
                    break;
                }
            }
        }

First we declare a new QueueingBasicConsumer and let the Model know we would like to consume from the queue name QueueName. Within the the while(isConsuming) block we call consumer.Queue.Dequeue() this will block the loop until the next message arrives. When it does, we can grab the body of bytes and pass this to the onMessageReceived event, this in turn will pass the byte[] message to all methods listening for this event. A method can start listening using the following code:

      //for a method declared as so
       public void handleMessage(byte[] message)
        {
            ......
        }
        .....
       //subscribe like this
       //create the consumer
       consumer = new Consumer(HOST_NAME, QUEUE_NAME);

       //listen for messages
       consumer.onMessageReceived += handleMessage;
       ....

The final part of the puzzle glues all of this together, it’s the UI Form handling code and it goes like this:

 public partial class Form1 : Form
    {
        public string HOST_NAME = "localhost";
        public string QUEUE_NAME = "helloWorld";

        private Consumer consumer;
        private Producer producer;

        public Form1()
        {
            InitializeComponent();

            //create the producer
            producer = new Producer(HOST_NAME, QUEUE_NAME);

            //create the consumer
            consumer = new Consumer(HOST_NAME, QUEUE_NAME);

            //listen for message events
            consumer.onMessageReceived += handleMessage;

            //start consuming
            consumer.StartConsuming();
        }

        //Send the message on click
        private void button1_Click(object sender, EventArgs e)
        {
            producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(textBox1.Text));
        }

        //delegate to post to UI thread
        private delegate void showMessageDelegate(string message);

        //Callback for message receive
        public void handleMessage(byte[] message)
        {
            showMessageDelegate s = new showMessageDelegate(richTextBox1.AppendText);

            this.Invoke(s,  System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
        }
    }

Whats happening here should all be pretty clear. We create the Consumer and Producer instances and start listening for messages from the consumer. When the button is clicked we take the input from the TextBox and send it using the producer, the consumer receives the message and passes it to our handleMessage through the onMessageReceived event. We then use a delegate to ensure we update the output RichTextBox on the containing Form thread.

Running the application

I’m assuming you’ve installed the RabbitMQ server already and setup your environment variables. First we need to get the RabbitMQ broker running open a command prompt and enter the following:

rabbitmq-server -detached

Now you can run the Windows application so hit Debug in Visual Studio. Enter some text into the TextBox and hit the Button. The message will be displyed in the output RichTextBox. Nice :) Download Source Project

About these ads
Posted in: .net, RabbitMQ