It’s January, and many people (including me) are thinking about the food they’ve eaten over the last few months. If you’ve ever eaten too much Halloween candy, indulged yourself in holiday food that doesn’t quite agree with you, had too many cookies or gone back for that one last sliver of pumpkin pie, you may know how much trouble eating the wrong thing can cause. That’s also a problem in messaging applications – if your application is unable to process a message because it’s malformed, oversized, or contains nonsense data, problems can arise.
In messaging terms, a message that can’t be processed by an application is called a poison message. These messages can cause big problems: imagine if you have a set of applications showing active orders when a poison message comes in. Each of the clients fails to parse that message and restarts. The clients then use AMPS bookmarks to pick up processing where they left off. They receive the message again, and restart again. What happens next? The clients pick up where they left off, and the whole process starts over. The clients keep crashing and restarting, and no work is getting done. That’s a serious problem.
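To make the restart loop concrete, here is a minimal simulation in plain Python. It does not use AMPS at all: `LOG`, the `bm-N` bookmark strings, and `simulate_restarts` are hypothetical stand-ins for a replayable stream and a client that resumes where it left off.

```python
import json

# Hypothetical replay log of (bookmark, payload) pairs.
# The second payload is malformed, so it plays the poison message.
LOG = [("bm-1", '{"id": 1}'), ("bm-2", "{not valid json"), ("bm-3", '{"id": 3}')]


def simulate_restarts(log, max_restarts):
    """Restart a naive consumer up to max_restarts times.

    Returns (processed_bookmarks, crash_count).
    """
    processed = []
    resume_at = 0   # index of the first message not yet processed
    crashes = 0
    for _ in range(max_restarts):
        try:
            for index in range(resume_at, len(log)):
                bookmark, payload = log[index]
                json.loads(payload)       # raises ValueError on a malformed payload
                processed.append(bookmark)
                resume_at = index + 1     # progress is recorded only after success
            break                         # reached the end of the log
        except ValueError:
            crashes += 1                  # "crash"; the next pass resumes at resume_at
    return processed, crashes


processed, crashes = simulate_restarts(LOG, max_restarts=5)
print(processed, crashes)
```

Every pass resumes at the same bad message, so the message after it is never reached, no matter how many restarts we allow.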
Here’s a technique to help applications protect against poison messages. The technique is spelled out in this post, and there’s also a sample available for download.
There are two parts to the technique. First, we define a SOW topic to hold a list of poison messages:
<TopicMetaData>
  <TopicDefinition>
    <FileName>./sow/%n.sow</FileName>
    <Topic>ADMIN_PoisonMessages</Topic>
    <MessageType>json</MessageType>
    <Key>/bookmark</Key>
  </TopicDefinition>
</TopicMetaData>
We’ll use this topic to hold the bookmark for each bad message. In addition, when processing fails with an exception, we’ll record the text of the exception. Applications use a sow_and_subscribe to get the current list of bad messages and to receive updates as the list changes. If a message comes in that’s on that list, the application skips the message rather than processing it. If the application fails to process a message, it publishes the bookmark of that message to the SOW topic.
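Before bringing AMPS into it, the flow described above can be sketched with an in-memory stand-in. Everything here is an assumption made for illustration: `FakeSowTopic` imitates a topic keyed on `/bookmark` with a dictionary, `guarded` is a hypothetical helper, and the `bm-N` bookmarks are placeholders. Unlike the wrapper in this post, the sketch reports a status instead of re-raising the exception.

```python
import json


class FakeSowTopic:
    """Hypothetical in-memory stand-in for a SOW topic keyed on /bookmark."""

    def __init__(self):
        self.records = {}

    def publish(self, data):
        record = json.loads(data)
        # One record per key: publishing the same bookmark replaces the old record.
        self.records[record["bookmark"]] = record

    def query(self):
        # Stand-in for the initial query: the set of known-bad bookmarks.
        return set(self.records)


def guarded(bookmark, payload, skip, topic, handler):
    """Return 'skipped', 'ok', or 'poisoned' for one message."""
    if bookmark in skip:
        return "skipped"
    try:
        handler(payload)
    except Exception as e:
        topic.publish(json.dumps({"bookmark": bookmark, "why": str(e)}))
        return "poisoned"
    return "ok"


topic = FakeSowTopic()
# The first instance starts with an empty list and hits the bad message.
first = guarded("bm-2", "{not valid json", topic.query(), topic, json.loads)
# A later instance loads the list first and never touches the bad message.
second = guarded("bm-2", "{not valid json", topic.query(), topic, json.loads)
third = guarded("bm-3", '{"id": 3}', topic.query(), topic, json.loads)
print(first, second, third)
```

Because the topic is keyed on the bookmark, reporting the same message twice leaves a single record rather than growing the list.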
Here’s a simple wrapper, written using the AMPS Python client, which demonstrates how to wrap an existing message handler to provide poison message protection.
First, we define the wrapper class and create an initializer:
import json
import time
import uuid

import AMPS


class MisterYuck:
    # Initialize the class with the template client
    # and the handler to wrap.
    def __init__(self, client, handler):
        self.skipMessages = []
        self.handler = handler
        self.doneLoading = False
        self.currentBookmark = None
        # Create a new client to use for the admin subscription.
        self.client = AMPS.Client("poison_monitor-%s" % uuid.uuid1())
        # Reuse the connect string from the provided client.
        self.client.connect(client.get_uri())
        self.client.logon()
        self.client.sow_and_subscribe(self.updatePoisonMessages,
                                      "ADMIN_PoisonMessages")
        # Wait for the admin SOW query to complete.
        while not self.doneLoading:
            time.sleep(.25)
The initializer takes a client and the handler that will do the actual work of processing the messages. It saves the handler and uses the connect string from the client to create a new client that monitors the ADMIN_PoisonMessages topic. To keep the sample simple for this post, we don’t do all of the things that might be necessary for a production client (such as setting an Authenticator, or creating an HA client that can handle failover). Notice that we create a new client to avoid the deadlock that would occur when calling publish for a client from within a message handler for that client. The initializer issues a sow_and_subscribe command to populate the list of poison messages, then waits for the SOW query to complete. This delay avoids the situation where a poison message could arrive before the list of poison messages is fully populated.
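The polling loop is fine for a sample. One alternative for the same wait is a `threading.Event`, which blocks without polling and accepts a timeout. The sketch below is plain Python with no AMPS calls: `LoadGate` is a hypothetical helper, and the timer stands in for the thread that would deliver the end of the SOW query.

```python
import threading


class LoadGate:
    """Hypothetical helper: blocks a caller until the initial load is flagged complete."""

    def __init__(self):
        self._loaded = threading.Event()

    def mark_loaded(self):
        # Would be called from the message handler when the query finishes.
        self._loaded.set()

    def wait_until_loaded(self, timeout=None):
        # Returns True once the flag is set, or False if the timeout expires first.
        return self._loaded.wait(timeout)


gate = LoadGate()
# Stand-in for the receive thread signalling completion a little later.
timer = threading.Timer(0.05, gate.mark_loaded)
timer.start()
loaded = gate.wait_until_loaded(timeout=5.0)
timer.join()
print(loaded)
```

The timeout gives the caller a chance to fail loudly if the query never completes, instead of sleeping forever.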
The snippet below shows the message handler for the sow_and_subscribe that manages the subscription to the ADMIN_PoisonMessages topic.
    def updatePoisonMessages(self, m):
        # group_begin marks the start of the SOW query results.
        if m.get_command() == "group_begin":
            return
        # group_end marks the end of the SOW query results.
        if m.get_command() == "group_end":
            self.doneLoading = True
            return
        print("Trace: Updating poison message list!")
        self.skipMessages.append(json.loads(m.get_data())["bookmark"])
Again, the handler is straightforward. The handler ignores the group_begin message. The group_end message sets the flag that indicates the SOW query is complete, so the __init__ method can return. For any other message, we extract the “bookmark” field of the message and add the value of that field to the skipMessages list.
Notice that the wrapper uses sow_and_subscribe. This means that any time there’s an update to the poison message list, the wrapper receives that update, so the poison message list is always current.
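The handler logic can be exercised without a server by feeding it stand-in messages. In this sketch, `FakeMessage` and `PoisonList` are hypothetical names, the fake exposes only the two accessors the handler uses, and the "sow" and "publish" command strings are illustrative values for record messages. It also swaps the list for a set, which makes membership checks cheap and ignores duplicates.

```python
import json


class FakeMessage:
    """Hypothetical stand-in exposing only the accessors the handler uses."""

    def __init__(self, command, data=None):
        self._command = command
        self._data = data

    def get_command(self):
        return self._command

    def get_data(self):
        return self._data


class PoisonList:
    def __init__(self):
        self.bookmarks = set()
        self.done_loading = False

    def update(self, m):
        command = m.get_command()
        if command == "group_begin":
            return                      # start of the query results: nothing to record
        if command == "group_end":
            self.done_loading = True    # initial list is complete
            return
        # Any other message carries a record: remember its bookmark.
        self.bookmarks.add(json.loads(m.get_data())["bookmark"])


poison = PoisonList()
stream = [
    FakeMessage("group_begin"),
    FakeMessage("sow", '{"bookmark": "bm-2", "why": "bad"}'),
    FakeMessage("group_end"),
    # Arrives after the query completes, as a live update would.
    FakeMessage("publish", '{"bookmark": "bm-7", "why": "worse"}'),
]
for message in stream:
    poison.update(message)
print(sorted(poison.bookmarks), poison.done_loading)
```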
Once the skipMessages list is populated, the wrapper is ready to use. The wrapper implements the __call__ method so that you can easily pass the wrapper to methods in the AMPS client. For each incoming message, the wrapper checks whether the bookmark is on the list of bad messages. If it is, the wrapper skips the message. Otherwise, the wrapper passes the message to the handler provided when the class was created:
    def __call__(self, m):
        self.currentBookmark = m.get_bookmark()
        # Skip the message if its bookmark is on the skip list.
        if self.currentBookmark in self.skipMessages:
            print("Trace: skipping bookmark: %s -- I know it's bad"
                  % self.currentBookmark)
            self.currentBookmark = None
            return
        try:
            self.handler(m)
            self.currentBookmark = None
        except Exception as e:
            # Record the bookmark and the reason, then re-raise.
            self.client.publish("ADMIN_PoisonMessages",
                                json.dumps({"bookmark": self.currentBookmark,
                                            "why": "%s" % e}))
            self.currentBookmark = None
            raise
If an exception occurs during processing, we publish the bookmark and reason to the ADMIN_PoisonMessages SOW topic and re-raise the exception. Notice that this method doesn’t update the skipMessages list itself. The sow_and_subscribe will receive the update and automatically add the bookmark to the skipMessages list, so there’s no need to do that work here.
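That feedback path, where a published report comes back through the subscription, can be sketched with a stub that echoes each publish to every subscriber. `EchoTopic` and `Guard` are hypothetical names and nothing here calls AMPS; the point is only that the failing instance and every other instance learn about the bad bookmark the same way.

```python
import json


class EchoTopic:
    """Hypothetical stub: delivers each published record to every subscriber."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, data):
        for callback in self.subscribers:
            callback(data)


class Guard:
    def __init__(self, topic, handler):
        self.skip = set()
        self.topic = topic
        self.handler = handler
        topic.subscribe(self._on_update)

    def _on_update(self, data):
        # The only place the skip set changes is the subscription callback.
        self.skip.add(json.loads(data)["bookmark"])

    def __call__(self, bookmark, payload):
        """Return True if the message was handled, False if it was skipped."""
        if bookmark in self.skip:
            return False
        try:
            self.handler(payload)
        except Exception as e:
            self.topic.publish(json.dumps({"bookmark": bookmark, "why": str(e)}))
            raise                       # the caller still sees the failure
        return True


topic = EchoTopic()
a = Guard(topic, json.loads)
b = Guard(topic, json.loads)
try:
    a("bm-2", "{not valid json")
    reraised = False
except ValueError:
    reraised = True
print(reraised, a.skip, b.skip)
```

After the failure in `a`, instance `b` skips the same bookmark without ever trying to parse it.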
Last, but not least, we provide a pair of cleanup methods for the class. The close method can be used to deliberately shut down the wrapper. The __del__ function cleans up the client. Notice that if the __del__ function is called while a bookmark is active, the wrapper assumes that message processing caused a problem and attempts to publish that bookmark to the list of poison messages. This helps to protect against a class of problems (for example, a handler detecting a fatal error and calling sys.exit()) that indicate a failure without throwing an exception that the wrapper catches:
    def __del__(self):
        # If the wrapper is being deleted while currentBookmark
        # is set, that indicates a serious error.
        if self.currentBookmark is not None and self.client is not None:
            self.client.publish("ADMIN_PoisonMessages",
                                json.dumps({"bookmark": self.currentBookmark,
                                            "why": "Fatal error."}))
        if self.client is not None:
            self.client.close()
            self.client = None
        self.handler = None

    def close(self):
        # A deliberate shutdown: clear the bookmark so it isn't reported.
        self.currentBookmark = None
        self.__del__()
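Python does not promise exactly when, or whether, __del__ runs as the interpreter shuts down, so a variation worth considering is a context manager, which runs its cleanup when the block exits for any reason, including sys.exit(). This is a sketch of that alternative rather than part of the sample: `GuardedSession` and its `report` callable are hypothetical, and `report` stands in for publishing to the poison message topic.

```python
class GuardedSession:
    """Hypothetical context manager: reports an in-flight bookmark on abnormal exit."""

    def __init__(self, report):
        self.report = report            # assumed callable(bookmark, why)
        self.current_bookmark = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # A bookmark that is still set means processing never finished.
        if self.current_bookmark is not None:
            why = "Fatal error." if exc_type is None else exc_type.__name__
            self.report(self.current_bookmark, why)
            self.current_bookmark = None
        return False                    # never swallow the exception


reports = []
try:
    with GuardedSession(lambda bookmark, why: reports.append((bookmark, why))) as session:
        session.current_bookmark = "bm-9"   # processing has started
        raise SystemExit(1)                 # what sys.exit(1) raises
except SystemExit:
    pass
print(reports)
```

Neither approach can help if the process is killed outright, since no Python code runs in that case.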
That’s the full wrapper. Here’s a simple sample program that demonstrates using the wrapper. To simulate errors, the sample program throws an exception if it receives a message where the “id” field is evenly divisible by 13.
import json
import time

import AMPS

from MisterYuck import MisterYuck


def no13Handler(m):
    data = json.loads(m.get_data())
    if data["id"] % 13 == 0:
        raise ValueError("I'm superstitious...")
    print("Whew, a value of %d is fine." % data["id"])


def main():
    client = AMPS.Client("handler_demo")
    client.connect("tcp://localhost:9007/amps")
    client.logon()
    poisonprotector = MisterYuck(client, no13Handler)
    client.bookmark_subscribe(poisonprotector, "ContentTopic",
                              bookmark=AMPS.Client.Bookmarks.EPOCH)
    # Keep the main thread alive while messages arrive.
    while True:
        time.sleep(100)


main()
To keep things simple, we’ve used the AMPS.Client and started each subscription from the beginning of the transaction log, EPOCH. A production application would generally use the HAClient to resume the subscription at MOST_RECENT, and the wrapper would call discard() for poison messages so the client doesn’t read them again on restart. With those small changes, the same techniques work.
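To show why discarding matters, here is a rough model in plain Python. It deliberately avoids the real bookmark store API: `FakeBookmarkStore`, `resume_index`, and `consume` are hypothetical, and the resume rule (start at the first message that has not been discarded) is a simplification made for this sketch.

```python
import json


class FakeBookmarkStore:
    """Hypothetical stand-in: remembers which bookmarks have been discarded."""

    def __init__(self):
        self.discarded = set()

    def discard(self, bookmark):
        self.discarded.add(bookmark)

    def resume_index(self, log):
        """Index of the first message in log that has not been discarded."""
        for index, (bookmark, _) in enumerate(log):
            if bookmark not in self.discarded:
                return index
        return len(log)


def consume(log, store, skip, handler):
    for bookmark, payload in log[store.resume_index(log):]:
        if bookmark in skip:
            # Known poison: mark it done so a restart moves past it.
            store.discard(bookmark)
            continue
        handler(payload)
        store.discard(bookmark)         # processed successfully


LOG = [("bm-1", '{"id": 1}'), ("bm-2", "{not valid json"), ("bm-3", '{"id": 3}')]
store = FakeBookmarkStore()
consume(LOG, store, skip={"bm-2"}, handler=json.loads)
resume = store.resume_index(LOG)
print(resume, sorted(store.discarded))
```

With the skip list in place and the poison message discarded, a restart resumes past the end of this log instead of landing on the bad message again.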
In this post, we’ve covered an easy way to use SOW topics to create a poison message list. The concept is simple: create a SOW topic in AMPS to hold the list of bad messages. Each instance of the application loads the list and uses sow_and_subscribe to keep the list current as other clients update the list. AMPS SOW topics provide an easy way to build reliable, flexible poison message protection without keeping any persistent state on the application side.
Even better, because the state is stored in AMPS, if a new instance of the application starts, that new instance never has to process messages that are known to be bad. Each new instance gets the benefit of the work the other instances have done.
How does your messaging system handle poison messages? Let us know in the comments!