Your First AMPS Kafka Connect Pipeline

This chapter walks through a basic pipeline built with the connector and Kafka Connect. In this example, the AMPS Kafka source subscribes to AMPS and writes records into Kafka. The AMPS Kafka sink then reads those records from Kafka and publishes them back to AMPS.

The connector repository includes the same example under src/main/resources/examples/aggSub/.

Start Kafka

Start a local Kafka broker from the Kafka installation directory.

For KRaft mode, generate a cluster ID, format the log directories, and start the server:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/server.properties
bin/kafka-server-start.sh config/server.properties

For ZooKeeper-based Kafka distributions, start ZooKeeper and then start the server. Both commands run in the foreground, so run each one in its own terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Start AMPS

Start AMPS with the configuration file provided by the connector example:

AMPS-{version}-Release-Linux/bin/ampServer src/main/resources/examples/aggSub/amps-config.xml
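
Before continuing, it can help to confirm that the servers started so far are accepting connections. The following is a minimal sketch, assuming bash (it relies on bash's /dev/tcp redirection); the function names port_open and wait_for_port are hypothetical helpers, not part of Kafka, AMPS, or the connector.

```shell
#!/usr/bin/env bash
# Succeed if a TCP connection to host ($1) and port ($2) can be opened.
# Uses bash's /dev/tcp redirection; the subshell closes the socket when it exits.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Retry once per second, up to $3 attempts (default 30), until the port opens.
wait_for_port() {
  tries="${3:-30}"
  while [ "$tries" -gt 0 ]; do
    port_open "$1" "$2" && return 0
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}
```

For example, wait_for_port localhost 9007 waits for the AMPS port that the connector uri in this example points at. wait_for_port localhost 9092 does the same for Kafka, assuming the broker uses port 9092 for its listener; check the listeners setting in config/server.properties if unsure.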

Start Kafka Connect

Start a distributed Kafka Connect worker:

bin/connect-distributed.sh config/connect-distributed.properties

Tip: Make sure the worker's plugin.path includes the directory where the AMPS Kafka connector plugin was installed before starting Kafka Connect.
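
One way to check that the worker picked up the plugin is to ask the Kafka Connect REST API for its installed plugins (GET /connector-plugins) and look for the two AMPS connector classes. The sketch below assumes the worker listens on http://localhost:8083, as in the rest of this chapter; has_plugin and check_amps_plugins are hypothetical helper names, and the match is a plain string search rather than real JSON parsing.

```shell
# Read the JSON from GET /connector-plugins on stdin and succeed only if the
# given connector class name appears in it as a quoted string.
has_plugin() {
  grep -qF "\"$1\""
}

# Query the worker ($1, default http://localhost:8083) and report whether each
# AMPS connector class is listed.
check_amps_plugins() {
  connect_url="${1:-http://localhost:8083}"
  plugins="$(curl -s "$connect_url/connector-plugins")" || return 1
  for cls in com.crankuptheamps.kafka.AMPSKafkaSource com.crankuptheamps.kafka.AMPSKafkaSink; do
    if printf '%s' "$plugins" | has_plugin "$cls"; then
      echo "found:   $cls"
    else
      echo "missing: $cls"
    fi
  done
}
```

Running check_amps_plugins after the worker starts should print a found line for each class; a missing line usually points back at plugin.path.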

Load the Source Connector

From the connector repository directory, load the source connector:

curl -X POST -H "Content-Type:application/json" http://localhost:8083/connectors --data @src/main/resources/examples/aggSub/AMPSKafkaSource.json

The source connector configuration uses an AMPS aggregated subscription:

{
  "name": "amps-kafka-source",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSource",
    "uri": "tcp://localhost:9007/amps/json",
    "command": "sow_and_subscribe",
    "ampsTopic": "Orders",
    "filter": "/symbol IN ('IBM', 'MSFT')",
    "options": "projection=[/symbol,avg(/price) as /avg_price, avg(/price*/qty) as /avg_total],grouping=[/symbol],conflation=1s",
    "subscriptionId": "Sub-100",
    "cmdTypeFilter": "9",
    "maxBuffers": "10",
    "maxBatch": "1000",
    "pruneTimeThreshold": "300000",
    "eventHeaders": "topicHeader,timestamp"
  }
}
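
To make the projection in the options setting concrete, the arithmetic it describes can be reproduced by hand. The following is a rough sketch in awk over made-up orders: the sample values and the whitespace-separated input format (symbol, price, qty) are hypothetical, and the script only imitates the filter, grouping, and averages. It does not model how AMPS evaluates the subscription or applies conflation.

```shell
# Input lines on stdin: symbol price qty (hypothetical sample data).
aggregate() {
  awk '
    $1 == "IBM" || $1 == "MSFT" {   # filter: /symbol IN (IBM, MSFT)
      n[$1]++                       # grouping: one bucket per /symbol
      price[$1] += $2               # running sum for avg(/price)
      total[$1] += $2 * $3          # running sum for avg(/price*/qty)
    }
    END {
      for (s in n)
        printf "%s avg_price=%.2f avg_total=%.2f\n", s, price[s] / n[s], total[s] / n[s]
    }
  ' | sort
}

printf '%s\n' 'IBM 100 10' 'IBM 110 20' 'MSFT 300 5' 'AAPL 150 7' | aggregate
# IBM avg_price=105.00 avg_total=1600.00
# MSFT avg_price=300.00 avg_total=1500.00
```

The AAPL line is dropped by the filter, and each remaining symbol yields one aggregate row, which mirrors the one-record-per-group shape implied by grouping=[/symbol].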

Load the Sink Connector

Load the sink connector:

curl -X POST -H "Content-Type:application/json" http://localhost:8083/connectors --data @src/main/resources/examples/aggSub/AMPSKafkaSink.json

The sink reads from the Kafka topic Orders and publishes records to the AMPS topic AMPSKafkaSinkTest:

{
  "name": "amps-kafka-sink",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSink",
    "topics": "Orders",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSink",
    "pubStoreType": "memory",
    "pubStoreInitialCap": "100",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "AMPSKafkaSinkTest",
    "maxBatch": "100",
    "useTopicHeader": "false"
  }
}
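
After a connector is loaded, the Kafka Connect REST API reports its state at /connectors/<name>/status. The sketch below pulls the connector and task states out of that response and checks that all of them are RUNNING. It assumes the worker is at http://localhost:8083 and uses the connector names amps-kafka-source and amps-kafka-sink from the example JSON files; states, all_running, and connector_ok are hypothetical helper names, and the extraction uses grep and sed instead of a JSON parser.

```shell
# Print every "state" value found in a status response read from stdin,
# one per line (connector state first, then each task state).
states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Succeed only if at least one state was found and every state is RUNNING.
all_running() {
  s="$(states)"
  [ -n "$s" ] && ! printf '%s\n' "$s" | grep -qv '^RUNNING$'
}

# Fetch the status of connector $1 from the worker ($2, default localhost:8083).
connector_ok() {
  curl -s "${2:-http://localhost:8083}/connectors/$1/status" | all_running
}
```

For example, connector_ok amps-kafka-source && connector_ok amps-kafka-sink succeeds once both connectors and their tasks have started. If it fails, the full status response normally includes a trace field for any failed task.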

Subscribe to the Sink Output

Use the AMPS spark utility to subscribe to the topic where the sink publishes records:

AMPS-{version}-Release-Linux/bin/spark subscribe -server localhost:9007 -topic AMPSKafkaSinkTest

Publish Example Messages

Publish the example JSON messages to the Orders topic:

AMPS-{version}-Release-Linux/bin/spark publish -server localhost:9007 -topic Orders -rate 1 -file src/main/resources/examples/aggSub/messages.json

The source connector filters for /symbol values of IBM and MSFT, uses a one-second conflation interval, and projects the aggregate fields /avg_price and /avg_total. The spark subscriber on AMPSKafkaSinkTest receives the aggregated updates after they pass from AMPS to Kafka and back into AMPS.
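
When the walkthrough is finished, the two connectors can be removed through the same REST API used to load them (DELETE /connectors/<name>). This is a small sketch, assuming the worker address http://localhost:8083 and the connector names from the example JSON files; remove_connectors and the DRY_RUN variable are hypothetical conveniences for previewing the requests before sending them.

```shell
# Delete the two connectors created in this walkthrough from the worker
# ($1, default http://localhost:8083).
# With DRY_RUN=1 the curl commands are printed instead of executed.
remove_connectors() {
  connect_url="${1:-http://localhost:8083}"
  for name in amps-kafka-source amps-kafka-sink; do
    if [ "${DRY_RUN:-0}" = "1" ]; then
      echo "curl -s -X DELETE $connect_url/connectors/$name"
    else
      curl -s -X DELETE "$connect_url/connectors/$name"
    fi
  done
}
```

Calling DRY_RUN=1 remove_connectors shows the two requests; calling remove_connectors without it sends them.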