Getting Started With RabbitMQ in .net – Part 2

Posted on May 10, 2011

5


In the previous example I built a Windows Form application to demonstrate how to implement the first part of the RabbitMQ getting started example in .net.  This time I’ll be implementing the second part. Download Source Project

2.) Work Queues

The original article (in Java) is here: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

First up lets add a new Form to our solution, lets call it 2-Work-Queues.cs

Next we need to amend our Producer. Open the Producer.cs file from example 1 .

We need to adjust our constructor to make sure our Queue is set to durable

      bool durable = true;
      Model.QueueDeclare(QueueName, durable, false, false, null);

Then in our SendMessage method we need to ensure we publish a persistent message.

 IBasicProperties basicProperties = Model.CreateBasicProperties();
 basicProperties.SetPersistent(true);
 Model.BasicPublish("", QueueName, basicProperties, message);

This is our complete Producer class

    public class Producer : IDisposable
    {

        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;

        public Producer(string hostName, string queueName)
        {
            QueueName = queueName;
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            Connection = connectionFactory.CreateConnection();
            Model = Connection.CreateModel();
            bool durable = true;
            Model.QueueDeclare(QueueName, durable, false, false, null);
        }

        public void SendMessage(byte[] message)
        {
            IBasicProperties basicProperties = Model.CreateBasicProperties();
            basicProperties.SetPersistent(true);
            Model.BasicPublish("", QueueName, basicProperties, message);
        }

        public void Dispose()
        {
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }
    }

OK, now onto our Consumer.  Again, open the Consumer.cs class from the original example and adjust the constructor so the queue is durable.


      bool durable = true;
      Model.QueueDeclare(QueueName, durable, false, false, null);

And then turn off auto-ack to we have to manually ack each message (I’d already done this in the previous example, but shhh!)

.......
bool autoAck = false;
String consumerTag = Model.BasicConsume(QueueName, autoAck, consumer);
.......
Model.BasicAck(e.DeliveryTag, false);

And then in our constructor setup fair delivery by setting the prefetchCount =1 .  This ensures we only deliver one message at a time to a consumer. (Note the slight difference to the Java implementation, explanation here). What I’m doing here is BasicQos(0= “Dont send me a new message untill I’ve finshed”,  1= “Send me one message at a time”, false =”Apply to this Model only “)

   Model.BasicQos(0,1,false);

This is the complete Consumer

public class Consumer
    {
        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;

        protected bool isConsuming;

        // used to pass messages back to UI for processing
        public delegate void onReceiveMessage(byte[] message);
        public event onReceiveMessage onMessageReceived;

        public Consumer(string hostName, string queueName)
        {
            QueueName = queueName;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            Connection = connectionFactory.CreateConnection();
            Model = Connection.CreateModel();
            Model.BasicQos(0,1,false);
            bool durable = true;
            Model.QueueDeclare(QueueName, durable, false, false, null);
        }

        //internal delegate to run the consuming queue on a seperate thread
        private delegate void ConsumeDelegate();

        public void StartConsuming()
        {
            isConsuming = true;
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            c.BeginInvoke(null, null);
        }

        public void Consume()
        {
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);
            bool autoAck = false;
            String consumerTag = Model.BasicConsume(QueueName, autoAck, consumer);
            while (isConsuming)
            {
                try
                {
                    RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    byte[] body = e.Body;
                    // ... process the message
                    onMessageReceived(body);
                    Model.BasicAck(e.DeliveryTag, false);
                }
                catch (OperationInterruptedException ex)
                {
                    // The consumer was removed, either through
                    // channel or connection closure, or through the
                    // action of IModel.BasicCancel().
                    break;
                }
            }

        }

        public void Dispose()
        {
            isConsuming = false;
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }
    }

Now we need to build our new Form. We’re going to be running a second consumer so we need to have somewhere to show the output from this. Copy the controls from the original Form and Add a second RichTextBox.

Right click and “View Code”. We need to use a new Queue so change the QUEUE_NAME constant. We also need to declare 2 Consumers.

    public string HOST_NAME = "localhost";
        public string QUEUE_NAME = "workQueues";

        private Consumer consumer;
        private Consumer consumer2;
        private Producer producer;

And initialise our second Consumer which will pass a message back to a new event hander handleMessage2 that update the second RichTextBox.

            //create the second consumer
            consumer2 = new Consumer(HOST_NAME, QUEUE_NAME);

            //this from will handle messages
            consumer2.onMessageReceived += handleMessage2;

            //start consuming
            consumer2.StartConsuming();
            .........
        public void handleMessage2(byte[] message)
        {
            showMessageDelegate s = new showMessageDelegate(richTextBox2.AppendText);

            this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
        }

We also want to track the order the messages are delivered so we will create a new count variable and pre-pend this when we send a message.

  private int count = 0;
        private void button1_Click_1(object sender, EventArgs e)
        {
            string message = String.Format("{0} - {1}", count++, textBox1.Text);
            producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message));
        }

Here is our full Form code

public partial class _2_Work_Queues : Form
    {

        public string HOST_NAME = "localhost";
        public string QUEUE_NAME = "workQueues";

        private Consumer consumer;
        private Consumer consumer2;
        private Producer producer;

        public _2_Work_Queues()
        {
            InitializeComponent();

            //create the producer
            producer = new Producer(HOST_NAME, QUEUE_NAME);

            //**CONSUMER 1 **

            //create the consumer
            consumer = new Consumer(HOST_NAME, QUEUE_NAME);

            //this from will handle messages
            consumer.onMessageReceived += handleMessage;

            //start consuming
            consumer.StartConsuming();

            //**CONSUMER 2 **

            //create the second consumer
            consumer2 = new Consumer(HOST_NAME, QUEUE_NAME);

            //this from will handle messages
            consumer2.onMessageReceived += handleMessage2;

            //start consuming
            consumer2.StartConsuming();
        }

        private int count = 0;
        private void button1_Click_1(object sender, EventArgs e)
        {
            string message = String.Format("{0} - {1}", count++, textBox1.Text);
            producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message));
        }

        //delegate to post to UI thread
        private delegate void showMessageDelegate(string message);

        //Callback for message receive
        public void handleMessage(byte[] message)
        {
            showMessageDelegate s = new showMessageDelegate(richTextBox1.AppendText);

            this.Invoke(s,  System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
        }

        //Callback for message receive
        public void handleMessage2(byte[] message)
        {
            showMessageDelegate s = new showMessageDelegate(richTextBox2.AppendText);

            this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
        }

    }

Now set this Form as the startup form open Program.cs and adjust this line to open the new Form on startup

 Application.Run(new Form1()); //amend this to become

 Application.Run(new _2_Work_Queues()); //(or whatever your new Form is called)
 

Now hit debug (after making sure RabbitMQ broker is running). Add a message in the TextBox and hit send a few times, this is what you should see. Notice the work is evenly distributed.

To make it a little more realistic, we’ll add a pause to handleMessage.

        public void handleMessage(byte[] message)
        {
            Thread.Sleep(1000);
            ......
        }

Hit send a few more time, now you should see something like this. Consumer 2 has handled more messages as Consumer 1 is busy.

Next time we’ll look at Publish/Subscribe. Download Source Project

About these ads
Posted in: .net, RabbitMQ