In the previous example I built a Windows Form application to demonstrate how to implement the first part of the RabbitMQ getting started example in .net. This time I’ll be implementing the second part. Download Source Project
2.) Work Queues
The original article (in Java) is here: http://www.rabbitmq.com/tutorials/tutorial-two-java.html
First up lets add a new Form to our solution, lets call it 2-Work-Queues.cs
Next we need to amend our Producer. Open the Producer.cs file from example 1 .
We need to adjust our constructor to make sure our Queue is set to durable
bool durable = true;
Model.QueueDeclare(QueueName, durable, false, false, null);
Then in our SendMessage method we need to ensure we publish a persistent message.
IBasicProperties basicProperties = Model.CreateBasicProperties();
basicProperties.SetPersistent(true);
Model.BasicPublish("", QueueName, basicProperties, message);
This is our complete Producer class
public class Producer : IDisposable
{
protected IModel Model;
protected IConnection Connection;
protected string QueueName;
public Producer(string hostName, string queueName)
{
QueueName = queueName;
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = hostName;
Connection = connectionFactory.CreateConnection();
Model = Connection.CreateModel();
bool durable = true;
Model.QueueDeclare(QueueName, durable, false, false, null);
}
public void SendMessage(byte[] message)
{
IBasicProperties basicProperties = Model.CreateBasicProperties();
basicProperties.SetPersistent(true);
Model.BasicPublish("", QueueName, basicProperties, message);
}
public void Dispose()
{
if (Connection != null)
Connection.Close();
if (Model != null)
Model.Abort();
}
}
OK, now onto our Consumer. Again, open the Consumer.cs class from the original example and adjust the constructor so the queue is durable.
bool durable = true;
Model.QueueDeclare(QueueName, durable, false, false, null);
And then turn off auto-ack to we have to manually ack each message (I’d already done this in the previous example, but shhh!)
....... bool autoAck = false; String consumerTag = Model.BasicConsume(QueueName, autoAck, consumer); ....... Model.BasicAck(e.DeliveryTag, false);
And then in our constructor setup fair delivery by setting the prefetchCount =1 . This ensures we only deliver one message at a time to a consumer. (Note the slight difference to the Java implementation, explanation here). What I’m doing here is BasicQos(0= “Dont send me a new message untill I’ve finshed”, 1= “Send me one message at a time”, false =”Apply to this Model only “)
Model.BasicQos(0,1,false);
This is the complete Consumer
public class Consumer
{
protected IModel Model;
protected IConnection Connection;
protected string QueueName;
protected bool isConsuming;
// used to pass messages back to UI for processing
public delegate void onReceiveMessage(byte[] message);
public event onReceiveMessage onMessageReceived;
public Consumer(string hostName, string queueName)
{
QueueName = queueName;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.HostName = hostName;
Connection = connectionFactory.CreateConnection();
Model = Connection.CreateModel();
Model.BasicQos(0,1,false);
bool durable = true;
Model.QueueDeclare(QueueName, durable, false, false, null);
}
//internal delegate to run the consuming queue on a seperate thread
private delegate void ConsumeDelegate();
public void StartConsuming()
{
isConsuming = true;
ConsumeDelegate c = new ConsumeDelegate(Consume);
c.BeginInvoke(null, null);
}
public void Consume()
{
QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);
bool autoAck = false;
String consumerTag = Model.BasicConsume(QueueName, autoAck, consumer);
while (isConsuming)
{
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
// ... process the message
onMessageReceived(body);
Model.BasicAck(e.DeliveryTag, false);
}
catch (OperationInterruptedException ex)
{
// The consumer was removed, either through
// channel or connection closure, or through the
// action of IModel.BasicCancel().
break;
}
}
}
public void Dispose()
{
isConsuming = false;
if (Connection != null)
Connection.Close();
if (Model != null)
Model.Abort();
}
}
Now we need to build our new Form. We’re going to be running a second consumer so we need to have somewhere to show the output from this. Copy the controls from the original Form and Add a second RichTextBox. 
Right click and “View Code”. We need to use a new Queue so change the QUEUE_NAME constant. We also need to declare 2 Consumers.
public string HOST_NAME = "localhost";
public string QUEUE_NAME = "workQueues";
private Consumer consumer;
private Consumer consumer2;
private Producer producer;
And initialise our second Consumer which will pass a message back to a new event hander handleMessage2 that update the second RichTextBox.
//create the second consumer
consumer2 = new Consumer(HOST_NAME, QUEUE_NAME);
//this from will handle messages
consumer2.onMessageReceived += handleMessage2;
//start consuming
consumer2.StartConsuming();
.........
public void handleMessage2(byte[] message)
{
showMessageDelegate s = new showMessageDelegate(richTextBox2.AppendText);
this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
}
We also want to track the order the messages are delivered so we will create a new count variable and pre-pend this when we send a message.
private int count = 0;
private void button1_Click_1(object sender, EventArgs e)
{
string message = String.Format("{0} - {1}", count++, textBox1.Text);
producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message));
}
Here is our full Form code
public partial class _2_Work_Queues : Form
{
public string HOST_NAME = "localhost";
public string QUEUE_NAME = "workQueues";
private Consumer consumer;
private Consumer consumer2;
private Producer producer;
public _2_Work_Queues()
{
InitializeComponent();
//create the producer
producer = new Producer(HOST_NAME, QUEUE_NAME);
//**CONSUMER 1 **
//create the consumer
consumer = new Consumer(HOST_NAME, QUEUE_NAME);
//this from will handle messages
consumer.onMessageReceived += handleMessage;
//start consuming
consumer.StartConsuming();
//**CONSUMER 2 **
//create the second consumer
consumer2 = new Consumer(HOST_NAME, QUEUE_NAME);
//this from will handle messages
consumer2.onMessageReceived += handleMessage2;
//start consuming
consumer2.StartConsuming();
}
private int count = 0;
private void button1_Click_1(object sender, EventArgs e)
{
string message = String.Format("{0} - {1}", count++, textBox1.Text);
producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message));
}
//delegate to post to UI thread
private delegate void showMessageDelegate(string message);
//Callback for message receive
public void handleMessage(byte[] message)
{
showMessageDelegate s = new showMessageDelegate(richTextBox1.AppendText);
this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
}
//Callback for message receive
public void handleMessage2(byte[] message)
{
showMessageDelegate s = new showMessageDelegate(richTextBox2.AppendText);
this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine);
}
}
Now set this Form as the startup form open Program.cs and adjust this line to open the new Form on startup
Application.Run(new Form1()); //amend this to become Application.Run(new _2_Work_Queues()); //(or whatever your new Form is called)
Now hit debug (after making sure RabbitMQ broker is running). Add a message in the TextBox and hit send a few times, this is what you should see. Notice the work is evenly distributed.

