Owain Wraggs' Blog

I have now left EMC Consulting; you can subscribe to my new blog here: http://blog.endjin.com/author/owain-wragg/

Workflow 4.0 Correlation with Message Headers

I have continued to look at Workflow Foundation 4.0, which is included in the Beta 1 release of Visual Studio 2010 and .NET Framework 4.0, and I’ve been experimenting with the correlation functionality. The approach I have seen documented and blogged about most often is correlation at the workflow instance level, where the same workflow instance handles the processing of a related set of messages. There is nothing wrong with this approach, but it does require you to manage the creation, persistence, loading and unloading of workflow instances instead of simply letting IIS manage the workflow instance lifecycle.

Another approach is to do the correlation at the message level, using the WCF functionality that allows you to add custom data to message headers and then use that data to perform correlation. This approach allows you to perform correlation across different workflows and workflow instances, but it does require you to persist the message data (think BizTalk MessageBox database) so that different workflow instances can check the state of the processing to determine whether further action is required. It also enables correlation to be performed using data that is not specified in the service contract; the data added to the message header could be anything. Below I will walk through an example which demonstrates this technique.

The example scenario is as follows: an order is submitted, but before the order can be taken to completion the credit of the customer who placed the order needs to be checked and the requested products need to be confirmed as in stock. In this imaginary example both the credit check and the product stock check are submitted to one-way services, which will provide a response at some point in the future.

The diagram below is the Order Service workflow, which is responsible for submitting the order to both the customer service (which performs the credit check) and the product service (which performs the stock check).

As you can see, this workflow receives an order to process and then calls the PersistOrderActivity, which persists the order details so that later in the process they can be retrieved and used to confirm that the processing of the order is complete. Next the SubmitCreditCheckActivity is called, which is responsible for submitting the credit check. A ParallelForEach activity is then used to call the SubmitStockCheckActivity (which submits the stock check) for each product contained in the order, as shown in the diagram below.

Finally the Order Service workflow returns to the caller, indicating that the request has been submitted.

Shown below is the SubmitCreditCheckActivity

    using System.Activities;
    using System.Linq;
    using System.ServiceModel;

    public class SubmitCreditCheckActivity : CodeActivity
    {
        public InArgument<Order> Order { get; set; }

        protected override void Execute(CodeActivityContext context)
        {
            Order order = this.Order.Get(context);

            int orderId = order.Id;
            int customerId = order.Customer.Id;
            int orderPrice = order.Products.Sum(product => product.Price);

            // Address of the service that the Customer Service should send
            // its response back to
            string responseService = "http://localhost:1031/OrderValidationResponseService.xamlx";

            // Create a custom object to store in the message header
            CreditCheckResponseContext responseContext = new CreditCheckResponseContext { OrderId = orderId, ResponseAddress = responseService };

            MessageHeader<CreditCheckResponseContext> responseHeader = new MessageHeader<CreditCheckResponseContext>(responseContext);

            ChannelFactory<CustomerService.ICustomerService> factory = new ChannelFactory<CustomerService.ICustomerService>("BasicHttpBinding_ICustomerService");
            CustomerService.ICustomerService proxy = factory.CreateChannel();

            using (OperationContextScope contextScope = new OperationContextScope((IContextChannel)proxy))
            {
                // Add custom data to the message header before calling the service
                OperationContext.Current.OutgoingMessageHeaders.Add(responseHeader.GetUntypedHeader("ResponseHeader", "ServiceModelEx"));

                // One-way call: returns as soon as the message has been sent
                proxy.VerifyAvailableCredit(customerId, orderPrice);
            }
        }
    }

This activity is the first point at which any correlation-related work takes place. It is worth noting that the Customer Service VerifyAvailableCredit operation is a one-way call, so once called it performs its work without returning a result; the operation may also take a long time to complete. To cope with this, the VerifyAvailableCredit operation calls another service with the result when its work has completed. So that the service called on completion is not hard-coded into the Customer Service, the address to send the response to is supplied in custom message header data. The message header also includes the order id, so that the Customer Service can pass it on to the service called on completion, enabling that service to correlate the credit check result with the right order.

Instead of adding custom data to the message header, another choice would have been to change the service contract to include both the response service address and the order id. This would have been wrong, as these pieces of data are not required by the service to perform its work, and including them would have made it harder to figure out the intent of the service.
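The CreditCheckResponseContext type carried in the header is not shown in this post. A minimal sketch of what it might look like follows, assuming it is a plain data contract. Only the OrderId and ResponseAddress property names come from the activity above; the attributes and the HeaderPayloadCheck helper are assumptions made for illustration and are not taken from the attached solution.

```csharp
using System.IO;
using System.Runtime.Serialization;

// Hypothetical shape of the header payload. Only the two property names
// are taken from the post; everything else is an assumption.
[DataContract]
public class CreditCheckResponseContext
{
    // Identifies the order the credit check belongs to (the correlation key)
    [DataMember]
    public int OrderId { get; set; }

    // Address of the service that should receive the credit check result
    [DataMember]
    public string ResponseAddress { get; set; }
}

// Hypothetical helper, not part of the original solution.
public static class HeaderPayloadCheck
{
    // Writes the payload with the data contract serializer and reads it
    // back, to confirm both values survive serialization.
    public static CreditCheckResponseContext RoundTrip(CreditCheckResponseContext source)
    {
        DataContractSerializer serializer = new DataContractSerializer(typeof(CreditCheckResponseContext));

        using (MemoryStream stream = new MemoryStream())
        {
            serializer.WriteObject(stream, source);
            stream.Position = 0;
            return (CreditCheckResponseContext)serializer.ReadObject(stream);
        }
    }
}
```

Keeping the payload down to the correlation key and the reply address is what lets the service contract itself stay free of data that the credit check does not need.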

The SubmitStockCheckActivity is very similar as it also sets the response service address and adds custom data to the message header.

After the Order Service has completed, the workflow instance is terminated and the Customer and Product Services continue to perform the various checks.

Shown below is the Customer Service contract and implementation.

    using System;
    using System.ServiceModel;
    using System.Threading;

    [ServiceContract]
    public interface ICustomerService
    {
        /// <summary>
        /// Checks if the customer has sufficient credit available
        /// </summary>
        /// <param name="customerId">The customer id</param>
        /// <param name="orderPrice">The order price</param>
        [OperationContract(IsOneWay = true)]
        void VerifyAvailableCredit(int customerId, int orderPrice);
    }

    public class CustomerService : ICustomerService
    {
        /// <summary>
        /// Checks if the customer has sufficient credit available
        /// </summary>
        /// <param name="customerId">The customer id</param>
        /// <param name="orderPrice">The order price</param>
        public void VerifyAvailableCredit(int customerId, int orderPrice)
        {
            // Simulate a long-running check by sleeping the thread
            // for between 1 and 120 seconds
            Random randomiser = new Random();
            int sleepInterval = randomiser.Next(1000, 120000);

            Thread.Sleep(sleepInterval);

            bool hasAvailableCredit = false;

            // If the customer id is less than 25 and the order price
            // is less than 500 - they have available credit
            if (customerId < 25 && orderPrice < 500)
            {
                hasAvailableCredit = true;
            }

            // If the customer id is greater than 100 and the order price
            // is less than 5000 - they have available credit
            if (customerId > 100 && orderPrice < 5000)
            {
                hasAvailableCredit = true;
            }

            // Read the custom header sent by the caller; it carries the order id
            // and the address of the service to send the response to
            CreditCheckResponseContext responseContext = OperationContext.Current.IncomingMessageHeaders.GetHeader<CreditCheckResponseContext>("ResponseHeader", "ServiceModelEx");

            EndpointAddress responseAddress = new EndpointAddress(responseContext.ResponseAddress);

            MessageHeader<CreditCheckResponseContext> responseHeader = new MessageHeader<CreditCheckResponseContext>(responseContext);

            ChannelFactory<OrderValidationResponseService.OrderValidationResponseServiceContract> factory = new ChannelFactory<OrderValidationResponseService.OrderValidationResponseServiceContract>("OrderValidationResponseService");
            OrderValidationResponseService.OrderValidationResponseServiceContract proxy = factory.CreateChannel(responseAddress);

            using (OperationContextScope contextScope = new OperationContextScope((IContextChannel)proxy))
            {
                // Pass the same custom header on to the response service
                // so that it can correlate the result with the order
                OperationContext.Current.OutgoingMessageHeaders.Add(responseHeader.GetUntypedHeader("ResponseHeader", "ServiceModelEx"));

                proxy.OnCreditCheckResponse(new OnCreditCheckResponse(hasAvailableCredit));
            }
        }
    }

As you can see, the VerifyAvailableCredit method sleeps for a random interval (between 1 and 120 seconds) before evaluating whether the customer has enough available credit. After that the method gets more interesting.

First it retrieves the custom data from the incoming message header and reads from it the address of the service to send the response to. It then adds the custom data to the outgoing message header (to enable the service which handles the response to correlate the result with the correct order), and finally it calls the OnCreditCheckResponse method on the service which is expecting the result. The Product Service’s implementation is very similar to the implementation described above.

Finally, shown below is the OrderValidationResponse Service workflow, which handles the results of both the credit check and the stock check (sent from the Customer and Product services respectively). It works out whether all responses for the order in question have been received and, if they have, continues processing the order.

Below is the expanded pick activity, which exposes two service operations, OnStockCheckResponse and OnCreditCheckResponse.

When the OnStockCheckResponse service operation is invoked, the workflow calls the PersistStockCheckResponseActivity, which is responsible for retrieving the custom data from the message header and persisting the stock check result against the correct order and product (specified in the message header for correlation purposes). Once the service operation has been handled, the VerifyAllResponsesReceivedActivity checks whether all responses have been received for the order specified in the custom message header (the customer credit check and all the product stock check responses). If they have, the If activity shown below calls the ProcessOrderActivity, which continues with order processing now that all the checks have been performed.
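The completeness check performed by the VerifyAllResponsesReceivedActivity is not listed in this post. The sketch below shows one way the rule could be expressed, assuming the persisted state for an order records the expected product ids, the credit check result and each stock check result. The OrderValidationState class and its members are hypothetical names used for illustration, and the state is held in memory here rather than in a database.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the persisted state of one order.
public class OrderValidationState
{
    private readonly HashSet<int> expectedProductIds;
    private readonly Dictionary<int, bool> stockResults = new Dictionary<int, bool>();
    private bool? creditResult;

    public OrderValidationState(IEnumerable<int> productIds)
    {
        // The products whose stock check responses are still awaited
        this.expectedProductIds = new HashSet<int>(productIds);
    }

    // Called when the credit check response for this order arrives
    public void RecordCreditCheck(bool hasAvailableCredit)
    {
        this.creditResult = hasAvailableCredit;
    }

    // Called when a stock check response for one product arrives.
    // Responses for products that are not on the order are ignored,
    // and a repeated response simply overwrites the earlier one.
    public void RecordStockCheck(int productId, bool inStock)
    {
        if (this.expectedProductIds.Contains(productId))
        {
            this.stockResults[productId] = inStock;
        }
    }

    // True once the credit check and every expected stock check have arrived
    public bool AllResponsesReceived
    {
        get
        {
            return this.creditResult.HasValue
                && this.stockResults.Count == this.expectedProductIds.Count;
        }
    }
}
```

Because the responses can arrive in any order and on different workflow instances, the check has to be safe to run after every response; in this sketch it only reports true once the last outstanding response has been recorded.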

I’ve attached the solution to this post; the solution also contains a readme file which explains how to run the application.

Note: this post was written using Beta 1 of the .NET Framework 4.0, so changes in future releases of the .NET Framework might break the example application.

 
