Skip to main content

Configuration

The sink connector is configured with Kafka Connect properties. These properties can be supplied in a JSON request to a distributed worker or in a properties file for a standalone worker.

Required

The following properties are required to construct a valid AMPS sink connector.

Kafka Connect

PropertyDescription
connector.classMust be com.crankuptheamps.kafka.AMPSKafkaSink.
topicsKafka topic or comma-separated list of topics to consume from. Use either topics or topics.regex.
topics.regexJava regular expression for Kafka topics to consume from. Use either topics.regex or topics.
tasks.maxMaximum number of sink tasks. A value of 1 is recommended unless the pipeline is designed for parallel consumption.

Connection to AMPS

PropertyDescription
uriAMPS server URI or comma-separated list of URIs, such as tcp://localhost:9007/amps/json.
clientNameBase name for the AMPS client. Task suffixes are added automatically when multiple tasks are used.

Topic for AMPS

PropertyDescription
ampsTopicAMPS topic to publish to when useTopicHeader is false, or the fallback topic when useTopicHeader is true.

Optional

AMPS Client

The following properties are optional for functionality related to the AMPS client.

PropertyDescription
clientFactoryClassClass used to construct the AMPS client. Defaults to com.crankuptheamps.kafka.AMPSBasicClientFunction.
maxBatchMaximum number of Kafka records the sink processes in a batch. Defaults to 1000.
useTopicHeaderWhen true, publish each record to an AMPS topic matching the Kafka record topic. When false, publish to ampsTopic. Defaults to true.
publishFlushTimeoutTimeout passed to AMPS publishFlush at the end of a batch. A value of -1 disables the flush. A value of 0 waits indefinitely for the batch to persist. Defaults to -1.
logMsgOnWriteErrorWhen true, write the failed message to the Kafka Connect log when an AMPS failed write is detected. Defaults to true.

Publish Stores

The following properties configure AMPS publish stores for the sink client.

PropertyDescription
pubStoreTypePublish store type. Valid values are memory and file.
pubStoreInitialCapPublish store initial capacity in 2KB blocks. Defaults to 1000.
pubStorePathFile path for a file-backed publish store. Required when pubStoreType is file.
pruneTimeThresholdMinimum time in milliseconds between file-backed store prune operations. Defaults to 300000.