Danger: Keep your application away from poison! It’s January, and many people (including me) are thinking about the food they’ve eaten over the last few months. If you’ve ever eaten too much Halloween candy, indulged yourself in holiday food that doesn’t quite agree with you, had too many cookies or gone back for that one last sliver of pumpkin pie, you may know how much trouble eating the wrong thing can cause. That’s also a problem in messaging applications – if your application is unable to process a message because it’s malformed, oversized, or contains nonsense data, problems can arise.

In messaging terms, a message that can’t be processed by an application is called a poison message. These messages can cause big problems: imagine if you have a set of applications showing active orders when a poison message comes in. Each of the clients fails to parse that message and restarts. The clients then use AMPS bookmarks to pick up processing where they left off. They receive the message again, and restart again. What happens next? The clients pick up where they left off, and the whole process starts over. The clients keep crashing and restarting, and no work is getting done. That’s a serious problem.

Here’s a technique to help applications protect against poison messages. The technique is spelled out in this post, and there’s also a sample available for download.

There are two parts to the technique. First, we define a SOW topic to hold a list of poison messages:

<TopicMetaData>
    <TopicDefinition>
       <FileName>./sow/%n.sow</FileName>
       <Topic>ADMIN_PoisonMessages</Topic>
       <MessageType>json</MessageType>
       <Key>/bookmark</Key>
    </TopicDefinition>
  </TopicMetaData>

We’ll use this topic to hold the bookmark for each bad message. In addition, in cases where we have an exception, we’ll record the contents of the exception. Applications use a sow_and_subscribe to get the current list of bad messages. If a message comes in that’s on that list, the application skips the message rather than processing it. If the application fails to process a message, it publishes that message to the SOW topic.

Here’s a simple wrapper, written using the AMPS Python client, which demonstrates how to wrap an existing message handler to provide poison message protection.

First, we define the wrapper class and create an initializer:

class MisterYuck:

   # Initialize the class with the template client,
   # and the handler to wrap.

   def __init__(self, client, handler):
       self.skipMessages = []
       self.handler = handler
       self.doneLoading = False
       self.currentBookmark = None

       #create a new client to use for the admin subscription
       self.client = AMPS.Client("poison_monitor-%s" % uuid.uuid1())
       # Copy the connect string from the provided client
       self.client.connect(client.get_uri())
       self.client.logon()
       self.client.sow_and_subscribe(self.updatePoisonMessages,
                                     "ADMIN_PoisonMessages")
       # wait for the admin SOW query to complete
       while(self.doneLoading == False):
             time.sleep(.25)

The initializer takes a client and the handler that will do the actual work of processing the messages. The initializer saves the handler and uses the connect string from the client to create a new client to use to monitor the ADMIN_PoisonMessages topic. To keep the sample simple for this post, we don’t do all of the things that might be necessary for a production client (such as setting an Authenticator, or creating an HA client that can handle failover). Notice that we create a new client to avoid the deadlock that would occur when calling publish for a client from within a message handler for that client. The initialize method issues a sow_and_subscribe command to populate the list of poison messages, then waits for the SOW query that populates the list of poison messages to complete. This delay avoids the situation where a poison message could arrive before the list of poison messages is fully populated.

The snippet below shows the message handler for the sow_and_subscribe that manages the subscription to the ADMIN_PoisonMessages topic.

def updatePoisonMessages(self, m):
        if (m.get_command() == "group_begin"):
            return
        if (m.get_command() == "group_end"):
            self.doneLoading = True 
            return
        print "Trace: Updating poison message dictionary!"
        self.skipMessages.append(json.loads(m.get_data())["bookmark"])

Again, the handler is straightforward. The handler ignores the group_begin message. The group_end message sets the flag that indicates the SOW query is complete, so the __init__ method can return. For any other method, we extract the “bookmark” field of the message and add the value of that field to the skipMessages list.

