Configuration
The source connector is configured with Kafka Connect properties. These properties can be supplied in a JSON request to a distributed worker or in a properties file for a standalone worker.
Required
The following properties are required to construct a valid AMPS source connector.
Kafka Connect
| Property | Description |
|---|---|
connector.class | Must be com.crankuptheamps.kafka.AMPSKafkaSource. |
tasks.max | Maximum number of source tasks. A value of 1 is recommended unless the source is explicitly partitioned with taskFilter. |
Connection to AMPS
| Property | Description |
|---|---|
uri | AMPS server URI or comma-separated list of URIs, such as tcp://localhost:9007/amps/json. |
clientName | Base name for the AMPS client. Task suffixes are added automatically when multiple tasks are used. |
Topic from AMPS
| Property | Description |
|---|---|
ampsTopic | AMPS topic or regular expression topic used by the subscription or query. |
Optional
AMPS Client
The following properties are optional for functionality related to the AMPS client and AMPS command.
| Property | Description |
|---|---|
clientFactoryClass | Class used to construct the AMPS client. Defaults to com.crankuptheamps.kafka.AMPSBasicClientFunction. |
command | AMPS command type. Defaults to subscribe. Common values include subscribe and sow_and_subscribe. |
filter | AMPS content filter expression. |
options | AMPS command options, such as projection, grouping, conflation, or other subscription options. |
cmdTypeFilter | Bit mask of AMPS command type integers to include. Messages with command types outside the mask are ignored. |
eventHeaders | Comma-separated list of AMPS message headers to preserve as Kafka headers. |
Valid eventHeaders values include:
| Header | Description |
|---|---|
commandHeader | Command type of the AMPS message. |
topicHeader | Topic name of the AMPS message. |
sowKey | SOW key of the AMPS message. Only set for SOW subscriptions. |
ampsTimestamp | ISO-8601 timestamp of when AMPS processed the message. |
bookmarkHeader | Bookmark string of the AMPS message. Only set for bookmark subscriptions. |
correlationId | Correlation ID of the AMPS message. |
subId | Subscription ID of the AMPS message. |
length | Length of the AMPS message body in bytes. |
timestamp | Timestamp from when the source received the message. |
Bookmark Subscriptions
The following properties are optional and can be configured for bookmark subscriptions.
| Property | Description |
|---|---|
bookmark | Starting bookmark for the subscription. Accepts values such as NOW, EPOCH, MOST_RECENT, or any valid AMPS bookmark. |
subscriptionId | AMPS subscription ID. Required when bookmark is specified. |
bookmarkStoreType | Bookmark store type. Valid values are memory and logged. |
bookmarkLog | Bookmark log file path. Required when using a logged bookmark store. |
pruneTimeThreshold | Minimum time in milliseconds between logged bookmark store prune operations. Defaults to 300000. |
Queues
The following properties are optional and can be configured when subscribing to an AMPS queue.
| Property | Description |
|---|---|
isQueue | Set to y when ampsTopic is an AMPS queue. |
maxBacklog | Maximum number of unacknowledged messages AMPS will provide to the queue subscription. |
ackBatchSize | Number of queue messages to include in an acknowledgment batch. |
ackTimeout | Timeout value for sending queue acknowledgments. |
Batching and Parallelism
The following properties control batching and task partitioning.
| Property | Description |
|---|---|
maxBuffers | Maximum number of message buffers used by the source task. Defaults to 10. |
maxBatch | Maximum number of AMPS messages returned to Kafka Connect in each poll batch. Defaults to 1000. |
taskFilter | Filter format used to partition messages across source tasks. Required when tasks.max is greater than 1. |
The source calls String.format(taskFilter, tasks.max, taskIndex) to build a task-specific AMPS filter. For example, a taskFilter of /id MOD %d = %d with tasks.max set to 2 produces /id MOD 2 = 0 for one task and /id MOD 2 = 1 for the other.