Windows Mobile Support

  • Subscribe to our RSS feed.
  • Twitter
  • StumbleUpon
  • Reddit
  • Facebook
  • Digg

Wednesday, 17 October 2012

Different methods to implement Message Aggregator pattern using Service Bus Topic – CorrelationId

Posted on 01:08 by Unknown
In one of my last post I presented the Aggregator Pattern. The main purpose of this pattern is the ability of the consumer to combine (aggregate) messages. This pattern can be implemented in Windows Azure using Windows Azure Service Bus Queues or Topics.
There are two different implementation for this pattern. The implementations are extremely different and also can affect our performance.
The first implementation requires using the Session support from BrokeredMessage. What this means from the code perspective? We need to set the Session each time when we want to send a message. The consumer will start to consume the messages with the specific session id. This solution is simple and it works great when we don’t have a lot of messages. For example if we have only 9-10 messages.
The first advantage of this implementation is on the consumer side. We don’t need to create the consume before messages are added to the Service Bus. The messages will be persisted in the Service Bus infrastructure. One of the downside of this implementation is the numbers of the consumers that we can have for the same session. We can have only one consumer. Because of this, if we want to broadcast a message to more than one consumer … we will have a problem. Also, the current implementation of session doesn’t use any kind of hashing for the id field. Because of this, if we want to process thousands of messages per second, maybe we will have a problem.
Producer
Stream messageStream = message.GetBody<Stream>();
for (int offset = 0;
offset < length;
offset += 255)
{
long messageLength = (length - offset) > 255
? 255
: length - offset;
byte[] currentMessageContent = new byte[messageLength];
int result = messageStream.Read(currentMessageContent, 0, (int)messageLength);
BrokeredMessage currentMessage = new BrokeredMessage(
new MemoryStream(currentMessageContent),
true);
subMessage.SessionId = currentContentId;
qc.Send(currentMessage);
}

Consumer

MemoryStream finalStream = new MemoryStream();
MessageSession messageSession = qc.AcceptMessageSession(mySessionId);
while(true)
{
BrokeredMessage message = messageSession.Receive(TimeSpan.FromSeconds(20));
if(message != null)
{
message.GetBody<Stream>().CopyTo(finalStream);
message.Complete();
continue;
}
break;
}
In the above example we are spitting a stream in small parts and sending it on the Service Bus.
But, what should we do if we want more than this. For example if we want to send messages to more than one consumer. In this case, if we need a way to group messages, like session we will need to think twice. We have some options for this situation. We can use the CorrelationId for this purpose of to add a custom property to the BrokeredMessage. CorrelationId use hashing, because of this is faster than the first option.
Both solutions will work, but we will encounter the same problem in both of them. How we can create a subscription for the given CorrelationId of property before starting receiving messages. This is the biggest problem that we need to resolve.
Before talking about this problem, I would like to talk a little about CorrelationId. This fields that can be set for each BrokeredMessage that is send on the wire. The advantage using is how we can define the filter. We have a pre-define filter for the CorrelationId that can be use when we want to create a subscription. Also each id is hashed; because of this the check is not made using string comparison.
But what is our problem using CorrelationId. We can broadcast a message to more than one subscriber, but… Yes, there is a but. We need to know the correlation id in the moment when we create the subscription. Why? Because the correlation id need to specify in the moment when we are creating the subscriber. Correlation id need to be specified for a subscriber as a CorrelationFilterExpression.
This is not the end of the road. We can find solutions for this problem. The trick is to notify the consumers before adding messages to the Service Bus Topic about the new group of messages that will be added to the system. For this purpose we can use the same Service Bus Topic, or another topic for this. We can have some special messages that have some custom property that describe what will be content of the next flow of messages with the given correlation id. Based on this information, each consumer will be able to decide if we want to receive the given messages.
The trick here is to register the subscribers before the moment when you start broadcast the messages with a given correlation id. The messages will be sending only to the subscribers that are already listening. Because of this, if you are not register from the beginning, there are chances to lose some of the messages.
You need some kind of callback or a timer. Because in a Service Bus pattern, the producer doesn’t know the numbers of subscribers, we cannot create a mechanism where each subscriber notifies the producer using another topic. We could have a waiting time (5s) on the producer side, after he sends the message that notify about the new correlation id.

