Skip to main content

AMPS Source

The AMPSKafkaSource reads data from AMPS and writes Kafka source records through Kafka Connect. The source is responsible for subscribing or querying AMPS, receiving messages, optionally acknowledging queue messages or discarding bookmark store entries, and passing message payloads to Kafka.

The source connector is pollable. An AMPS client receives messages asynchronously. The AMPS source buffers those messages in batches, and returns those batches when Kafka Connect polls this source task. This avoids committing each message individually as it arrives.

The following is an example of a simple source connector configuration. In this example, the source subscribes to the AMPS topic SourceTest and writes records to the Kafka topic with the same topic name received from AMPS.

{
"name": "amps-kafka-source",
"config": {
"connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"header.converter": "org.apache.kafka.connect.storage.StringConverter",
"clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
"clientName": "KafkaSource",
"uri": "tcp://localhost:9007/amps/json",
"ampsTopic": "SourceTest",
"maxBuffers": "10",
"maxBatch": "1000"
}
}
info

A valid AMPSKafkaSource requires a connector class, a connection to AMPS, an AMPS client name, and a topic or topic expression to subscribe to. The example above also sets Kafka Connect converters so record values are handled as bytes.

Topic Names

The source uses the topic name on each AMPS message as the destination Kafka topic. When ampsTopic is a regular expression topic, records can be written to multiple Kafka topics that match the AMPS message topics.

Message Values

The source copies the AMPS message body into the Kafka record value as a byte array. Use org.apache.kafka.connect.converters.ByteArrayConverter for value.converter unless another converter is intentionally paired with the data format being produced.