
Quickstart

This page provides a streamlined method for installing the AMPS Python Client and getting the examples running.

Installation

$ pip install amps-python-client

Additional client install options can be found in Installation Options.

Examples

Now that you have the client installed, here are some code examples showing how the Python client is used.

info

These examples are all runnable, but they require an AMPS instance to connect to. The instructions for starting an instance of AMPS are available in the Introduction to AMPS guide.

tip

For more comprehensive detail on the Python client, see the other sections of this Developer Guide and the Python Client API Reference.

Example 1: Connect and Subscribe

In this example, we connect to an AMPS server running locally and initiate a subscription to the "messages" topic. As new messages are received, they are printed to the console.

import AMPS

uri = "tcp://localhost:9007/amps/json"

amps = AMPS.Client("subscribe-example")
amps.connect(uri)
amps.logon()

for message in amps.subscribe("messages"):
    print(message.get_data())

Example 2: Publish a Message

With AMPS, publishing is simple, as shown in this example. We connect to an AMPS server running locally and publish a single message to the messages topic. There is no need to predeclare the topic or configure complex routing before publishing. Any subscription that has asked for JSON messages on the messages topic will receive the message.

import AMPS

uri = "tcp://localhost:9007/amps/json"

client = AMPS.Client("publish-example")
client.connect(uri)
client.logon()

client.publish("messages", '{"hi" : "Hello, world!"}')

client.publish_flush()
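
The payload in the example above is written by hand as a JSON string. When the message comes from application data, one option is to serialize it with Python's standard json module and pass the resulting string to publish(). The following is a minimal sketch; the build_payload helper is illustrative and is not part of the AMPS client.

```python
import json

def build_payload(fields):
    """Serialize a dict to a compact JSON string for use as a message body."""
    # separators=(",", ":") omits optional whitespace to keep the payload small
    return json.dumps(fields, separators=(",", ":"))

payload = build_payload({"hi": "Hello, world!"})
print(payload)

# With a connected client, as in the example above (not executed here):
# client.publish("messages", payload)
```

Serializing through json.dumps() also takes care of escaping quotes and other special characters in field values.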

Example 3: Query the Contents of a "SOW" Topic

State-of-the-World ("SOW") topics in AMPS combine the power of a database table with the performance of a publish-subscribe system. Use the AMPS Python client to query the contents of a SOW topic.

This example queries for all orders for the symbol ROL, and simply prints the messages to the console.

import AMPS

uri = "tcp://localhost:9007/amps/json"

client = AMPS.Client("sow-example")
client.connect(uri)
client.logon()

for message in client.sow("orders",
                          "/symbol='ROL'"):
    if message.get_command() == AMPS.Message.Command.SOW:
        print(message.get_data())

Example 4: Automatic Reconnection and Resubscription

Rock-solid applications must be able to recover from network outages. The AMPS Python client provides an HAClient class with automatic reconnection and resubscription. Easy-to-implement interfaces control reconnection and resubscription behavior, so you can customize failover.

The HAClient class can, optionally, also provide store-and-forward for reliable publish. With the AMPS transaction log, the class can provide resumable subscriptions that are guaranteed not to miss messages or receive duplicate messages, even in the case of failover between replicated servers.

import AMPS

client = AMPS.HAClient("reconnecting-subscriber")

# The ServerChooser interface tells the HAClient which server
# to connect to, both for the initial connection and failover.
# The DefaultServerChooser is included with the client: many
# applications implement a ServerChooser to control failover
# behavior.

chooser = AMPS.DefaultServerChooser()
chooser.add("tcp://amps-server:9007/amps/json")
chooser.add("tcp://amps-failover-server:9007/amps/json")

client.set_server_chooser(chooser)
client.connect_and_logon()

def handle_messages(message):
    print(message.get_data())

# Subscribe. If the connection to the server
# is lost, the HAClient will restore the connection to
# the server or the failover partner, and restore
# subscriptions.

client.subscribe(handle_messages, "messages")
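
The comment in the example above notes that many applications implement their own ServerChooser. The following is a rough sketch of the rotation logic such a chooser might contain, written as a plain Python class so that it runs without a server. The method names (get_current_uri, get_current_authenticator, report_failure, report_success, get_error) are an assumption modeled on DefaultServerChooser, and the RoundRobinChooser name is hypothetical. Confirm the exact interface in the Python Client API Reference before passing an object like this to set_server_chooser().

```python
class RoundRobinChooser(object):
    """Hypothetical chooser: move to the next URI after each reported failure."""

    def __init__(self, uris):
        if not uris:
            raise ValueError("at least one URI is required")
        self._uris = list(uris)
        self._index = 0
        self._last_error = ""

    def get_current_uri(self):
        return self._uris[self._index]

    def get_current_authenticator(self):
        # Placeholder for this standalone sketch. Assumption: a real chooser
        # would return an authenticator object here.
        return None

    def report_failure(self, exception, connection_info):
        # Remember the error and rotate to the next server in the list.
        self._last_error = str(exception)
        self._index = (self._index + 1) % len(self._uris)

    def report_success(self, connection_info):
        self._last_error = ""

    def get_error(self):
        return self._last_error


chooser = RoundRobinChooser([
    "tcp://amps-server:9007/amps/json",
    "tcp://amps-failover-server:9007/amps/json",
])
first = chooser.get_current_uri()
chooser.report_failure(RuntimeError("connection refused"), {})
second = chooser.get_current_uri()
print(first, "->", second)
```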

Example 5: Automatic Failover

The AMPS Python client includes both a basic client and a high availability client with additional features, including the ability to fail over automatically if the client is disconnected. This example shows how to set up a high availability client for failover.

This example creates an HA client and a server chooser for the client. The code then populates the server chooser with the list of failover servers, adds the chooser to the client, and then connects.

Once the client is connected, you can use the HAClient object just like a regular AMPS client. You can also take advantage of the extended features, such as durable publish and duplicate message protection. See the Developer Guide for more information.

import AMPS

client = AMPS.HAClient("haclient-with-failover")

# create a server chooser
chooser = AMPS.DefaultServerChooser()

# add the addresses to use for failover
chooser.add("tcp://primary.amps.xyz.com:12345/amps/json")
chooser.add("tcp://secondary.amps.xyz.com:12345/amps/json")

# set the server chooser for the client
client.set_server_chooser(chooser)

# connect and logon
client.connect_and_logon()

# now, use the client: if the client detects
# a disconnection, it will reconnect.

...

# at the end of the program, disconnect

client.disconnect()

Example 6: Command Interface and Async vs. Sync Execution

The named methods such as subscribe(), sow(), and publish() are convenience methods for common commands. When you want more direct control over the command sent to AMPS, use the AMPS.Command interface described in Creating Commands. This interface is the most flexible way to set fields such as topic, filter, options, ack_type, bookmark, and sub_id.

Use execute() when you want a MessageStream that you can iterate over on the calling thread. Use execute_async() when you want AMPS to process results on the client receive thread and dispatch messages to a handler function.

import AMPS

uri = "tcp://localhost:9007/amps/json"

client = AMPS.HAClient("command-example")
chooser = AMPS.DefaultServerChooser()
chooser.add(uri)
client.set_server_chooser(chooser)
client.connect_and_logon()

# Build a command explicitly and process the returned MessageStream
# on the calling thread.
sow_cmd = AMPS.Command("sow") \
    .set_topic("orders") \
    .set_filter("/symbol = 'ROL'") \
    .add_ack_type("processed")

for message in client.execute(sow_cmd):
    print(message.get_data())


# Use the same command, but dispatch messages to a handler
# on the client receive thread.
def on_message(message):
    print(message.get_data())

client.execute_async(sow_cmd, on_message)

Example 7: sow_and_subscribe with OOF Using AMPS.Command

In this example, a Command is used to query a SOW topic, then receive updates. The oof option tells AMPS to also send out-of-focus messages when records no longer match the filter.

import AMPS

uri = "tcp://localhost:9007/amps/json"

client = AMPS.HAClient("sow-and-subscribe-example")
chooser = AMPS.DefaultServerChooser()
chooser.add(uri)
client.set_server_chooser(chooser)
client.connect_and_logon()

def on_message(message):
    if message.get_command() in (AMPS.Message.Command.Publish, AMPS.Message.Command.SOW):
        print(f"{message.get_command()}:", message.get_data())
    elif message.get_command() == AMPS.Message.Command.OOF:
        print("OOF:", message.get_data())

sow_sub_cmd = AMPS.Command("sow_and_subscribe") \
    .set_topic("orders") \
    .set_filter("/status = 'OPEN'") \
    .set_options("oof")

client.execute_async(sow_sub_cmd, on_message)
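
One use of sow_and_subscribe with the oof option is to keep a local view of the records that currently match the filter. The sketch below shows only the bookkeeping, with commands passed as plain strings so that it runs without a server. The apply_update function, the "id" key field, and the string command names are assumptions for illustration. In a real handler, the command would come from message.get_command() and the body from message.get_data().

```python
import json

def apply_update(view, command, data, key_field="id"):
    """Update a dict of matching records from one message.

    command is "sow", "publish", or "oof" (illustrative names);
    data is the JSON message body as a string.
    """
    record = json.loads(data)
    key = record[key_field]
    if command in ("sow", "publish"):
        view[key] = record       # new or updated record that matches the filter
    elif command == "oof":
        view.pop(key, None)      # record no longer matches the filter
    return view

open_orders = {}
apply_update(open_orders, "sow", '{"id": 1, "status": "OPEN"}')
apply_update(open_orders, "publish", '{"id": 2, "status": "OPEN"}')
apply_update(open_orders, "oof", '{"id": 1, "status": "FILLED"}')
print(sorted(open_orders))
```

After these three updates, only the record with id 2 remains in the view, because the out-of-focus message removed id 1.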

Example 8: Subscribe to a Queue Topic with max_backlog

For queue consumers, the Command interface makes it easy to request queue-specific behavior such as a larger max_backlog. A larger backlog allows AMPS to pipeline queue messages to the client more efficiently and is the key to getting maximum performance with queues, as explained in the Queues section of the User Guide.

In this example, add_ack_type("processed") requests a processed acknowledgment. When the acknowledgment arrives, the handler sets the threading.Event, which allows the application to wait until AMPS has confirmed that the subscription is active before continuing.

import AMPS
import threading

uri = "tcp://localhost:9007/amps/json"

client = AMPS.HAClient("queue-consumer-example")
chooser = AMPS.DefaultServerChooser()
chooser.add(uri)
client.set_server_chooser(chooser)
client.connect_and_logon()

subscription_ready = threading.Event()

def on_message(message):
    if message.get_command() == AMPS.Message.Command.Ack:
        if message.get_ack_type() == "processed":
            subscription_ready.set()
    else:
        print(message.get_data())
        message.ack()


queue_sub_cmd = AMPS.Command("subscribe") \
    .set_topic("sample-queue") \
    .set_options("max_backlog=10") \
    .add_ack_type("processed")

client.execute_async(queue_sub_cmd, on_message)
subscription_ready.wait(timeout=5)
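
threading.Event.wait() returns True if the event was set and False if the timeout elapsed first, so the return value can be used to detect that no processed acknowledgment arrived. The sketch below demonstrates the pattern without a server: a threading.Timer stands in for the client receive thread delivering the acknowledgment. The wait_for_ack helper is hypothetical.

```python
import threading

def wait_for_ack(event, timeout):
    """Return True if the event was set within timeout seconds, else False."""
    return event.wait(timeout=timeout)

# Simulate an acknowledgment that arrives shortly after subscribing.
ready = threading.Event()
timer = threading.Timer(0.05, ready.set)
timer.start()
acked = wait_for_ack(ready, timeout=2)
timer.join()

# Simulate an acknowledgment that never arrives.
never_ready = threading.Event()
timed_out = not wait_for_ack(never_ready, timeout=0.1)

print("acked:", acked, "timed out:", timed_out)
```

In the queue example above, the same check could wrap subscription_ready.wait(timeout=5) so that the application reports an error or retries instead of continuing silently.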

Example 9: Bookmark Subscribe with a Timestamp Bookmark

If a topic is recorded in the AMPS transaction log, you can use a bookmark subscription to replay messages from a specific point in time and then cut over to the live stream. A timestamp bookmark uses the format YYYYmmddTHHMMSSZ.

This command begins the subscription just after the provided timestamp bookmark. The completed acknowledgment is returned when the replay portion of the bookmark subscription is complete. The handler class shows how to keep state, such as an event or a message counter, in the handler itself.

import AMPS
import threading

uri = "tcp://localhost:9007/amps/json"

client = AMPS.HAClient("bookmark-subscribe-example")
chooser = AMPS.DefaultServerChooser()
chooser.add(uri)
client.set_server_chooser(chooser)
client.connect_and_logon()

class MessageHandler(object):
    def __init__(self):
        self.replay_complete = threading.Event()
        self.message_count = 0

    def __call__(self, message):
        if message.get_command() == AMPS.Message.Command.Ack:
            if message.get_ack_type() == "completed":
                self.replay_complete.set()
        else:
            self.message_count += 1
            print(message.get_bookmark(), message.get_data())


bookmark_sub_cmd = AMPS.Command("subscribe") \
    .set_topic("messages-history") \
    .set_bookmark("20250401T153000Z") \
    .add_ack_type("completed")

handler = MessageHandler()

client.execute_async(bookmark_sub_cmd, handler)
handler.replay_complete.wait(timeout=5)
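
The timestamp bookmark in the example above is a literal string. To start a replay relative to the current time, the bookmark can be generated with the standard datetime module. This is a minimal sketch that assumes the trailing Z denotes UTC; the timestamp_bookmark helper is hypothetical and is not part of the AMPS client.

```python
from datetime import datetime, timedelta, timezone

def timestamp_bookmark(moment):
    """Format a timezone-aware datetime as a YYYYmmddTHHMMSSZ string in UTC."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

# The same instant used in the example above.
fixed = datetime(2025, 4, 1, 15, 30, 0, tzinfo=timezone.utc)
print(timestamp_bookmark(fixed))

# A bookmark for one hour ago, suitable for passing to set_bookmark().
one_hour_ago = timestamp_bookmark(datetime.now(timezone.utc) - timedelta(hours=1))
print(one_hour_ago)
```

Requiring a timezone-aware datetime avoids silently treating local time as UTC.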

Example 10: Other Useful AMPS.Command Options

Once you are using AMPS.Command, you can use the same interface to request other AMPS features. For example, you can request conflation, pagination with top_n and skip_n, or aggregation with projection and grouping.

This example requests a page of 25 matching records after skipping the first 50.

import AMPS

uri = "tcp://localhost:9007/amps/json"

client = AMPS.HAClient("paged-sow-example")
chooser = AMPS.DefaultServerChooser()
chooser.add(uri)
client.set_server_chooser(chooser)
client.connect_and_logon()

sow_cmd = AMPS.Command("sow") \
    .set_topic("orders") \
    .set_filter("/status = 'OPEN'") \
    .set_options("top_n=25,skip_n=50")

for message in client.execute(sow_cmd):
    print(message.get_data())
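
The options string in the example above corresponds to the third page when each page holds 25 records. As a small sketch of that arithmetic, the hypothetical page_options helper below builds the same string from a 1-based page number and a page size. It is not part of the AMPS client.

```python
def page_options(page, page_size):
    """Build a top_n/skip_n options string for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    skip = (page - 1) * page_size   # records on all earlier pages
    return "top_n={0},skip_n={1}".format(page_size, skip)

print(page_options(3, 25))
```

The result can be passed to set_options() in place of the literal string.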

Additional Examples

More examples can be found in the other sections of this Developer Guide as well as in the API Reference.