
AMPS Sink

AMPSKafkaSink is a Kafka Connect sink connector: it consumes records from Kafka and publishes them to AMPS. For each record, the sink selects the AMPS topic and publishes the record value as the AMPS message payload.

The following is an example of a simple sink connector configuration. In this example, the sink consumes from the Kafka topic SourceTest and publishes all records to the AMPS topic SinkTest.

{
  "name": "amps-kafka-sink",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSink",
    "topics": "SourceTest",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSink",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "SinkTest",
    "maxBatch": "1000",
    "useTopicHeader": "false"
  }
}

A valid AMPSKafkaSink configuration requires a connector class (connector.class), Kafka topics (topics) or a Kafka topic regular expression (topics.regex), a URI that can successfully connect to AMPS (uri), an AMPS client name (clientName), and an AMPS topic (ampsTopic). The example above also sets useTopicHeader to false so every record is published to the configured ampsTopic.
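
The required settings can be checked before a configuration is submitted. The following is a minimal sketch, not part of the connector: the helper name missing_settings is hypothetical, the property names are taken from the example configuration, and topics.regex is assumed to be the standard Kafka Connect property for a topic regular expression. The check covers only presence, not whether the URI can actually reach AMPS.

```python
# Hypothetical pre-flight check; property names come from the example
# configuration in this section.
REQUIRED_KEYS = ("connector.class", "uri", "clientName", "ampsTopic")


def missing_settings(config):
    """Return the names of required settings that are absent or empty."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    # The sink needs either a Kafka topic list or a topic regular expression.
    # "topics.regex" is assumed to be the standard Kafka Connect property.
    if not (config.get("topics") or config.get("topics.regex")):
        missing.append("topics or topics.regex")
    return missing


example = {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSink",
    "topics": "SourceTest",
    "clientName": "KafkaSink",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "SinkTest",
}

print(missing_settings(example))  # prints []
```

An empty list means every required setting is present.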

Topic Selection

By default, useTopicHeader is true. When this option is true, the sink publishes each Kafka record to an AMPS topic with the same name as the Kafka topic that supplied the record.

Set useTopicHeader to false to publish all records to the configured ampsTopic.
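
The selection rule described above can be sketched as a small function. This is an illustration of the documented behavior, not the connector's actual code: select_amps_topic is a hypothetical name, and treating the setting as a case-insensitive "true"/"false" string is an assumption.

```python
def select_amps_topic(kafka_topic, config):
    """Return the AMPS topic a record would be published to."""
    # useTopicHeader defaults to true when it is not set.
    # Parsing it as a case-insensitive string is an assumption.
    use_header = str(config.get("useTopicHeader", "true")).strip().lower() == "true"
    # True: reuse the name of the Kafka topic that supplied the record.
    # False: publish everything to the configured ampsTopic.
    return kafka_topic if use_header else config["ampsTopic"]


fixed = {"ampsTopic": "SinkTest", "useTopicHeader": "false"}
print(select_amps_topic("SourceTest", fixed))                     # prints SinkTest
print(select_amps_topic("SourceTest", {"ampsTopic": "SinkTest"}))  # prints SourceTest
```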

Message Values

The sink expects each Kafka record value to be a byte array and publishes those bytes to AMPS. Use org.apache.kafka.connect.converters.ByteArrayConverter for value.converter unless another converter is intentionally paired with the record value type.
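
Because ByteArrayConverter passes the record value through as raw bytes, the bytes written to Kafka are the bytes that AMPS receives. The following sketch shows one way a producer might prepare such a value. It assumes the payload should be JSON, since the example URI ends in the json message type; to_record_value is a hypothetical helper, and UTF-8 encoding is an assumption.

```python
import json


def to_record_value(document):
    """Serialize a dict to compact UTF-8 JSON bytes for use as a Kafka record value."""
    # Compact separators keep the payload free of optional whitespace.
    return json.dumps(document, separators=(",", ":")).encode("utf-8")


value = to_record_value({"id": 1, "status": "open"})
print(value)  # prints b'{"id":1,"status":"open"}'
```

A producer would send these bytes as the record value using a byte-array serializer, so that no additional encoding is applied before the sink publishes them.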