PhatBoyG

Coding from the Hood since '87

June 2008 - Posts

  • MassTransit 0.2 Now Available

    We’ve dropped a new release of MassTransit today: version 0.2 is now available on the main page. There are several new features included in this release. It was great to get feedback from people who have tried MassTransit, and the release also reflects discussions with Greg Young, Udi Dahan, Ayende, and others at ALT.NET Seattle.

    A quick summary of the changes in this release:

    Extensible Message Dispatcher

    To make it easy to add new messaging patterns to the service bus, MassTransit has an entirely rewritten message dispatcher. The new code is structured in a producer/consumer style that lends itself to easy extensibility, allowing new features such as batch messaging and correlated messages without heavy lifting.

    Component-Based Message Handling

    In the previous version, handling messages required an object to subscribe to message types by passing methods to handle the messages. Now, a class can implement interfaces to support message consumption, and the class itself can be added to the service bus. The service bus will then create objects to handle each message, removing the need to use the same instance for each message received. A class can handle multiple message types, and can also indicate whether to receive all, selected, or correlated messages by implementing the Consumes<T>.All, Consumes<T>.Selected, or Consumes<T>.For<V> interface.

    Batch Messaging

    The new message dispatcher includes support for message batches. Instead of having to correlate a batch of messages at the application layer, an object or component can consume a batch instead of each individual message. Details of how to use batch messaging are on the wiki.
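
    To make the idea concrete, here is a rough sketch of what "consuming a batch" saves the application from doing. This is not the MassTransit batch API (the wiki covers that); BatchItem, BatchCollector, and the BatchId/BatchLength fields are all hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape: each message says which batch it belongs to
// and how many messages that batch contains.
public class BatchItem
{
    public Guid BatchId;
    public int BatchLength;
    public string Body;
}

// Hypothetical collector: holds messages until a batch is complete,
// then hands the whole batch to the consumer in one call.
public class BatchCollector
{
    private readonly Dictionary<Guid, List<BatchItem>> _pending =
        new Dictionary<Guid, List<BatchItem>>();
    private readonly Action<IList<BatchItem>> _consumer;

    public BatchCollector(Action<IList<BatchItem>> consumer)
    {
        _consumer = consumer;
    }

    public void Receive(BatchItem item)
    {
        List<BatchItem> items;
        if (!_pending.TryGetValue(item.BatchId, out items))
        {
            items = new List<BatchItem>();
            _pending[item.BatchId] = items;
        }
        items.Add(item);

        // Deliver only once every message in the batch has arrived.
        if (items.Count == item.BatchLength)
        {
            _pending.Remove(item.BatchId);
            _consumer(items);
        }
    }
}

public static class BatchDemo
{
    // Returns the size of the batch that was delivered to the consumer.
    public static int Run()
    {
        int deliveredSize = 0;
        var collector = new BatchCollector(batch => deliveredSize = batch.Count);
        Guid batchId = Guid.NewGuid();
        for (int i = 0; i < 3; i++)
        {
            collector.Receive(new BatchItem { BatchId = batchId, BatchLength = 3, Body = "item " + i });
        }
        return deliveredSize;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

    The point is that the consumer sees one call with the complete batch, rather than three calls it has to stitch together itself.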

    Container Integration

    We finally bit the bullet and started using a container. A new assembly called MassTransit.WindsorIntegration adds a default container derived from WindsorContainer that adds facilities to create ServiceBus instances. A new custom syntax was created to make it easy to configure multiple ServiceBus instances. All of the samples have been updated to use the new container to help understand the integration points.

    Plain Old C# Object Message Objects (POCOMOs Anyone?)

    The need to have all messages implement IMessage is gone, reducing the footprint of MassTransit in your application code. There are some new interfaces to handle things like correlated messages and batch messages, but those are only needed to use the built-in support for those message patterns.

    Better Thread Management

    To allow for more control over resources, a new thread manager has been added. While we haven’t exposed the thread configuration yet, a dedicated thread pool for asynchronous message dispatching should allow for more efficient message handling. We also took all the threading code from the endpoint and put it in the service bus, reducing the complexity required for new endpoints. In fact, the endpoint structure has also been redesigned to be more send/receive focused.

    Publish/Subscribe Focus

    The service bus now has a pure publish/subscribe architecture, replacing the additional methods of the previous version such as Send() and Request(). For applications that need to send messages directly to endpoints, the endpoint now has a Send() method. The publishing of messages takes advantage of the same new subscription code, allowing all message type information to be cached for better performance (avoiding the reflection penalty on each call to Publish). With the extensibility of the message dispatcher, it’s likely that remote endpoints may some day find their way into the dispatcher, resulting in a single dispatch engine for asynchronous publishing of messages as well.
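
    The caching idea can be sketched in a few lines. This is an illustration only, not the MassTransit dispatcher: TypeCachingPublisher and its members are hypothetical, and the sketch assumes local, in-process consumers. The work of figuring out how to call a consumer is done once at subscribe time, so each Publish is a dictionary lookup keyed by the message type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: none of these names come from MassTransit itself.
public class TypeCachingPublisher
{
    // One entry per message type; the list holds the consumers for that type.
    private readonly Dictionary<Type, List<Action<object>>> _consumersByType =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> consumer)
    {
        List<Action<object>> consumers;
        if (!_consumersByType.TryGetValue(typeof(T), out consumers))
        {
            consumers = new List<Action<object>>();
            _consumersByType[typeof(T)] = consumers;
        }
        // Wrap once at subscribe time so Publish needs no per-call type inspection.
        consumers.Add(message => consumer((T)message));
    }

    // Returns the number of consumers the message was delivered to.
    public int Publish<T>(T message)
    {
        List<Action<object>> consumers;
        if (!_consumersByType.TryGetValue(typeof(T), out consumers))
        {
            return 0;
        }
        foreach (Action<object> consumer in consumers)
        {
            consumer(message);
        }
        return consumers.Count;
    }
}

public static class PublishDemo
{
    // Returns the total length of the strings received by the consumer.
    public static int Run()
    {
        var bus = new TypeCachingPublisher();
        int received = 0;
        bus.Subscribe<string>(text => received += text.Length);
        bus.Publish("ping");
        bus.Publish(42); // no consumer for int, so nothing is delivered
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```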

    Request/Reply

    Requests are handled in an entirely new way, using a new fluent builder. The fluent builder allows the calling code to subscribe to any responses (directed via the Consumes.* interfaces), indicate whether an asynchronous callback should be allowed (for [WebMethod] style Begin/End usage, PageAsyncTask usage, or MonoRail asynchronous actions), and get a future object that can be used to complete the request upon receipt of a response. The responses are handled by the calling class itself; the future object is used to signal that the operation is complete, which releases any waits or callbacks for the action.
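
    For readers who haven't used a future before, here is a minimal sketch of the signaling part. It is not the MassTransit request builder or its future type; ResponseFuture<T>, Complete, and TryGet are hypothetical names, and the "response" is simulated with a plain thread instead of a bus.

```csharp
using System;
using System.Threading;

// Hypothetical future: the real MassTransit type and its members may differ.
public class ResponseFuture<T>
{
    private readonly ManualResetEvent _completed = new ManualResetEvent(false);
    private T _response;

    // Called by the consuming class when the response arrives.
    public void Complete(T response)
    {
        _response = response;
        _completed.Set(); // releases any thread blocked in TryGet
    }

    // Waits up to the timeout; returns false if no response arrived in time.
    public bool TryGet(TimeSpan timeout, out T response)
    {
        bool signaled = _completed.WaitOne(timeout);
        response = signaled ? _response : default(T);
        return signaled;
    }
}

public static class FutureDemo
{
    public static string Run()
    {
        var future = new ResponseFuture<string>();

        // Simulate a response arriving on another thread.
        var responder = new Thread(() => future.Complete("pong"));
        responder.Start();

        string response;
        bool arrived = future.TryGet(TimeSpan.FromSeconds(5), out response);
        responder.Join();
        return arrived ? response : "timeout";
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

    The caller blocks (or registers a callback, in the asynchronous case) on the future, while the Consume method of the calling class is what actually completes it.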

    Distributed Subscription Cache

    A new distributed subscription cache (backed by memcached) is now available. This is mostly designed for high volume request/reply applications that need to add and remove a lot of correlated subscriptions and maintain a high level of performance. A load test will be added to the HeavyLoad sample soon, but it seems to hold up pretty well so far under regular testing.

    Dashboard

    Dru has been hard at work making an operations dashboard part of the core product. One of the big things about messaging systems is being able to assess the health of the endpoints at any time. The goal of the dashboard is to provide that single pane of glass to find out which endpoints are alive, what messages they are handling, and indicate any problems to the viewer. There are many plans for this, including the ability to remotely control the endpoints for dynamic adjustments to load handling and perhaps even remote service restarts.

    Deployment

    Dru has also been working on the deployment story, making it easier to deploy MassTransit into a production system. There is much love needed there, but I’m hoping to dig into it soon to see how things have gotten easier to manage.

    I'm sure there are many more features that have been added under the covers. The main thing is that with this release (0.2) we're pretty happy with the API experience. The consumer code is easy to understand and implement, particularly compared to the earlier version. It is unlikely that we'll do another major overhaul to the interface like we did with this version.

    So give it a shot, and blog about your experiences!

  • Enhancements to MassTransit (or Weekend of Coding)

    Over the past couple of weeks I've been putting some serious time into MassTransit. My primary goal is to improve the internal architecture and remove some of the MSMQ leaks into the infrastructure. Our original goal was to stick purely to MSMQ; however, as we got more into messaging systems we found that there are a lot of other transports with different advantages. For example, using ActiveMQ would make it easy to add integration with Java applications down the line. The problem at this point was that all the code designed around MSMQ was making it difficult to support other transport types.

    About two weeks ago, I started working on a completely new method of dispatching messages within the service bus. My goal was to support a pure producer/consumer model using a publish/subscribe pattern. With this in mind, I built a new message dispatcher that allowed for a new way of specifying message subscriptions. Prior to this change, a service interested in messages would do the following:

    _bus.Subscribe<MyMessage>(MyHandlerMethod);
    public void MyHandlerMethod(IMessageContext<MyMessage> context) {}
    

    Because of this structure, there had to be an instance of the class in memory and it somehow needed to be started so that subscriptions could be added. The class also needed to be stopped so that it could remove any subscriptions from the service bus. Another goal was to be able to use an object builder to create objects as needed to handle messages. For example, we wanted to use Castle Windsor to dynamically build objects to handle messages and get all the injection benefits of the container.

    To support this new style, I added some new interfaces and made it possible to register either an object or a class with the service bus. As an actual example, compare the original subscription client code to the new version:

    Before the changes:

    public class SubscriptionClient : IHostedService
    {
    	public void Start()
    	{
    		_serviceBus.Subscribe<AddSubscription>(HandleAddSubscription);
    		_serviceBus.Subscribe<RemoveSubscription>(HandleRemoveSubscription);
    	}
    	public void Stop()
    	{
    		_serviceBus.Unsubscribe<AddSubscription>(HandleAddSubscription);
    		_serviceBus.Unsubscribe<RemoveSubscription>(HandleRemoveSubscription);
    	}
    	public void HandleAddSubscription(IMessageContext<AddSubscription> ctx)
    	{
    		_cache.Add(ctx.Message.Subscription);
    	}
    	public void HandleRemoveSubscription(IMessageContext<RemoveSubscription> ctx)
    	{
    		_cache.Remove(ctx.Message.Subscription);
    	}
    }
    

    And now after the changes:

    public class SubscriptionClient : IHostedService, 
    	Consumes<AddSubscription>.All, 
    	Consumes<RemoveSubscription>.All
    {
    	public void Consume(AddSubscription message)
    	{
    		_cache.Add(message.Subscription);
    	}
    	public void Consume(RemoveSubscription message)
    	{
    		_cache.Remove(message.Subscription);
    	}
    	public void Start()
    	{
    		_serviceBus.Subscribe(this);
    	}
    	public void Stop()
    	{
    		_serviceBus.Unsubscribe(this);
    	}
    }
    

    The code just makes more sense and is easier to understand after the changes. In addition, you can also just call _bus.AddComponent<T>(); to register a type with the service bus and it will use the object builder to create an instance of the class to handle the message. If you're using a container like Windsor, you can specify the lifestyle of the object(s) there, either singleton, transient, or pooled -- depending upon your application requirements.
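
    Here is a small sketch of what "use the object builder to create an instance" means in practice. None of this is the MassTransit implementation: ComponentDispatcher is hypothetical, the Consumes<TMessage> class is a stand-in written for the example, and AddComponent takes an explicit factory delegate and a second type parameter (unlike the real AddComponent<T>()) so the sketch can run without a container.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the consumer interface described in the post.
public static class Consumes<TMessage>
{
    public interface All
    {
        void Consume(TMessage message);
    }
}

// Hypothetical dispatcher: builds a fresh consumer for every message.
public class ComponentDispatcher
{
    private readonly Dictionary<Type, Func<object>> _builders =
        new Dictionary<Type, Func<object>>();

    // The builder delegate stands in for a container such as Windsor.
    public void AddComponent<TComponent, TMessage>(Func<TComponent> builder)
        where TComponent : class, Consumes<TMessage>.All
    {
        _builders[typeof(TMessage)] = () => builder();
    }

    // Returns false when no component is registered for the message type.
    public bool Dispatch<TMessage>(TMessage message)
    {
        Func<object> builder;
        if (!_builders.TryGetValue(typeof(TMessage), out builder))
        {
            return false;
        }
        var consumer = (Consumes<TMessage>.All)builder();
        consumer.Consume(message);
        return true;
    }
}

public class Ping { }

public class PingConsumer : Consumes<Ping>.All
{
    public static int InstancesCreated;

    public PingConsumer() { InstancesCreated++; }

    public void Consume(Ping message) { }
}

public static class ComponentDemo
{
    // Returns how many consumer instances were built for two messages.
    public static int Run()
    {
        PingConsumer.InstancesCreated = 0;
        var dispatcher = new ComponentDispatcher();
        dispatcher.AddComponent<PingConsumer, Ping>(() => new PingConsumer());
        dispatcher.Dispatch(new Ping());
        dispatcher.Dispatch(new Ping());
        return PingConsumer.InstancesCreated;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

    With the factory above, each message gets its own instance (a transient lifestyle, in container terms). Returning a shared instance from the delegate would behave like a singleton.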

    Also notice that there are various types of consumers supported, indicated by the interface used in the consuming class. Consumes<T>.All will deliver any message of type T to the consumer. Consumes<T>.Selected adds an Accept(T message) method to the class to screen any messages before removing them from the endpoint (at least with MSMQ, likely not the case with ActiveMQ).
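
    As a rough illustration of the Selected style, the sketch below defines stand-in interfaces (not the MassTransit ones; only the names Consumes, All, Selected, Accept, and Consume come from the post) and a hypothetical OrderPlaced message. The Deliver helper plays the role of the bus: it asks Accept first and only calls Consume when the answer is yes.

```csharp
using System;

// Stand-in interfaces written for this example.
public static class Consumes<TMessage>
{
    public interface All
    {
        void Consume(TMessage message);
    }

    public interface Selected : All
    {
        bool Accept(TMessage message);
    }
}

// Hypothetical message type.
public class OrderPlaced
{
    public decimal Total;
}

public class LargeOrderConsumer : Consumes<OrderPlaced>.Selected
{
    public int Consumed;

    // Screen out small orders before they are consumed.
    public bool Accept(OrderPlaced message)
    {
        return message.Total >= 1000m;
    }

    public void Consume(OrderPlaced message)
    {
        Consumed++;
    }
}

public static class SelectedDemo
{
    // Delivers the message only if the consumer accepts it.
    public static bool Deliver<T>(Consumes<T>.Selected consumer, T message)
    {
        if (!consumer.Accept(message))
        {
            return false;
        }
        consumer.Consume(message);
        return true;
    }

    // Returns the number of orders that passed the Accept check.
    public static int Run()
    {
        var consumer = new LargeOrderConsumer();
        Deliver<OrderPlaced>(consumer, new OrderPlaced { Total = 25m });
        Deliver<OrderPlaced>(consumer, new OrderPlaced { Total = 5000m });
        return consumer.Consumed;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```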

    The third option presently available is Consumes<T>.For<V> and adds support for a correlated consumer. In previous versions of MassTransit, Request/Reply was handled by using the transport message identifiers and setting a correlation identifier on the transport message. This leaked a lot of details into the service bus layer that were not pretty. Instead of using the transport correlation identifier for messages, we decided to add a new interface that messages can implement called CorrelatedBy<V>. This interface has a single method that returns the correlation identifier for the message -- and it is expected that the message body itself will contain the correlation identifier.

    So now a request/reply pattern would look something like this:

    class Controller : Consumes<Response>.For<Guid>
    {
    	public void Consume(Response message)
    	{}
    	public void Action()
    	{
    		_actionId = Guid.NewGuid();
    		_bus.Subscribe(this);
    		_bus.Publish(new Request(_actionId, someValue, someValue2));
    	}
    }
    

    When the object subscribes to the bus, the correlation identifier is used to filter incoming messages so that only correlated messages are delivered to the object. This is cleaner from an interface contract perspective since you can look at a service and see what messages are produced and ensure that your controller implements all of the expected responses.
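
    The filtering step can be sketched as a lookup keyed by the correlation identifier. This is an illustration under assumptions, not the MassTransit code: CorrelatedDispatcher is hypothetical, and the CorrelatedBy<TKey> stand-in exposes the identifier as a CorrelationId property, which may not match the real interface member.

```csharp
using System;
using System.Collections.Generic;

// Stand-in interface; the member name and shape are assumptions.
public interface CorrelatedBy<TKey>
{
    TKey CorrelationId { get; }
}

// The message body itself carries the correlation identifier.
public class Response : CorrelatedBy<Guid>
{
    private readonly Guid _correlationId;

    public Response(Guid correlationId)
    {
        _correlationId = correlationId;
    }

    public Guid CorrelationId
    {
        get { return _correlationId; }
    }
}

// Hypothetical dispatcher: routes each message to the consumer that
// subscribed with the matching correlation identifier.
public class CorrelatedDispatcher<TMessage, TKey>
    where TMessage : CorrelatedBy<TKey>
{
    private readonly Dictionary<TKey, Action<TMessage>> _consumers =
        new Dictionary<TKey, Action<TMessage>>();

    public void Subscribe(TKey correlationId, Action<TMessage> consumer)
    {
        _consumers[correlationId] = consumer;
    }

    // Returns false when no consumer is waiting on this identifier.
    public bool Dispatch(TMessage message)
    {
        Action<TMessage> consumer;
        if (!_consumers.TryGetValue(message.CorrelationId, out consumer))
        {
            return false;
        }
        consumer(message);
        return true;
    }
}

public static class CorrelationDemo
{
    // Returns how many responses reached the subscribed consumer.
    public static int Run()
    {
        var dispatcher = new CorrelatedDispatcher<Response, Guid>();
        Guid actionId = Guid.NewGuid();
        int delivered = 0;
        dispatcher.Subscribe(actionId, response => delivered++);

        dispatcher.Dispatch(new Response(actionId));       // matches, delivered
        dispatcher.Dispatch(new Response(Guid.NewGuid())); // someone else's reply, ignored
        return delivered;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```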

    While working on these API changes, I also made a number of other changes including:

    • Messages no longer need to implement IMessage, plain old objects can be used
    • Removed all threading from the endpoint (asynchronous message dispatching is now handled by a thread manager in the service bus)
    • Added a DispatchMode so that messages could be dispatched synchronously for unit tests

    I also wrote a new sample called HeavyLoad to benchmark the performance of the bus when using various transports. A variety of messages-per-second tests are performed to see how well the system can be expected to perform based on the type of messaging being done. Early tests on my system (Windows 2003 server in VMware Fusion) show MSMQ performance to be between 950 and 1500 messages per second (for a 300 byte message, persistent) and around 500 messages per second doing a correlated request/response with a single thread (but using the asynchronous dispatcher). If I were to rewrite the test to use multiple message send threads I would expect performance to increase somewhat, since my load test is a bit naive at the present time.
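
    For anyone who wants to take a similar measurement, a single-threaded messages-per-second timer can be as simple as the sketch below. This is not the HeavyLoad code; Throughput and MessagesPerSecond are hypothetical names, and the send delegate is a placeholder for whatever publishes one message on your transport.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper, not part of the HeavyLoad sample.
public static class Throughput
{
    // Calls send() messageCount times on the current thread and
    // returns the observed rate in messages per second.
    public static double MessagesPerSecond(int messageCount, Action send)
    {
        Stopwatch timer = Stopwatch.StartNew();
        for (int i = 0; i < messageCount; i++)
        {
            send();
        }
        timer.Stop();

        // Guard against a zero elapsed time on very fast runs.
        double seconds = Math.Max(timer.Elapsed.TotalSeconds, 1e-9);
        return messageCount / seconds;
    }
}

public static class ThroughputDemo
{
    public static void Main()
    {
        int sent = 0;
        // The lambda stands in for a real publish call.
        double rate = Throughput.MessagesPerSecond(100000, () => sent++);
        Console.WriteLine("{0} messages at {1:F0} messages/second", sent, rate);
    }
}
```

    Like the load test described above, this is naive: one sending thread, no warm-up, and no wait for consumers to finish, so treat the number as a rough indication only.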

    At the same time, I managed to extend the subscription support to include correlated subscriptions. The only subscription cache that currently supports the extensions is the DistributedSubscriptionCache (which uses memcached to share subscription information across a distributed group of systems). The key goal here was to enable MassTransit to support a distributed request/reply architecture using publish/subscribe with correlated subscriptions to specifically route messages to their intended consumers. I plan to make heavy use of this in an upcoming project so I wanted to see it work.

    In addition to all the changes, I also updated a few of the samples and made various tweaks to the infrastructure to make it cleaner. There are several more tweaks on the whiteboard that I'm hoping to investigate in the next week. Once those are done, full ActiveMQ support is up next including running the tests under Mono on OSX.

    So there have been a lot of changes since the 0.1 tag was put down a couple of weeks ago. I expect there will be some continued testing and tweaking this week as Dru seeks to understand all the changes that were made. While I've been doing this stuff, Dru has gotten a kick ass start on a new dashboard to monitor an application built using MassTransit. The goal there is to provide a single pane of glass view into the health of a system, including subscriptions, endpoint throughput, message counts, etc. We've got some cool ideas about how to make the information available, and we hope the dashboard on its own will be one of the cool features that helps support distributed messaging applications.

    If you haven't checked out MassTransit, you can get the latest source from the GoogleCode repository. There is a message board for questions, or feel free to contact Dru or me directly.

Copyright Los Techies 2007. All rights reserved.