The power of AMPS filtering allows your application to receive the messages it wants to process, and none of the messages it doesn’t. Client-side filtering is no longer required with AMPS. The ability to use Perl Compatible Regular Expressions (PCRE) to define filters on a topic, a message, or both allows your application to achieve an unrivaled level of precision in message delivery.
One example of this precision is an application which implements topic filtering to facilitate request-response messaging between a monitoring system and an agent for a large scale heartbeat system. Heartbeats are a pattern often used to monitor the health of one or more systems or services in a group of systems. In a heartbeat system a monitoring application can send out a heartbeat message (request) to one or more monitoring agents. The monitoring agents then send a message back to the monitoring application (response) indicating the health of the system. If a monitoring agent fails to respond within a specified window of time, the monitoring application will assume that the service is unavailable.
In a previous article, we introduced the selectivity granted by the content filtering and topic filtering in AMPS. It is recommended to read the article to be familiar with the concepts introduced there before diving into this article.
In this post, we give a brief overview of how AMPS implements topic filtering, and then build upon that knowledge to describe how to implement a simple request / response heartbeat monitoring system using AMPS topic filtering. We also provide an example in Java, and discuss how to enhance the system to be more (or less) selective by using different topic filters.
AMPS Topic Filtering
With AMPS, a client can use a regular expression to subscribe to topics that match a given pattern. This feature can be used in two different ways:
-
to subscribe to topics without knowing the topic names in advance. This is known as a “greedy subscription” and can be used to subscribe to many topics simultaneously. For example, the subscription to the topic
foo.*
would match topics “foo”, “fool”, “food” and “footie”. -
to subscribe to topics that only match a very selective pattern. The simplest example of this is matching on an exact pattern, for example - we only want to return messages from the topic “Client”.
For our request-response heartbeat monitoring system, we will be focusing on greedy subscriptions.
Subscription topics are interpreted as regular expressions if they include special regular expression characters. Otherwise, they must be an exact match. Some examples of regular expressions within topics are included in table below.
Topic | Behavior |
---|---|
trade |
matches only “trade”. |
^client.* |
matches “client”, “clients”, “client001”, etc. |
.*trade.* |
matches “NYSEtrades”, “ICEtrade”, etc. |
Setting up the Example
Heartbeat messages consist of two parts - the request message (requesting a heartbeat) and the response message (letting the requester know that it is still alive). In each of these messages, there is a “source” and a “destination”. In the heartbeat request message, the source is a heartbeat monitoring application and the destination is the agent that is being monitored by the monitoring application.
With this information, we want to conceive of a format for our message topic that can be used to control the flow of messages from the source to the destination, regardless if it is a request or a response message. This lends nicely to a topic format that follows:
<MACHINE_1>
<ROUTING_INSTRUCTION>
<MACHINE_2>
In this topic format, we can define MACHINE_1
to be a source or a destination depending on the ROUTING_INSTRUCTION
we use. We will use the following convention to define the ROUTING_INSTRUCTION
-
When
->
is theROUTING_INSTRUCTION
, this is a request message and the topic can be read as:<SOURCE>
->
<DESTINATION>
Where
<SOURCE>
is the monitoring application and<DESTINATION>
is the heartbeat agent. -
When
<-
is theROUTING_INSTRUCTION
, this is a response message and the topic can be read as:<DESTINATION>
<-
<SOURCE>
Where
<SOURCE>
is the heartbeat agent and the<DESTINATION>
is the monitoring application.
With these tokens our routing message topics would look like M->A
for the
request messages, and M<-A
for the response messages - where M
represents
the heartbeat monitoring system, and A
represents the heartbeat agent.
With our topic format defined, we’re ready to create the subscriptions for
our monitoring system and our agent. The monitoring system will subscribe to
topic ^M<-.*
- which can be interpreted as “subscribe to all messages where the topic
begins with the string M<-
”.
Similarly, the agent will subscribe to the topic ->A$
- which can be interpreted as
“subscribe to all messages where the topic ends with the string ->A
”.
An Example
By now you should see the beginnings of how such a system would work, but let’s walk through the lifespan of a single request-response to see the system in action!
The code samples included below come from the included demonstration. These are used to highlight some of the features of the heartbeat request-response system. Note that the Monitor and Agent classes are implemented in their entirety in the included demonstration. Only highlights from those classes are shown here.
To begin the lifespan of the heartbeat message, the system is put into motion
when the Monitor
class (M) publishes a heartbeat with a timestamp message to
topic M->A
.
// Monitoring System publish heartbeat message.
Date d = new Date();
SimpleDateFormat sdf = new SimpleDateFormat(
"MM/dd/yyyy h:mm:ss a");
msg = "Heartbeat! Timestamp: " + sdf.format(d);
System.out.println(
"**************************\n" +
"Monitor - sending message:" +
"\n\tmessage: " + msg + "\n" +
"**************************\n");
monitorClient.publish("M->A", msg);
The Agent
class (A)- which is subscribed to Topic ->A$
- receives the
message, recognizes this is a request topic because of the presence of the ->
token, parses the prefix of the Topic
field, and then constructs a response
message to be returned to M. A publishes the response message to topic M<-A
.
// Agent subscription.
AgentMessageHandler mh = new AgentMessageHandler();
agent.subscribe(mh, "->A$", 10000);
...
static class AgentMessageHandler implements MessageHandler{
public void invoke(Message m){
// Parse the incoming message.
String topic = m.getTopic();
TopicParse parse = TopicParser.parse(topic);
System.out.println(
"**************************\n" +
"Agent - received message:" +
"\n\t type: " +
(parse.type == "<-" ? "response" : "request") +
"\n\t message: " + m.getData() + "\n" +
"**************************\n");
try{
// Construct and send the response message.
Date d = new Date();
SimpleDateFormat sdf = new SimpleDateFormat(
"MM/dd/yyyy h:mm:ss a");
String msg = "Response: " + sdf.format(d);
System.out.println(
"**************************\n" +
"Agent - Sending response: " +
"\n\t message: " + msg + "\n" +
"**************************\n");
agent.publish(parse.prefix + "<-" +
parse.postfix, msg);
} catch(Exception ex){
System.err.println("Agent exception: " +
ex.toString());
ex.printStackTrace();
}
}
}
Monitor
M - who is subscribed to Topic ^M<-.*
- receives the response
message, recognizes this as a response topic because of the presence of the
<-
symbol, parses the postfix of the Topic field to see that it is from A
and completes processing the heartbeat.
// Monitoring System subscription.
MonitorMessageHandler mh = new MonitorMessageHandler();
monitorClient.subscribe(mh, "^M<-.*", 10000);
...
static class MonitorMessageHandler implements MessageHandler{
public void invoke(Message m){
String topic = m.getTopic();
TopicParse parse = TopicParser.parse(topic);
System.out.println(
"**************************\n" +
"Monitor - received message:" +
"\n\t type: " +
(parse.type == "<-" ? "response" : "request") +
"\n\t message: " + m.getData() + "\n" +
"**************************\n");
}
}
In our example, processing the message means that we write a message to the console noting that the heartbeat request and response message have been received. In a more sophisticated system, additional logging and processing could take place.
class TopicParser{
void TopicParser(){;}
public static TopicParse parse(String topic){
TopicParse parse = new TopicParse();
if (topic.indexOf("->") >= 0){
// We have a request message
parse.type = "->";
parse.prefix = topic.substring(0,
topic.indexOf("->"));
parse.postfix = topic.substring(
topic.indexOf("->")+2,
topic.length());
}
else if (topic.indexOf("<-") >= 0){
// We have a response message.
parse.type = "<-";
parse.prefix = topic.substring(0,
topic.indexOf("<-"));
parse.postfix = topic.substring(
topic.indexOf("<-")+2,
topic.length());
}
else{
// We don't recognize this message type.
parse.type = "unknown";
parse.prefix = null;
parse.postfix = null;
}
return parse;
}
}
For completeness and to show how simple it is to create a class to parse the
messages manually using our routing tokens, the TopicParser
class has been
included above. In our example, each of the three cases are enumerated in the
if / else structure, and populate a TopicParse object, which is then returned
as a result of the parse()
method. The actions taken as a result of the
parse function can easily be replaced with more advanced features, but that
will be an exercise which is left to the reader.
Conclusion
While the example here is a simple one, it would be trivial to add even more powerful features such as: listener notifications when heartbeats are received / missed, processing messages once and only once, detectng messages received out of order. Through the powerful combination of AMPS Topic filtering, PCRE, and some creativity in combining these two features - we are able to architect a solution that provides a simple heartbeat request-response system in AMPS.
Next Steps
Try it yourself to see how easy request/response messaging can be with AMPS! Download the sample project in Java, along with instructions in the README file for building and running the sample: 01_simple_request_response.zip