Your First AMPS Kafka Connect Pipeline
This chapter walks through a basic pipeline that runs the AMPS connector in Kafka Connect. In this example, the AMPS Kafka source subscribes to AMPS and writes records into Kafka, and the AMPS Kafka sink reads those records from Kafka and publishes them back to AMPS.
The connector repository includes the same example under src/main/resources/examples/aggSub/.
Start Kafka
Start a local Kafka broker from the Kafka installation directory.
For KRaft mode, generate a cluster ID, format the log directories, and start the server:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/server.properties
bin/kafka-server-start.sh config/server.properties
For older Kafka distributions that run in ZooKeeper mode (ZooKeeper support was removed in Kafka 4.0), start ZooKeeper and then start the server:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
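The broker can take a few seconds to accept connections, and starting Kafka Connect before it is ready only produces connection errors. A minimal wait-and-retry sketch in POSIX shell follows; `retry` is a hypothetical helper written for this walkthrough, not a Kafka or AMPS tool.

```shell
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND until it exits with status 0, sleeping
# DELAY_SECONDS between attempts. Succeeds on the first successful run and
# fails once ATTEMPTS runs have all failed.
# The body is a ( ) subshell, so its variables stay local and `exit` only
# leaves the function, not the calling shell.
retry() (
  attempts=$1
  delay=$2
  shift 2
  n=1
  while :; do
    # Only the exit status matters, so discard the command's output.
    if "$@" >/dev/null 2>&1; then
      exit 0
    fi
    if [ "$n" -ge "$attempts" ]; then
      echo "retry: $1 still failing after $attempts attempts" >&2
      exit 1
    fi
    n=$((n + 1))
    sleep "$delay"
  done
)
```

For example, assuming the broker uses the default listener on localhost:9092, this waits for up to 30 attempts, two seconds apart, before moving on. Each kafka-topics.sh attempt can itself wait for its client timeout before failing, so the total wait may be longer.
retry 30 2 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list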
Start AMPS
Start AMPS with the configuration file provided by the connector example:
AMPS-{version}-Release-Linux/bin/ampServer src/main/resources/examples/aggSub/amps-config.xml
Start Kafka Connect
Before starting the worker, make sure the plugin.path setting in config/connect-distributed.properties includes the directory where the AMPS Kafka connector plugin is installed. Then start a distributed Kafka Connect worker:
bin/connect-distributed.sh config/connect-distributed.properties
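To confirm that the worker actually found the connector on plugin.path, you can query the Kafka Connect REST endpoint GET /connector-plugins, which lists the installed connector classes. The helper below is a hypothetical sketch, not part of the connector: it does a plain text match on that JSON rather than real JSON parsing.

```shell
# has_plugin CLASS
# Hypothetical helper: reads the JSON from GET /connector-plugins on stdin and
# succeeds if CLASS appears as a "class" value. Whitespace is removed first so
# compact and pretty-printed JSON both match. The closing quote in the pattern
# prevents a prefix of a longer class name from matching.
has_plugin() {
  tr -d ' \t\r\n' | grep -F -q "\"class\":\"$1\""
}
```

With the worker running on the default REST port:
curl -s http://localhost:8083/connector-plugins | has_plugin com.crankuptheamps.kafka.AMPSKafkaSource && echo "AMPS source plugin found"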
Load the Source Connector
From the connector repository directory, post the source connector configuration to the Kafka Connect REST API, which listens on port 8083 by default:
curl -X POST -H "Content-Type:application/json" http://localhost:8083/connectors --data @src/main/resources/examples/aggSub/AMPSKafkaSource.json
The source connector configuration uses an AMPS aggregated subscription:
{
  "name": "amps-kafka-source",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSource",
    "uri": "tcp://localhost:9007/amps/json",
    "command": "sow_and_subscribe",
    "ampsTopic": "Orders",
    "filter": "/symbol IN ('IBM', 'MSFT')",
    "options": "projection=[/symbol,avg(/price) as /avg_price, avg(/price*/qty) as /avg_total],grouping=[/symbol],conflation=1s",
    "subscriptionId": "Sub-100",
    "cmdTypeFilter": "9",
    "maxBuffers": "10",
    "maxBatch": "1000",
    "pruneTimeThreshold": "300000",
    "eventHeaders": "topicHeader,timestamp"
  }
}
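The options value packs several subscription options into one comma-separated string, and the projection itself contains commas, so it can be hard to see where one option ends and the next begins. The hypothetical helper below splits such a string at commas that sit outside square brackets, which matches how the example string is laid out. It only mirrors the example's structure; it is not the parser AMPS uses.

```shell
# split_options STRING
# Hypothetical helper: prints one option per line, splitting only at commas
# outside [ ]. Bracket depth is tracked so commas inside projection=[...]
# stay part of that option.
split_options() {
  printf '%s\n' "$1" | awk '
    {
      depth = 0
      cur = ""
      for (i = 1; i <= length($0); i++) {
        c = substr($0, i, 1)
        if (c == "[") depth++
        else if (c == "]") depth--
        if (c == "," && depth == 0) {
          print cur          # top-level comma: the current option is complete
          cur = ""
        } else {
          cur = cur c
        }
      }
      if (cur != "") print cur
    }'
}
```

Running it on the example's options value prints three lines: the projection=[...] option, grouping=[/symbol], and conflation=1s.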
Load the Sink Connector
Load the sink connector:
curl -X POST -H "Content-Type:application/json" http://localhost:8083/connectors --data @src/main/resources/examples/aggSub/AMPSKafkaSink.json
The sink reads from the Kafka topic Orders and publishes records to the AMPS topic AMPSKafkaSinkTest:
{
  "name": "amps-kafka-sink",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSink",
    "topics": "Orders",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSink",
    "pubStoreType": "memory",
    "pubStoreInitialCap": "100",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "AMPSKafkaSinkTest",
    "maxBatch": "100",
    "useTopicHeader": "false"
  }
}
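After both connectors are loaded, the Kafka Connect REST endpoint GET /connectors/{name}/status reports a state for the connector and for each of its tasks. The sketch below is a hypothetical helper, not part of the connector, that succeeds only if every reported state is RUNNING. It pulls the "state" fields out with plain text processing instead of a JSON parser, which is enough for the status response's shape.

```shell
# all_running
# Hypothetical helper: reads a status response on stdin, prints each connector
# and task state on its own line, and succeeds only if there is at least one
# state and all of them are RUNNING.
all_running() (
  # Remove whitespace, break the JSON at , { and }, then keep "state" values.
  states=$(tr -d ' \t\r\n' | tr ',{}' '\n\n\n' | sed -n 's/^"state":"\(.*\)"$/\1/p')
  # No state at all means the input was not a status response.
  [ -n "$states" ] || exit 1
  printf '%s\n' "$states"
  # Fail if any line is something other than exactly RUNNING (e.g. FAILED).
  if printf '%s\n' "$states" | grep -v -q -x 'RUNNING'; then
    exit 1
  fi
  exit 0
)
```

For example, to check both connectors from this chapter:
for c in amps-kafka-source amps-kafka-sink; do curl -s "http://localhost:8083/connectors/$c/status" | all_running || echo "$c is not fully running"; done
If a task reports FAILED, the same status response includes a trace field with the error.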
Subscribe to the Sink Output
Use the AMPS spark utility to subscribe to the topic where the sink publishes records:
AMPS-{version}-Release-Linux/bin/spark subscribe -server localhost:9007 -topic AMPSKafkaSinkTest
Publish Example Messages
Publish the example JSON messages to the Orders topic:
AMPS-{version}-Release-Linux/bin/spark publish -server localhost:9007 -topic Orders -rate 1 -file src/main/resources/examples/aggSub/messages.json
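The source subscription passes only messages whose /symbol is IBM or MSFT, groups them by /symbol, and projects per-symbol aggregates: /avg_price (the average of /price) and /avg_total (the average of /price*/qty). Updates are conflated over a one-second interval. The spark subscriber on AMPSKafkaSinkTest receives these aggregated updates after they travel from AMPS through Kafka and back into AMPS.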
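To make the projection concrete, the sketch below reproduces the filter and the two averages with awk over a simplified input of one order per line as "symbol price qty". This is an illustration only, under stated assumptions: aggregate_orders is a hypothetical helper, the whitespace-separated input is not the JSON format in messages.json, every line is treated as a distinct record, and conflation and incremental updates are not modeled. AMPS computes the real values server-side.

```shell
# aggregate_orders
# Hypothetical helper: reads "symbol price qty" lines on stdin and prints, per
# symbol, the values the example projects as /avg_price and /avg_total.
aggregate_orders() {
  awk '
    $1 == "IBM" || $1 == "MSFT" {   # filter: keep only IBM and MSFT orders
      n[$1]++                       # grouping by symbol
      sum_price[$1] += $2           # for avg(/price)
      sum_total[$1] += $2 * $3      # for avg(/price*/qty)
    }
    END {
      for (s in n)
        printf "%s avg_price=%.2f avg_total=%.2f\n", s, sum_price[s] / n[s], sum_total[s] / n[s]
    }' | sort                       # awk array order is unspecified, so sort
}
```

For example, printf '%s\n' 'IBM 100 10' 'MSFT 50 4' 'IBM 110 20' 'AAPL 200 1' | aggregate_orders drops the AAPL order and prints IBM avg_price=105.00 avg_total=1600.00 followed by MSFT avg_price=50.00 avg_total=200.00.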