
Delivery Guarantees

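Kafka Connect commits source records after they have been written to Kafka. The AMPS Kafka source uses that commit callback to act on the corresponding AMPS message: for a bookmark subscription it discards the entry from the bookmark store, and for a queue subscription it acknowledges the message.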
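The important property is ordering: nothing is discarded or acknowledged until the commit callback fires for that record. The following minimal Python model illustrates that dispatch. PendingRecord and CommitHook are hypothetical names, not classes from the connector, and the two lists stand in for the real AMPS client calls. In the Kafka Connect Java API, the per-record hook is SourceTask.commitRecord. This sketch does not claim that the connector uses that exact method.

```python
from dataclasses import dataclass, field


@dataclass
class PendingRecord:
    """A record handed to Kafka Connect but not yet committed to Kafka (hypothetical)."""
    amps_id: str    # bookmark (topic subscription) or message ID (queue)
    is_queue: bool  # True when the source subscribed with isQueue set to y


@dataclass
class CommitHook:
    """Toy model of the source's commit handling.

    The two lists stand in for the AMPS client calls that discard a bookmark
    or acknowledge a queue message; they are illustrative, not the real API.
    """
    discarded: list = field(default_factory=list)
    acknowledged: list = field(default_factory=list)

    def on_commit(self, record: PendingRecord) -> None:
        # Invoked only after Kafka Connect reports that the record was written.
        if record.is_queue:
            self.acknowledged.append(record.amps_id)
        else:
            self.discarded.append(record.amps_id)


hook = CommitHook()
hook.on_commit(PendingRecord("bookmark-1", is_queue=False))
hook.on_commit(PendingRecord("queue-msg-7", is_queue=True))
print(hook.discarded, hook.acknowledged)  # ['bookmark-1'] ['queue-msg-7']
```

Records that Kafka Connect has not committed never reach on_commit, so their bookmarks stay in the store and their queue messages stay unacknowledged.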

Topics with a Transaction Log

For AMPS topics with a transaction log, bookmark subscriptions can be used to provide resumable delivery. When the source is configured with a bookmark store, records are not discarded from the store until Kafka Connect commits the corresponding Kafka record.

The following source uses an EPOCH bookmark subscription and a memory bookmark store:

{
  "name": "amps-kafka-source",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSource",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "SourceTest",
    "bookmark": "EPOCH",
    "bookmarkStoreType": "memory",
    "subscriptionId": "Sub-200",
    "maxBuffers": "10",
    "maxBatch": "1000"
  }
}
Note: subscriptionId is required when bookmark is specified. Use a unique subscription ID for each independent subscription in the application.
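You can check both rules above before deploying several connector configurations. The following Python sketch does that. check_subscription_ids is a hypothetical helper that covers only the rules stated in this section. It is not the connector's own validation logic.

```python
from collections import Counter


def check_subscription_ids(configs):
    """Return problems found in a list of connector "config" objects.

    Hypothetical pre-deployment check covering only the two documented rules.
    """
    problems = []
    for i, cfg in enumerate(configs):
        # Rule 1: a bookmark subscription must set subscriptionId.
        if "bookmark" in cfg and not cfg.get("subscriptionId"):
            problems.append(f"config {i}: bookmark is set but subscriptionId is missing")
    # Rule 2: independent subscriptions should not share an ID.
    counts = Counter(cfg["subscriptionId"] for cfg in configs if cfg.get("subscriptionId"))
    for sub_id, n in sorted(counts.items()):
        if n > 1:
            problems.append(f"subscriptionId {sub_id!r} is used {n} times")
    return problems


configs = [
    {"bookmark": "EPOCH", "subscriptionId": "Sub-200"},
    {"bookmark": "MOST_RECENT"},
    {"bookmark": "EPOCH", "subscriptionId": "Sub-200"},
]
for problem in check_subscription_ids(configs):
    print(problem)
```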

Bookmark Stores

The connector supports memory and logged bookmark stores.

Store Type | Description
--- | ---
memory | Stores bookmarks in the worker process. This is the highest-performance option, but it does not protect against process failure.
logged | Stores bookmarks on disk. This provides recovery after client or worker failure; performance depends on the storage device used for the bookmark log.
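The difference in the table comes down to where discard records live when the worker restarts. The toy Python model below illustrates this. MemoryStore and LoggedStore are hypothetical classes that track only discarded bookmarks. The real AMPS client stores use their own file format and track more state than this.

```python
import os
import tempfile


class MemoryStore:
    """Toy memory store: discards live only as long as the object."""

    def __init__(self):
        self.discarded = set()

    def discard(self, bookmark):
        self.discarded.add(bookmark)


class LoggedStore:
    """Toy logged store: each discard is appended to a file and replayed on start."""

    def __init__(self, path):
        self.path = path
        self.discarded = set()
        if os.path.exists(path):
            with open(path, encoding="utf-8") as log:
                self.discarded = {line.strip() for line in log if line.strip()}

    def discard(self, bookmark):
        with open(self.path, "a", encoding="utf-8") as log:
            log.write(bookmark + "\n")
            log.flush()
            os.fsync(log.fileno())  # the per-write sync is where storage speed matters
        self.discarded.add(bookmark)


# Simulate a worker restart by building a fresh store object.
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "bookmark.log")
    LoggedStore(log_path).discard("bookmark-1")
    print(sorted(LoggedStore(log_path).discarded))  # ['bookmark-1']

memory = MemoryStore()
memory.discard("bookmark-1")
print(sorted(MemoryStore().discarded))  # []
```

Syncing every discard to disk is what makes the logged store slower than the memory store. It is also what allows the discard state to survive a restart.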

A logged bookmark store also requires bookmarkLog, which sets the path of the bookmark log file. The following fragment shows the relevant settings:

{
  "bookmark": "MOST_RECENT",
  "bookmarkStoreType": "logged",
  "subscriptionId": "Sub-200",
  "bookmarkLog": "./bookmark.log",
  "pruneTimeThreshold": "300000"
}
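With bookmark set to MOST_RECENT, the subscription is intended to resume from the position recorded in the bookmark store rather than from the start of the transaction log. Kafka commits, and therefore discards, can complete out of order. For that reason, a store can only safely resume after the newest bookmark whose predecessors have all been discarded.

The Python sketch below models that rule. It assumes that bookmarks are totally ordered by arrival. recovery_point is a hypothetical helper, and the real store also handles cases such as duplicates that this model ignores.

```python
from typing import Optional


def recovery_point(delivered, discarded) -> Optional[str]:
    """Return the newest bookmark B such that B and every bookmark delivered
    before it have been discarded; None means no prefix is complete yet.

    Simplified model: delivered lists bookmarks in arrival order and
    discarded is the set the commit callback has processed so far.
    """
    point = None
    for bookmark in delivered:
        if bookmark not in discarded:
            break  # an earlier record is still uncommitted; stop here
        point = bookmark
    return point


delivered = ["b1", "b2", "b3", "b4"]
discarded = {"b1", "b2", "b4"}  # b3 has not been committed to Kafka yet
print(recovery_point(delivered, discarded))  # b2
```

In this model, resuming after b2 redelivers b3 and also b4, even though b4 was already committed. This is one way the duplicates described in the warning below can arise.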
Warning: For bookmark subscriptions, the source provides at-least-once delivery, managed by the source together with the AMPS client bookmark store. If a failure occurs after Kafka commits a record but before the bookmark store discard completes, the record can be redelivered after a restart, so Kafka may receive it twice.
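The redelivery window can be reproduced in a few lines of Python. This is a toy simulation, not connector code. run_session and dedupe are hypothetical helpers, and a restart here simply skips discarded bookmarks instead of resuming from a recovery point. The dedupe step also assumes that downstream consumers can see some unique message identifier. This section does not say whether the connector exposes the AMPS bookmark in a Kafka header.

```python
def run_session(messages, kafka_log, discarded, crash_before_discard=None):
    """One toy source session: commit each message to Kafka, then discard it.

    If crash_before_discard names a bookmark, the session stops between the
    Kafka commit and the bookmark discard for that message.
    """
    for bookmark, payload in messages:
        if bookmark in discarded:
            continue  # handled by an earlier session
        kafka_log.append((bookmark, payload))  # step 1: Kafka commit
        if bookmark == crash_before_discard:
            return  # step 2 never runs for this message
        discarded.add(bookmark)  # step 2: bookmark store discard


def dedupe(kafka_log):
    """Keep the first record per ID (assumes a unique ID is visible downstream)."""
    seen, unique = set(), []
    for msg_id, payload in kafka_log:
        if msg_id not in seen:
            seen.add(msg_id)
            unique.append(payload)
    return unique


messages = [("b1", "x"), ("b2", "y"), ("b3", "z")]
kafka_log, discarded = [], set()
run_session(messages, kafka_log, discarded, crash_before_discard="b2")
run_session(messages, kafka_log, discarded)  # restart
print([b for b, _ in kafka_log])  # ['b1', 'b2', 'b2', 'b3']
print(dedupe(kafka_log))          # ['x', 'y', 'z']
```

Because duplicates are possible, consumers of the Kafka topic should be idempotent or deduplicate on a stable key.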

Message Queues

The source can subscribe directly to AMPS queues. Set isQueue to y and configure the queue acknowledgment settings for the subscription (maxBacklog, ackBatchSize, and ackTimeout in the example below).

The queue's configuration in AMPS, not the connector, determines whether queue delivery is at-most-once or at-least-once. The source behaves like any other AMPS queue consumer: it subscribes to the configured queue, and AMPS applies the delivery semantics configured for that queue.

{
  "name": "amps-kafka-source",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSource",
    "uri": "tcp://localhost:9007/amps/json",
    "ampsTopic": "SourceTest",
    "isQueue": "y",
    "maxBacklog": "10000",
    "ackBatchSize": "3000",
    "ackTimeout": "60000",
    "maxBuffers": "10",
    "maxBatch": "1000"
  }
}
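The example sets ackBatchSize and ackTimeout. This sketch assumes these follow the usual AMPS client ack-batching behavior: acks accumulate and are sent together when the batch is full, or when the oldest pending ack has waited ackTimeout milliseconds. Under that assumption, the following toy model shows when acks would leave the connector. AckBatcher is a hypothetical class, and maxBacklog is not modeled.

```python
class AckBatcher:
    """Hypothetical model of batched queue acknowledgments.

    Assumes batch_size plays the role of ackBatchSize and timeout_ms the role
    of ackTimeout (milliseconds). Acks are queued only after a Kafka commit.
    """

    def __init__(self, batch_size, timeout_ms):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.pending = []       # message IDs committed to Kafka, ack not yet sent
        self.oldest_ms = None   # commit time of the oldest pending ack
        self.sent_batches = []  # each entry stands in for one ack sent to AMPS

    def on_kafka_commit(self, msg_id, now_ms):
        if not self.pending:
            self.oldest_ms = now_ms
        self.pending.append(msg_id)
        self.maybe_flush(now_ms)

    def maybe_flush(self, now_ms):
        if not self.pending:
            return
        full = len(self.pending) >= self.batch_size
        stale = now_ms - self.oldest_ms >= self.timeout_ms
        if full or stale:
            self.sent_batches.append(self.pending)
            self.pending = []
            self.oldest_ms = None


batcher = AckBatcher(batch_size=3, timeout_ms=60_000)
for msg_id, t in [("m1", 0), ("m2", 10), ("m3", 20), ("m4", 100)]:
    batcher.on_kafka_commit(msg_id, now_ms=t)
batcher.maybe_flush(now_ms=60_100)  # m4 has now waited 60 000 ms
print(batcher.sent_batches)  # [['m1', 'm2', 'm3'], ['m4']]
```

With the example values, this model combines up to 3000 acks per batch and holds a partial batch for at most 60 seconds. In the model, a message whose ack is still pending has not yet been acknowledged to AMPS.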

For queues, messages are acknowledged after Kafka Connect commits the corresponding Kafka record. If the connector stops before a message is committed to Kafka, AMPS can redeliver the unacknowledged message according to the queue's configured delivery semantics.
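The two queue modes behave differently when the connector stops before a commit. The toy queue below models that difference. ToyQueue is hypothetical and not the AMPS implementation; in particular, it ignores lease expiry and other redelivery triggers.

```python
from collections import deque


class ToyQueue:
    """Toy model of queue delivery modes; not the AMPS implementation."""

    def __init__(self, items, at_least_once):
        self.ready = deque(items)  # (message_id, payload) pairs waiting for delivery
        self.leased = {}           # message_id -> payload, delivered but not yet acked
        self.at_least_once = at_least_once

    def take(self):
        msg_id, payload = self.ready.popleft()
        if self.at_least_once:
            self.leased[msg_id] = payload  # an ack is still expected for this one
        return msg_id, payload             # at-most-once: delivery alone removes it

    def ack(self, msg_id):
        self.leased.pop(msg_id, None)

    def consumer_lost(self):
        """Return unacknowledged messages to the front of the queue, in order."""
        self.ready.extendleft(reversed(list(self.leased.items())))
        self.leased.clear()


for mode in (True, False):
    q = ToyQueue([("m1", "a"), ("m2", "b")], at_least_once=mode)
    q.take()           # m1 reaches the connector...
    q.consumer_lost()  # ...which stops before Kafka commits it, so no ack is sent
    print("at-least-once" if mode else "at-most-once", [m for m, _ in q.ready])
```

In the at-least-once case, m1 is back in the queue for redelivery. In the at-most-once case, m1 is lost, because it was never written to Kafka.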

Aggregated Subscriptions

The source can use AMPS command options, including projection, grouping, and conflation. The following configuration uses sow_and_subscribe with an aggregated subscription:

{
  "name": "amps-kafka-source",
  "config": {
    "connector.class": "com.crankuptheamps.kafka.AMPSKafkaSource",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "clientFactoryClass": "com.crankuptheamps.kafka.AMPSBasicClientFunction",
    "clientName": "KafkaSource",
    "uri": "tcp://localhost:9007/amps/json",
    "command": "sow_and_subscribe",
    "ampsTopic": "Orders",
    "filter": "/symbol IN ('IBM', 'MSFT')",
    "options": "projection=[/symbol,avg(/price) as /avg_price, avg(/price*/qty) as /avg_total],grouping=[/symbol],conflation=1s",
    "subscriptionId": "Sub-100",
    "cmdTypeFilter": "9",
    "maxBuffers": "10",
    "maxBatch": "1000",
    "pruneTimeThreshold": "300000",
    "eventHeaders": "topicHeader,timestamp"
  }
}
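AMPS evaluates aggregated subscriptions on the server, so the connector receives records that are already aggregated. The Python sketch below only shows which values the filter, projection, and grouping in this example compute for a small, hypothetical snapshot of Orders: one record per symbol, limited to IBM and MSFT. The aggregate helper is illustrative. It does not model conflation=1s, which asks AMPS to conflate updates over a one-second interval, and it does not model the sow_and_subscribe update stream.

```python
from collections import defaultdict


def aggregate(orders, symbols=("IBM", "MSFT")):
    """Illustrative client-side equivalent of the example's filter,
    projection, and grouping; field names follow /symbol, /price, /qty."""
    groups = defaultdict(list)
    for order in orders:
        if order["symbol"] in symbols:  # filter: /symbol IN ('IBM', 'MSFT')
            groups[order["symbol"]].append(order)
    result = {}
    for symbol, rows in groups.items():  # grouping=[/symbol]
        n = len(rows)
        result[symbol] = {
            "symbol": symbol,
            "avg_price": sum(r["price"] for r in rows) / n,             # avg(/price)
            "avg_total": sum(r["price"] * r["qty"] for r in rows) / n,  # avg(/price*/qty)
        }
    return result


orders = [
    {"symbol": "IBM", "price": 100.0, "qty": 10},
    {"symbol": "IBM", "price": 110.0, "qty": 20},
    {"symbol": "MSFT", "price": 300.0, "qty": 5},
    {"symbol": "AAPL", "price": 150.0, "qty": 1},  # removed by the filter
]
for row in aggregate(orders).values():
    print(row)
```

Each Kafka record produced from this subscription corresponds to one group's projected fields, rather than to an individual order.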