To make it a little more realistic, we’ll add a pause to handleMessage.
public void handleMessage(byte[] message)
{
Thread.Sleep(1000);
......
}
Hit send a few more time, now you should see something like this. Consumer 2 has handled more messages as Consumer 1 is busy.

Next time we’ll look at Publish/Subscribe. Download Source Project

Max
May 29, 2011
Great tutorial!
Really deserves a spot on the official site, next to the Java and Python samples. Keep it up!
simonwdixon
May 29, 2011
Thanks for the kind words! more coming soon
orthodontist
September 13, 2012
whoah this weblog is wonderful i love studying your articles.
Keep up the good work! You already know, a lot of people
are hunting around for this information, you can aid them greatly.
Anand
February 1, 2013
I had a few questions:
1) Why are you calling Abort() instead of Close() on Model? in the Dispose() implementations?
2) In general is calling Close() on Connection (and on Model) preferred over calling Dispose()? If so why?
3) Why is the connection closed before aborting the model? Shouldn’t the order be reversed?
4) Its a bit disappointing that the Rabbit-MQ client for .NET does not natively support an asynchronous callback mechanism for consuming messages like the client for Python does for instance (as illustrated in the tutorials on the Rabbit MQ website). Do you know of any other .NET client library which supports this? Any idea if the Rabbit MQ team is planning to add this support any time soon?