AMPS is built from the ground up to go fast. AMPS tries to deliver messages at the fastest rate that each individual consumer can handle. It has sophisticated machinery for finding that rate, and it works hard[1] to keep slower consumers from causing problems for faster consumers.
These techniques apply to historical replay (bookmark subscriptions[2]) as well as subscriptions to current publishes. AMPS is designed to provide messages as fast as possible, regardless of the source.
Why Would Anyone Ever Want to Go Slow?
As fast as possible is always a good thing in our world here at 60East. But what if you need to slow the pace of messages? And why would you ever want to do that?
We recently worked with a developer who wanted to simulate real-time conditions for capacity planning and testing purposes. In the simulation, this developer wanted the ability to replay messages at any rate from the actual publish speed all the way up to the maximum throughput the client could handle. By controlling the replay speed, the simulation could exactly replicate activity peaks throughout the day, or run at 2x speed, or 4x speed, or anything up to the full speed of the system.
As it turns out, this is simple to do with AMPS 4.0, a topic backed by a transaction log, and a bookmark subscription.
The key ingredient is that AMPS 4.0 records a timestamp for each message at processing time and makes that timestamp available when you replay messages. For regular subscriptions, the timestamp is close to the current time. For bookmark subscriptions, you get the original timestamp of when the message was processed. True to form, AMPS will continue to send messages as fast as your client can consume them. With a little bit of code, though, you can use the timestamps to replay messages to your client at any speed you like. Real time? Twice as fast? Four times as fast? Half speed? You’ve got full control.
Here is a sample bookmark subscription using the Java client and the Command Interface (new to AMPS 4.0 clients!):
public void subscribe(float pace, String bookmark)
{
    try {
        // uri and topic are assumed to be fields defined elsewhere in the enclosing class.
        Client client = new Client("Paced-Replay-Client");
        client.connect(uri);
        client.logon();

        // Use your own message handler
        SOWAndSubscribeMessageHandler ssh = new SOWAndSubscribeMessageHandler();
        PacedBookMarkSubscribeHandler pacedh =
            new PacedBookMarkSubscribeHandler(pace, ssh);

        // The timestamp option asks AMPS to include the processing time on each message.
        Command command = new Command("subscribe")
            .setTopic(topic)
            .setBookmark(bookmark)
            .setOptions("timestamp,oof");
        client.executeAsync(command, pacedh);
    }
    catch(AMPSException e)
    {
        System.out.println("exception in subscribe: " + e.toString());
    }
}
This is a pretty typical subscription that includes the timestamp option. This option is key: it tells AMPS to return a timestamp on published messages. We’ll use that timestamp to pace the messages.
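To make the pacing arithmetic concrete, here is a minimal sketch of turning a timestamp string into epoch milliseconds. It assumes the value begins with a yyyyMMdd'T'HHmmss prefix in UTC, which is the pattern the handler in this post parses. The TimestampParser class, its toEpochMillis method, and the sample values are hypothetical names for illustration, and the sketch uses java.time rather than the SimpleDateFormat used in the handler.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of the AMPS client.
final class TimestampParser {
    // Assumed layout of the leading part of the timestamp, interpreted as UTC.
    private static final DateTimeFormatter PREFIX =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss");
    private static final int PREFIX_LENGTH = 15; // length of "yyyyMMddTHHmmss"

    private TimestampParser() {}

    // Parses the first 15 characters and ignores anything after them,
    // such as fractional seconds or a zone suffix.
    static long toEpochMillis(String timestamp) {
        String prefix = timestamp.substring(0, PREFIX_LENGTH);
        return LocalDateTime.parse(prefix, PREFIX)
                            .toInstant(ZoneOffset.UTC)
                            .toEpochMilli();
    }
}
```

For example, TimestampParser.toEpochMillis("20140702T153047") minus TimestampParser.toEpochMillis("20140702T153045") is 2000, so those two messages were processed two seconds apart.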
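To set the pace, we use a PacedBookMarkSubscribeHandler, passing it our message handler of choice along with the desired replay pace. The pace is a multiplier applied to the original gap between messages: 1.0 replays in real time, and 0.5 replays twice as fast.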
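import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageHandler;

public class PacedBookMarkSubscribeHandler implements MessageHandler
{
    final TimeZone gmt = TimeZone.getTimeZone("GMT");
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
    long previousTx = 0;            // timestamp of the previous message, in ms
    float pace;                     // multiplier applied to the gap between messages
    MessageHandler wrappedHandler;

    public PacedBookMarkSubscribeHandler(float pace, MessageHandler h)
    {
        System.out.printf("Replaying at %fx of real time.\n", 1/pace);
        this.pace = pace;
        this.sdf.setTimeZone(gmt);
        this.wrappedHandler = h;
    }

    // Returns how long to wait, in milliseconds, before delivering this message.
    private long getDelta(String timestamp)
    {
        // Without a timestamp, deliver immediately.
        if(timestamp == null || timestamp.isEmpty())
            return 0;

        long currentTx;
        try
        {
            currentTx = sdf.parse(timestamp).getTime();
        }
        catch(ParseException e)
        {
            // Deliver immediately and keep the previous reference point.
            System.out.println("exception in getDelta: " + e.toString());
            return 0;
        }

        long delta = 0;
        if(previousTx > 0)
            delta = (long) ((currentTx - previousTx) * pace);
        previousTx = currentTx;

        // Thread.sleep() rejects negative values, so never return one.
        return Math.max(0, delta);
    }

    public void invoke(Message m)
    {
        switch (m.getCommand())
        {
        case Message.Command.SOW:
            // SOW results are passed through without pacing.
            wrappedHandler.invoke(m);
            break;
        case Message.Command.OOF:
        case Message.Command.Publish:
            try
            {
                Thread.sleep(getDelta(m.getTimestamp()));
            }
            catch(InterruptedException e)
            {
                System.out.println("Paced Handler Exception: " + e.toString());
                // Preserve the interrupt so the calling thread can react to it.
                Thread.currentThread().interrupt();
            }
            wrappedHandler.invoke(m);
            break;
        }
    }
}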
As you can see, when each message comes in, we use the timestamp on the message to determine the elapsed time between this message and the previous message, scaled by the speed of the replay. We sleep for that amount of time, and then process the message.
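One thing to keep in mind with this approach is that the handler sleeps for the scaled gap between consecutive messages, so any time spent in the wrapped handler is added on top of the sleep, and a long replay can gradually fall behind schedule. If that matters for your simulation, one possible variation is to schedule each message against the start of the replay instead of against the previous message. The sketch below shows only the arithmetic. ReplayPacer and delayFor are hypothetical names, not part of the AMPS client, and the caller is assumed to supply the message timestamp and the current wall-clock time in milliseconds.

```java
// Hypothetical helper for illustration; not part of the AMPS client.
final class ReplayPacer {
    private final double pace;        // delay multiplier: 0.5 replays twice as fast
    private boolean started = false;
    private long firstMessageMillis;  // timestamp of the first message seen
    private long replayStartMillis;   // wall-clock time when the first message arrived

    ReplayPacer(double pace) {
        if (pace < 0) {
            throw new IllegalArgumentException("pace must be non-negative");
        }
        this.pace = pace;
    }

    // Returns how many milliseconds to wait before delivering a message,
    // measured against the start of the replay so that time spent
    // processing earlier messages does not accumulate as drift.
    long delayFor(long messageMillis, long nowMillis) {
        if (!started) {
            started = true;
            firstMessageMillis = messageMillis;
            replayStartMillis = nowMillis;
            return 0;
        }
        long scaledOffset = Math.round((messageMillis - firstMessageMillis) * pace);
        long targetMillis = replayStartMillis + scaledOffset;
        // If we are already late, deliver immediately rather than sleeping.
        return Math.max(0, targetMillis - nowMillis);
    }
}
```

Inside a handler, the result of delayFor(parsedTimestamp, System.currentTimeMillis()) could be passed to Thread.sleep() in place of the consecutive-message delta. The trade-off is that a replay that falls behind will deliver a burst of messages to catch up, which may or may not be what your test calls for.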
Set the Pace
You can use this simple technique to set the speed of a replay, and use the PacedBookMarkSubscribeHandler (or your own variation of it!) in place of an existing message handler. How fast can your application process messages?
It’s up to you.
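One detail worth spelling out: the pace argument to PacedBookMarkSubscribeHandler multiplies the delay between messages, so a smaller pace means a faster replay. If you prefer to think in terms of speed (2x, 4x, and so on), a small conversion helper along these lines may be convenient. ReplaySpeed and paceForSpeed are hypothetical names used only for this sketch.

```java
// Hypothetical helper for illustration; not part of the AMPS client.
final class ReplaySpeed {
    private ReplaySpeed() {}

    // Converts a speed multiplier (2.0 means twice as fast as the original
    // publish rate) into the delay multiplier the paced handler expects.
    static float paceForSpeed(double speed) {
        // The negated comparison also rejects NaN.
        if (!(speed > 0) || Double.isInfinite(speed)) {
            throw new IllegalArgumentException("speed must be a positive, finite number");
        }
        return (float) (1.0 / speed);
    }
}
```

With this in place, new PacedBookMarkSubscribeHandler(ReplaySpeed.paceForSpeed(4.0), ssh) would replay at four times the original speed, and a speed of 0.5 would replay at half speed.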
[1] See the AMPS User Guide, where Slow Client Management is discussed in section 23.4. We’re also working on a blog post to talk more about the techniques AMPS uses to make efficient use of resources – stay tuned!
[2] Bookmark subscribe allows you to begin a subscription at any point in the AMPS transaction log. It’s one of the most commonly used features of AMPS. For more information, see the AMPS User Guide chapter 18, or the documentation for your AMPS client library of choice.