Here is the code that we should have on the producer and on the consumer side.
-on the producer side, we need to create and send the message that contains the new correlation id. After this we will need to wait a period of time, until the consumers will be able to register to it.
BrokeredMessage message = new BrokeredMessage();
message.Properties["NewCorrelationId"] = 1;
message.Properties["Content"] = "new available cars for rent";
topicClient.Send(message);
Thread.Sleep(10000);
-creating the subscription that check if the message contains the property that specify the correlation id.
namespaceManager.CreateSubscription(
“myTopic”,
“listenToNewCorrelationIds”,
new SqlFilterExpression("EXISTS(NewCorrelationId”));
-create the consumer that processes the message, by creating a new subscription.
SubscriptionClient client =
SubscriptionClient.CreateFromConnectionString(
connectionString,
"myTopic",
“listenToNewCorrelationIds”);
BrokeredMessage correlationIdMessage = client.Receive();
namespaceManager.CreateSubscription(
“myTopic”,
“listenToMyCorrelationId”,
new CorrelationFilterExpression(correlationIdMessage.Properties["NewCorrelationId"]));
SubscriptionClient client =
SubscriptionClient.CreateFromConnectionString(
connectionString,
"myTopic",
“listenToMyCorrelationId);

... client.Receive() …
From performance perspective is would be better to use a different topic to send the messages that contains the new correlation id.
We can imagine a lot of implementation. What we need to remember when using correlation id that we need to create a subscription for the given correlation id before starting sending the messages to it. This waiting period that I described in the above paragraphs is the most challenging part. 
In conclusion, we should use session when we need to have only one consumer. If we need more than one consumer, than we should use correlation id.
Email ThisBlogThis!Share to XShare to FacebookShare to Pinterest
Posted in Azure, design patterns, service bus, Windows Azure | No comments
Newer Post Older Post Home

0 comments:

Post a Comment

Subscribe to: Post Comments (Atom)

Popular Posts

  • Service Bus Topic - Automatic forward messages from a subscription to a topic
    Windows Azure Service Bus Topic is a service that enables us to distribute the same messages to different consumers without having to know e...
  • Patterns in Windows Azure Service Bus - Message Splitter Pattern
    In one of my post about Service Bus Topics from Windows Azure I told you that I will write about a post that describe how we can design an a...
  • CDN is not the only solution to improve the page speed - Reverse Caching Proxy
    I heard more and more often think like this: “If your website is to slow, you should use a CDN.” Great, CDN is THE solution for any kind of ...
  • E-Learning Vendors Attempt to Morph Mobile
    The sign should read: " Don't touch! Wet Paint !" I had a good chuckle today after receiving my latest emailed copy of the eLe...
  • Content Types - Level 6: Rich Media
    Level 6: Rich Media NOTE: This is part 7 of 7 and the conclusion of this continuing series; please see earlier posts for more background inf...
  • Publishing our CellCast Widget for iPad
    The rush has been on this week as our development team worked to design a new version of our CellCast Widget specifically for Apple's up...
  • Content Types - Level 5: Courseware
    Level 5: Content and Courseware NOTE: This is part 6 of 7 in a continuing series; please see earlier posts for more background information. ...
  • SQL - UNION and UNION ALL
    I think that all of us used until now UNION in a SQLstatement. Using this operator we can combine the result of 2 queries. For example we wa...
  • Cum sa salvezi un stream direct intr-un fisier
    Cred ca este a 2-a oara când întâlnesc aceasta cerința in decurs de câteva săptămâni. Se da un stream și o locație unde trebuie salvat, se c...
  • Content Types - Level 4: Reference
    Level 4: Reference Materials & Static Content NOTE: This is part 5 of 7 in a continuing series; please see earlier posts for more backgr...

Categories

  • .NET
  • .NET nice to have
  • #if DEBUG
  • 15 iunie 2011
  • 15 octombrie 2011
  • 2011
  • abstracta
  • action
  • adaugare
  • ajax
  • Amsterdam
  • Android
  • aplicatii
  • App Fabric
  • Apple iSlate
  • array
  • as
  • ASP.NET
  • AsReadOnly
  • Assembly comun
  • async
  • Asynchronous programming
  • asyncron
  • Autofac
  • AutoMapper
  • az
  • Azure
  • Azure AppFabric Cache
  • Azure backup solution
  • Azure Storage Explorer
  • azure. cloud
  • backup
  • BCP utility
  • bing maps v7
  • BitArray
  • BlackBerry
  • blob
  • BlobContainerPublicAccessType
  • breakpoint
  • bucuresti
  • C#
  • cache
  • CallerMemberName
  • CellCast
  • Certificate
  • CES
  • change
  • ChannelFactory
  • clasa
  • classinitialize
  • clean code
  • click event
  • close
  • Cloud
  • Cluj
  • cluj-napoca
  • Code contracts
  • code retrat
  • codecamp
  • CollectionAssert
  • Compact Edition
  • compara
  • Comparer T .Default
  • CompareTo
  • comparison
  • comunitate
  • concurs
  • Conditional attribute
  • configurare
  • connection string
  • container
  • content type
  • control
  • Convert
  • convertAll
  • convertor
  • cross platform
  • CRUD
  • css
  • custom properties
  • custom request
  • DACPAC
  • Daniel Andres
  • data sync service
  • database
  • date time
  • datetime
  • debug
  • default
  • delegate
  • dependency injection
  • deploy
  • DeploymentItem
  • design patterns
  • Dev de Amsterdam
  • development stoage
  • dictionary
  • diferente
  • digging
  • director
  • Directory.Exist
  • disable
  • dispatcher
  • dispose
  • dropdown
  • dynamic
  • EF
  • email
  • encoding
  • entity framework
  • enum
  • enumerable
  • Environment.NewLine
  • error
  • error 404
  • error handling
  • eveniment
  • event
  • ews
  • excel
  • exception
  • exchange
  • exita
  • explicit
  • export
  • extension
  • field
  • File.Exist
  • finalize
  • fire and forget
  • Fluent interface pattern
  • format
  • func
  • GC.SuppressFinalize
  • generic
  • getdirectoryname
  • globalization
  • gmail
  • hackathon
  • Hadoop
  • handle
  • HTML
  • html 5
  • Html.ActionLink
  • http://www.blogger.com/img/blank.gif
  • HttpModule
  • IComparable
  • IE
  • ienumerable
  • IIS
  • image
  • implicit
  • import
  • int
  • internationalization
  • Internet Explorer
  • interop
  • Ioc
  • IP Filter
  • iPhone
  • iQuest
  • IStructuralEquatable
  • ITCamp
  • itspark
  • java script
  • javascript
  • July 2012
  • KeyedByTypeCollection
  • KeyNotFoundException
  • Kinect SDK
  • lambda expression
  • LightSwitch Microsoft Silverlight
  • linq
  • list
  • lista
  • lista servicii
  • liste
  • Live Connect
  • Live ID
  • load
  • localization
  • lock
  • m-learning
  • MAC
  • Mango
  • map
  • mapare
  • mapare propietati
  • messagequeue
  • meta properties
  • method
  • MethodImpl
  • Metro App
  • Microsoft
  • Microsoft Sync Framework
  • mlearning
  • mlearning devices
  • Mobile Apps
  • mobile in the cloud
  • mobile learning
  • mobile services
  • Mobile Web
  • mongoDb
  • monitorizare
  • msmq
  • multitasking
  • MVC
  • MVC 3
  • MVVM
  • namespace
  • nextpartitionkey
  • nextrowkey
  • Ninject
  • nivel acces
  • no result
  • normalize
  • nosql
  • null expcetion
  • null object pattern
  • NullReferenceException
  • OAuth API
  • office
  • offline
  • Open ID
  • openhackeu2011
  • operations
  • operator
  • optimization
  • option
  • outputcache
  • OutputCacheProvider
  • override
  • paginare
  • pagination
  • path
  • persistare
  • Portable Library tool
  • Post event – CodeCamp Cluj-Napoca
  • predicate
  • predictions
  • prezentare
  • process
  • proiect
  • property
  • propietati
  • query
  • ReadOnlyCollection
  • ReadOnlyDictionary
  • referinta
  • reflection
  • remote
  • reply command
  • request
  • request response
  • resouce
  • REST
  • REST Client
  • RESTSharp
  • ronua
  • rss
  • rulare
  • salvare in fisier
  • sc
  • schimbare timp
  • select
  • select nodes
  • send
  • serializare
  • serialization
  • Server.Transfer. Resposen.Redirect
  • service bus
  • ServiceBase
  • servicecontroller
  • sesiune
  • session
  • Session_End
  • Session_Start
  • setup
  • Sibiu
  • signalR
  • Silverlight
  • sincronizare
  • Single Responsibility Principle
  • SkyDrive
  • skype
  • smartphones
  • smtp
  • Snapguide
  • sniffer
  • socket
  • solid
  • spec#
  • sql
  • Sql Azure
  • SQL CE
  • sql server 2008 RC
  • SRP
  • startuptype
  • stateful
  • stateless
  • static
  • stergere
  • store
  • store procedure
  • stream
  • string
  • string.join
  • struct
  • StructuralEqualityComparer
  • submit
  • switch
  • Symbian
  • Synchronized
  • system
  • tabele
  • table
  • techEd 2012
  • tempdata
  • test
  • testcleanup
  • testinitialize
  • testmethod
  • thread
  • timer
  • ToLower
  • tool
  • tostring
  • Total Cost Calculator
  • trace ASP.NET
  • transcoding
  • tuplu
  • tutorial
  • TWmLearning
  • type
  • unit test
  • unittest
  • UrlParameter.Optional
  • Validate
  • validation
  • verificare
  • video
  • view
  • ViewBag
  • virtual
  • visual studio
  • VM role
  • Vunvulea Radu
  • wallpaper
  • WCF
  • WebBrower
  • WebRequest
  • where clause
  • Windows
  • windows 8
  • Windows Azure
  • Windows Azure Service Management CmdLets
  • windows live messenger
  • Windows Mobile
  • Windows Phone
  • windows service
  • windows store application
  • Windows Task
  • WinRT
  • word
  • workaround
  • XBox
  • xml
  • xmlns
  • XNA
  • xpath
  • YMesseger
  • Yonder
  • Zip

Blog Archive

  • ►  2013 (139)
    • ►  November (17)
    • ►  October (12)
    • ►  September (10)
    • ►  August (7)
    • ►  July (8)
    • ►  June (15)
    • ►  May (12)
    • ►  April (17)
    • ►  March (16)
    • ►  February (9)
    • ►  January (16)
  • ▼  2012 (251)
    • ►  December (9)
    • ►  November (19)
    • ▼  October (26)
      • Windows Azure Tools - Error 0x80070643: Failed to ...
      • Visual Studio - How to see the source code of .NET...
      • Design a state machine mechanism using Windows Azu...
      • How many fingers point can be tracked by Windows 8
      • Design a state machine mechanism using Windows Azu...
      • Day 4 of Software Architecture 2012 & Agile and OO...
      • Day 3 of Software Architecture 2012 & Basic Princi...
      • How to NOT expose a read only collection in C#
      • Day 2 of Software Architecture 2012 & How an archi...
      • Different methods to implement Message Aggregator ...
      • Day 1 of Software Architecture 2012 & Unit Test Pa...
      • Windows Azure Service Bus Patterns - a comprehensi...
      • How to force browser to updated the Silverlight pa...
      • What should I do when I receive "4.1 Your app must...
      • WOWZAPP 2012: Worldwide Global Hackathon for Windo...
      • Patterns in Windows Azure Service Bus - Dynamic Ro...
      • Configuration settings from Windows Azure Service ...
      • New Unit Tests features of Visual Studio 2012
      • Windows Azure Service Bus - Adding properties to t...
      • Custom configuration files for each developer machine
      • Patterns in Windows Azure Service Bus - Scatter-Ga...
      • How to write unit tests in JavaScript for a Window...
      • Patterns in Windows Azure Service Bus - Content-Ba...
      • Mixing UI controllers in a Windows Store App (Metr...
      • Microsoft MVP
      • [Post Event] Windows 8 Dev Camp, Cluj-Napoca - Sep...
    • ►  September (13)
    • ►  August (35)
    • ►  July (28)
    • ►  June (27)
    • ►  May (24)
    • ►  April (18)
    • ►  March (17)
    • ►  February (20)
    • ►  January (15)
  • ►  2011 (127)
    • ►  December (11)
    • ►  November (20)
    • ►  October (8)
    • ►  September (8)
    • ►  August (8)
    • ►  July (10)
    • ►  June (5)
    • ►  May (8)
    • ►  April (9)
    • ►  March (14)
    • ►  February (20)
    • ►  January (6)
  • ►  2010 (26)
    • ►  December (1)
    • ►  November (1)
    • ►  October (1)
    • ►  June (2)
    • ►  May (1)
    • ►  April (4)
    • ►  March (1)
    • ►  February (1)
    • ►  January (14)
Powered by Blogger.

About Me

Unknown
View my complete profile