Notice that the wrapper uses sow_and_subscribe. This means that any time there’s an update to the poison message list, the wrapper receives that update, so the poison message list is always current.

Once the skipMessage list is populated, the wrapper is ready to use. The wrapper implements the __call__ method so that you can easily pass the wrapper to methods in the AMPS client. For each incoming message, the wrapper checks to be sure that the bookmark isn’t on the list of bad messages. If so, the wrapper skips the message. Otherwise, the wrapper passes the message to the handler provided when the class was created:

def __call__(self, m):
        self.currentBookmark = m.get_bookmark() 
        # Process the message if it's not on the skip list

        if self.currentBookmark in self.skipMessages:
              print "Trace: skipping bookmark: %s -- I know it's bad" % bookmark
              return

        try:
          self.handler(m)
          self.currentBookmark = None

        except Exception as e:
          self.client.publish("ADMIN_PoisonMessages",
                       json.dumps( { "bookmark" : self.currentBookmark,
                                     "why" : "%s" % e } ))
          self.currentBookmark = None
          raise

If an exception occurs during processing, we publish the bookmark and reason to the ADMIN_PoisonMessages SOW topic and rethrow the exception. Notice that this method doesn’t update the skipMessageList itself. The sow_and_subscribe will receive the update and automatically add the bookmark to the skip messages list, so there’s no need to do that work here.

Last, but not least, we provide a pair of cleanup methods for the class. The close method can be used to deliberately shut down the class. The __del__ function cleans up the client. Notice that if the __del__ function is called while a bookmark is active, the wrapper assumes that message processing caused a problem and attempts to publish that bookmark to the list of poison messages. This helps to protect against a class of problems (for example, a handler detecting a fatal error and calling sys.exit()) that indicate a failure without throwing an exception:

def __del__(self):
         # If the wrapper is being deleted while currentBookmark
         # is set, that indicates a serious error.
         if (self.curentBookmark != None and self.client != None):
             self.client.publish("ADMIN_PoisonMessages",
                        json.dumps( { "bookmark" : self.currentBookmark,
                                      "why" : "Fatal error." } ))
        if (self.client != None):
          self.client.close()
          self.client = None
       self.handler = None

   def close(self):
       self.currentBookmark = None
       self.__del__()

That’s the full wrapper. Here’s a simple sample program that demonstrates using the wrapper. To simulate errors, the sample program throws an exception if it receives a message where the “id” field is evenly divisible by 13.

from MisterYuck import MisterYuck

def no13Handler(m):
    dict = json.loads(m.get_data())
    if dict["id"] % 13 == 0:
         raise ValueError("I'm superstitious...")
    print "Whew, a value of %d is fine." % dict["id"]

def main():

   client = AMPS.Client("handler_demo")
   client.connect("tcp://localhost:9007/amps")
   client.logon()

   poisonprotector =  MisterYuck(client, no13Handler)

   client.bookmark_subscribe(poisonprotector, "ContentTopic", bookmark=AMPS.Client.Bookmarks.EPOCH)

   while True:
      time.sleep(100)
main()

To keep things simple, we’ve used the AMPS.Client and started each subscription from the beginning of the transaction log, EPOCH. A production application would generally use the HAClient to resume the subscription at MOST_RECENT, and the wrapper would call discard() for poison messages so the client doesn’t read them again on restart. With those small changes, the same techniques work.

In this post, we’ve covered an easy way to use SOW topics to create a poison message list. The concept is simple: create a SOW topic in AMPS to hold the list of bad messages. Each instance of the application loads the list and uses sow_and_subscribe to keep the list current as other clients update the list. AMPS SOW topics provide an easy way to build reliable, flexible poison message protection without keeping any persistent state on the application side.

Even better, because the state is stored in AMPS, if a new instance of the application starts, that new instance never has to process messages that are known to be bad. Each new instance gets the benefit of the work the other instances have done.

How does your messaging system handle poison messages? Let us know in the comments!