Skip to main content

Configuration

The source connector is configured with Kafka Connect properties. These properties can be supplied in a JSON request to a distributed worker or in a properties file for a standalone worker.

Required

The following properties are required to construct a valid AMPS source connector.

Kafka Connect

PropertyDescription
connector.classMust be com.crankuptheamps.kafka.AMPSKafkaSource.
tasks.maxMaximum number of source tasks. A value of 1 is recommended unless the source is explicitly partitioned with taskFilter.

Connection to AMPS

PropertyDescription
uriAMPS server URI or comma-separated list of URIs, such as tcp://localhost:9007/amps/json.
clientNameBase name for the AMPS client. Task suffixes are added automatically when multiple tasks are used.

Topic from AMPS

PropertyDescription
ampsTopicAMPS topic or regular expression topic used by the subscription or query.

Optional

AMPS Client

The following properties are optional for functionality related to the AMPS client and AMPS command.

PropertyDescription
clientFactoryClassClass used to construct the AMPS client. Defaults to com.crankuptheamps.kafka.AMPSBasicClientFunction.
commandAMPS command type. Defaults to subscribe. Common values include subscribe and sow_and_subscribe.
filterAMPS content filter expression.
optionsAMPS command options, such as projection, grouping, conflation, or other subscription options.
cmdTypeFilterBit mask of AMPS command type integers to include. Messages with command types outside the mask are ignored.
eventHeadersComma-separated list of AMPS message headers to preserve as Kafka headers.

Valid eventHeaders values include:

HeaderDescription
commandHeaderCommand type of the AMPS message.
topicHeaderTopic name of the AMPS message.
sowKeySOW key of the AMPS message. Only set for SOW subscriptions.
ampsTimestampISO-8601 timestamp of when AMPS processed the message.
bookmarkHeaderBookmark string of the AMPS message. Only set for bookmark subscriptions.
correlationIdCorrelation ID of the AMPS message.
subIdSubscription ID of the AMPS message.
lengthLength of the AMPS message body in bytes.
timestampTimestamp from when the source received the message.

Bookmark Subscriptions

The following properties are optional and can be configured for bookmark subscriptions.

PropertyDescription
bookmarkStarting bookmark for the subscription. Accepts values such as NOW, EPOCH, MOST_RECENT, or any valid AMPS bookmark.
subscriptionIdAMPS subscription ID. Required when bookmark is specified.
bookmarkStoreTypeBookmark store type. Valid values are memory and logged.
bookmarkLogBookmark log file path. Required when using a logged bookmark store.
pruneTimeThresholdMinimum time in milliseconds between logged bookmark store prune operations. Defaults to 300000.

Queues

The following properties are optional and can be configured when subscribing to an AMPS queue.

PropertyDescription
isQueueSet to y when ampsTopic is an AMPS queue.
maxBacklogMaximum number of unacknowledged messages AMPS will provide to the queue subscription.
ackBatchSizeNumber of queue messages to include in an acknowledgment batch.
ackTimeoutTimeout value for sending queue acknowledgments.

Batching and Parallelism

The following properties control batching and task partitioning.

PropertyDescription
maxBuffersMaximum number of message buffers used by the source task. Defaults to 10.
maxBatchMaximum number of AMPS messages returned to Kafka Connect in each poll batch. Defaults to 1000.
taskFilterFilter format used to partition messages across source tasks. Required when tasks.max is greater than 1.
info

The source calls String.format(taskFilter, tasks.max, taskIndex) to build a task-specific AMPS filter. For example, a taskFilter of /id MOD %d = %d with tasks.max set to 2 produces /id MOD 2 = 0 for one task and /id MOD 2 = 1 for the other.