Since I primarily write back-end services at work, the generic BlockingCollection<T> is probably one of my favorite features of .NET 4.0.
It's called a "BlockingCollection" because the method used to retrieve an item from the collection, Take(), blocks the current thread while the collection is empty and returns the instant an item becomes available. Adding items to and removing items from the collection is completely thread-safe, which makes it quite useful in "store and forward" kinds of situations.
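To see the blocking behavior in isolation, here is a minimal sketch. The class and method names (TakeDemo, MeasureBlockedTake) are made up for illustration and are not part of the example solution. One thread adds an item after a delay while the calling thread sits in Take() until it arrives.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

public static class TakeDemo
{
    // Hypothetical helper: returns roughly how long Take() was blocked, in milliseconds.
    public static long MeasureBlockedTake(int producerDelayMs)
    {
        var queue = new BlockingCollection<string>();

        // Producer: adds a single item after a delay, on another thread.
        var producer = new Thread(() =>
        {
            Thread.Sleep(producerDelayMs);
            queue.Add("hello");
        });
        producer.Start();

        var stopwatch = Stopwatch.StartNew();
        string item = queue.Take();   // blocks here until the producer adds the item
        stopwatch.Stop();

        producer.Join();
        Console.WriteLine("Took '{0}' after waiting about {1} ms", item, stopwatch.ElapsedMilliseconds);
        return stopwatch.ElapsedMilliseconds;
    }

    public static void Main()
    {
        MeasureBlockedTake(500);
    }
}
```

The wait reported should be close to the producer's delay, since Take() has nothing to return until Add() is called.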
The BlockingCollection<T> is a wrapper around an instance of IProducerConsumerCollection<T>, which it uses internally for data storage. Another cool thing about the BlockingCollection<T> is that you can make it behave as a FIFO or LIFO collection by passing either a ConcurrentQueue<T> or a ConcurrentStack<T> to one of the constructor overloads. The default behavior is first in, first out (FIFO).
You can use any collection that implements IProducerConsumerCollection<T>, so if you feel the need to reinvent the wheel, you can do that too :P. This could be useful if you need to prioritize the order in which items are removed from the collection, but I'm getting way ahead of myself.
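As a quick illustration of how the underlying collection changes the ordering, the sketch below adds the same three numbers to a default (queue-backed) BlockingCollection<int> and to one backed by a ConcurrentStack<int>. OrderingDemo and AddThenDrain are illustrative names, not something from the example solution.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class OrderingDemo
{
    // Hypothetical helper: adds 1, 2, 3 and returns the order they come back out in.
    public static List<int> AddThenDrain(BlockingCollection<int> collection)
    {
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);

        var result = new List<int>();
        int item;
        // TryTake returns false immediately when the collection is empty,
        // so this loop ends instead of blocking like Take() would.
        while (collection.TryTake(out item))
        {
            result.Add(item);
        }
        return result;
    }

    public static void Main()
    {
        var fifo = new BlockingCollection<int>();                            // default: ConcurrentQueue<T>
        var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());  // stack-backed

        Console.WriteLine(string.Join(", ", AddThenDrain(fifo)));  // 1, 2, 3
        Console.WriteLine(string.Join(", ", AddThenDrain(lifo)));  // 3, 2, 1
    }
}
```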
Store and Forward Architecture
Let's talk a little about store and forward architecture. In my experience, if you need to do some processing in reaction to an event of some kind, store and forward architecture is how you make your solution as fast and scalable as possible. Let's say you need to email a client to confirm a successful purchase, and your payment provider notifies you of a successful purchase by raising an event:
private static void SuccessfulPaymentProcessed(SuccessfulPaymentProcessedEventArgs eventArgs)
{
}
You could write the code to send the email right there in the SuccessfulPaymentProcessed() event handler, but there are two main problems with that approach, both of which are caused by an excessive number of events being fired in a short period of time.
Let's say that the event args contain properties for UserName and PurchaseAmount. In order to send a notification email to your user, you need to do a few things:
- Look up the user's email address by username
- Fetch an email template
- Merge UserName & PurchaseAmount into the email template
- Send the email
Now let's say the above takes a total of 100ms to complete. No sweat, right? Wrong! What happens if you get 10,000 purchases in a short amount of time? Your last notification email could take almost 17 minutes to get sent (10,000 x 100ms = 1,000 seconds), which is a crappy user experience.
The second reason why writing the logic to send the notification emails straight in the SuccessfulPaymentProcessed() event handler is a bad idea is that if you get millions upon millions of events, your server could run out of resources (RAM) to store them all. All the events and their event args are held in memory while your service processes them one by one, much slower than the events are coming in, so the backlog could grow too big to handle and the service could fall over and die. At that point you may even lose the opportunity to send the backlog of notification emails altogether, because the information was never persisted in anything other than RAM, and the reboot needed to make the server responsive again clears the RAM.
So how do we make it better? Store and Forward!
The idea is simple enough: you don't want to tie up the execution of the SuccessfulPaymentProcessed() event handler any longer than you need to, so you store the event information in some mechanism and have a separate thread, or several threads, process it. This also gives you the opportunity to process more than one event at a time, making the delivery of your notification emails a lot faster, which gives your users a great experience.
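To put rough numbers on that, here is a back-of-the-envelope sketch. It assumes every event takes the same time to process and that workers never wait on each other, which real email sending will not quite manage. BacklogMath and TimeToDrain are names made up for this illustration.

```csharp
using System;

public static class BacklogMath
{
    // Hypothetical helper: time until the last event is handled when
    // `workers` threads each process one event at a time.
    public static TimeSpan TimeToDrain(int events, int millisecondsPerEvent, int workers)
    {
        if (workers < 1) throw new ArgumentOutOfRangeException("workers");

        // ceil(events / workers) rounds of work, each lasting millisecondsPerEvent
        long rounds = (events + (long)workers - 1) / workers;
        return TimeSpan.FromMilliseconds(rounds * millisecondsPerEvent);
    }

    public static void Main()
    {
        // One event at a time: 10,000 x 100ms = 1,000 seconds (about 16.7 minutes)
        Console.WriteLine(TimeToDrain(10000, 100, 1).TotalSeconds);

        // Eight workers: 1,250 rounds x 100ms = 125 seconds
        Console.WriteLine(TimeToDrain(10000, 100, 8).TotalSeconds);
    }
}
```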
Using the BlockingCollection<T>
In the example solution I've supplied below, you will find a project for a console application.
In the Main() method we initialize _ProcessingQueue with a new instance of BlockingCollection<SuccessfulPaymentProcessedEventArgs>, since SuccessfulPaymentProcessedEventArgs is what we want to process. We use the default constructor because FIFO is perfect for what we are trying to achieve, and we don't really care about limiting the number of items the collection can hold.
After that we spawn a thread that will host an instance of our PaymentProcessor. This is a mock object that will fire off a SuccessfulPaymentProcessed event at random intervals.
Finally we have a for loop that will spin up some processing threads. I like to make the number of processing threads configurable, hence the for loop. The number of processing threads you need will vary based on how much of the server's resources you want to, or can, use. If you don't want the server's CPU to run at 100%, then you need to have fewer processing threads than the machine has CPU cores. If you want to use close to 100%, you can set the number of processing threads to 1.5 times the number of cores the machine has. The reason it's 1.5 times the number of CPU cores, and not one processing thread per CPU core, is that the OS will never give all the available CPU cycles to your processing threads. One thing to keep in mind is that too many processing threads will degrade performance. So play around with the number, keep an eye on the machine's performance counters, and see what works for you.
static void Main(string[] args)
{
    // init our BlockingCollection
    _ProcessingQueue = new BlockingCollection<SuccessfulPaymentProcessedEventArgs>();

    // Start up our mock payment processor notifier thingy
    Thread paymentProcessorThread = new Thread(StartPaymentProcessor)
    {
        IsBackground = true,
    };
    paymentProcessorThread.Start();

    // Spin up some processing Threads
    for (int i = 0; i < ServiceConfig.ProcessingThreads; i++)
    {
        Thread processingThread = new Thread(ProcessEvents)
        {
            IsBackground = true,
        };
        processingThread.Start();
    }

    // block the main thread so we don't kill the program
    Console.ReadLine();
}
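The thread-count rule of thumb described above could be turned into a default for the configured value along these lines. This is only a sketch of that rule, not a recommendation, and ThreadCountHelper and SuggestedProcessingThreads are hypothetical names that do not exist in the example solution.

```csharp
using System;

public static class ThreadCountHelper
{
    // Hypothetical helper implementing the rule of thumb from the text:
    // fewer threads than cores to leave headroom, or about 1.5 per core
    // to push the CPU close to 100%.
    public static int SuggestedProcessingThreads(int coreCount, bool leaveHeadroom)
    {
        if (coreCount < 1) throw new ArgumentOutOfRangeException("coreCount");

        if (leaveHeadroom)
        {
            // one fewer than the core count, but always at least one thread
            return Math.Max(1, coreCount - 1);
        }

        // roughly 1.5 threads per core, rounded up
        return (int)Math.Ceiling(coreCount * 1.5);
    }

    public static void Main()
    {
        int cores = Environment.ProcessorCount;
        Console.WriteLine("Cores: {0}", cores);
        Console.WriteLine("With headroom: {0}", SuggestedProcessingThreads(cores, true));
        Console.WriteLine("Near 100% CPU: {0}", SuggestedProcessingThreads(cores, false));
    }
}
```

Treat the result as a starting point only; the performance counters are still what tells you whether the number is right.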
The SuccessfulPaymentProcessed event handler is ridiculously simple. The only real line of code there is the call to _ProcessingQueue.Add(). All we do is take the eventArgs passed to our event handler and put it in our "Processing Queue". As I said before, one of the processing threads that is blocked on a _ProcessingQueue.Take() call will immediately return with the next available instance of SuccessfulPaymentProcessedEventArgs, provided at least one processing thread is idle, that is.
private static void SuccessfulPaymentProcessed(SuccessfulPaymentProcessedEventArgs eventArgs)
{
    Console.WriteLine("Successful Payment Processed: [EventId: {0}] [UserName: {1}] [PurchaseAmount: {2}]",
        eventArgs.EventId,
        eventArgs.UserName,
        eventArgs.PurchaseAmount);

    // Add the event's information to our BlockingCollection
    _ProcessingQueue.Add(eventArgs);
}
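The example uses an unbounded collection, so Add() never has to wait. If you did want to cap the number of items held in memory, BlockingCollection<T> also has a constructor that takes a bounded capacity. The sketch below (BoundedDemo and CountAccepted are illustrative names) shows what happens when a bounded collection fills up and nothing is consuming from it.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedDemo
{
    // Hypothetical helper: tries to add `attempts` items to a collection capped
    // at `capacity` and returns how many were accepted. Nothing consumes here.
    public static int CountAccepted(int capacity, int attempts)
    {
        var queue = new BlockingCollection<int>(capacity);
        int accepted = 0;

        for (int i = 0; i < attempts; i++)
        {
            // TryAdd returns false right away when the collection is full;
            // a plain Add() would block until a consumer made room.
            if (queue.TryAdd(i))
            {
                accepted++;
            }
        }
        return accepted;
    }

    public static void Main()
    {
        Console.WriteLine(CountAccepted(2, 5));  // 2
    }
}
```

In an event handler a blocking Add() would tie up the handler again, which is exactly what store and forward tries to avoid, so a bound only makes sense if you also decide what to do with the rejected items.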
In the ProcessEvents() method, which is what our processing threads are executing, you will notice that I've got a seemingly infinite loop there. You may be thinking "what the fuck is this guy smoking!?" It may not be very apparent, but this loop won't keep the application running forever, because the method runs on a background thread. Have a look at the thread declaration again:
Thread processingThread = new Thread(ProcessEvents)
{
    IsBackground = true,
};
What this means is that a background thread does not keep the application alive: once the main thread (and any other foreground thread) has finished, the runtime stops the background threads as part of shutting down. It will NOT wait for them to finish what they're doing, which is good because this one will never finish :P
After that little WTF, it's quite simple again: we have a _ProcessingQueue.Take() call, which will wait until there is something in the collection before returning, and a Thread.Sleep(5000) call to pretend to do some processing. In our example of sending successful payment processed notification emails, this is where you should look up the user's email address, fetch the email template, merge values into the email template and finally send the email.
One very important point to make is that the _ProcessingQueue.Take(); call removes or takes the item from the collection.
private static void ProcessEvents()
{
    // Identify this worker in the log output
    int workerThreadId = Thread.CurrentThread.ManagedThreadId;

    while (true)
    {
        // This call blocks this thread until there is something in the collection to take
        var eventArgs = _ProcessingQueue.Take();

        // Pretend to do some kind of processing
        Thread.Sleep(5000);

        Console.WriteLine("Processing Thread #{0}: Processed event id {1} successfully",
            workerThreadId,
            eventArgs.EventId);
    }
}
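If you would rather let the processing threads finish their work and exit cleanly instead of relying on background threads being killed, BlockingCollection<T> offers CompleteAdding() and GetConsumingEnumerable(). The sketch below is an alternative to the loop above, not what the example solution does, and GracefulShutdownDemo and Run are made-up names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class GracefulShutdownDemo
{
    // Hypothetical helper: processes `itemCount` items on `workerCount`
    // threads and returns how many items were handled in total.
    public static int Run(int itemCount, int workerCount)
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;

        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                // Blocks like Take() while the collection is empty, but the loop
                // ends once CompleteAdding() was called and nothing is left.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    Interlocked.Increment(ref processed);
                }
            });
            workers[i].Start();
        }

        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }

        // Signal that no more items will be added
        queue.CompleteAdding();

        // Wait for every worker to drain the collection and exit its loop
        foreach (Thread worker in workers)
        {
            worker.Join();
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(100, 4));  // 100
    }
}
```

Because the workers exit on their own here, they do not need to be background threads.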
And that's that! I've tried to keep the example solution as bare bones as possible. If anything is not quite clear, drop a comment below and I'll do my best to clear it up.
Notes
The purpose of this article is to explain the usage of the BlockingCollection<T>. There is still the possibility of a tech guy tripping over a server's power cable and losing the unprocessed events that were stored in RAM in the BlockingCollection<T> at the time.
In a real-world service I would persist the event information in either a SQL Server database or MSMQ (Microsoft Message Queuing) in the SuccessfulPaymentProcessed() event handler. I would then have a single thread interrogate that data source for "unprocessed" events and fill the BlockingCollection<T> from there. The processing part of this article would remain the same.
This lets your service pick up where it left off if interrupted at any point.
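A rough sketch of that idea follows. Everything in it is an assumption made for illustration: IEventStore stands in for whatever durable store you pick, InMemoryEventStore only exists so the sketch runs, and events are reduced to plain integer ids.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical abstraction over the durable store (SQL Server table, MSMQ queue, ...).
public interface IEventStore
{
    void Save(int eventId);            // called from the event handler
    IList<int> FetchUnprocessed();     // called by the single feeder thread
    void MarkProcessed(int eventId);   // called by a processing thread when done
}

// In-memory stand-in so the sketch runs; a real store would survive a restart.
public class InMemoryEventStore : IEventStore
{
    private readonly object _sync = new object();
    private readonly List<int> _unprocessed = new List<int>();

    public void Save(int eventId) { lock (_sync) { _unprocessed.Add(eventId); } }
    public IList<int> FetchUnprocessed() { lock (_sync) { return new List<int>(_unprocessed); } }
    public void MarkProcessed(int eventId) { lock (_sync) { _unprocessed.Remove(eventId); } }
}

public static class RecoveryDemo
{
    // Feeder step: copy everything still unprocessed into the in-memory queue.
    public static int Feed(IEventStore store, BlockingCollection<int> queue)
    {
        int count = 0;
        foreach (int eventId in store.FetchUnprocessed())
        {
            queue.Add(eventId);
            count++;
        }
        return count;
    }

    public static void Main()
    {
        var store = new InMemoryEventStore();
        store.Save(1);   // the event handler persists first
        store.Save(2);
        store.Save(3);

        var queue = new BlockingCollection<int>();
        Feed(store, queue);
        store.MarkProcessed(queue.Take());   // event 1 is fully processed

        // Simulated crash: the in-memory queue is lost, the store is not.
        queue = new BlockingCollection<int>();
        Console.WriteLine(Feed(store, queue));   // 2 (events 2 and 3 are queued again)
    }
}
```

A feeder that polls repeatedly would also need a way to skip events that are already queued but not yet marked as processed, for example an "in progress" state that gets reset on startup.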
MSDN Documentation:
Example Solution