Skip to main content

Examples

Several examples that use different aspects of the AMPS Kafka connector are located in the connector repository under src/test/resources/examples/.

tip

The examples assume Kafka Connect can find the AMPS Kafka connector plugin and that the AMPS Java client JAR is available in the plugin directory.

Included JSON Examples

The repository includes these connector JSON files:

ExampleConnectorDemonstrates
AMPSKafkaGenericSource.jsonSourceA basic AMPS source connector that subscribes to SourceTest and writes records to Kafka.
AMPSKafkaGenericSink.jsonSinkA basic AMPS sink connector that consumes SourceTest from Kafka and publishes to SinkTest in AMPS.
aggSub/AMPSKafkaSource.jsonSourceAn AMPS sow_and_subscribe with a content filter, projection, grouping, conflation, command type filtering, and Kafka headers.
aggSub/AMPSKafkaSink.jsonSinkA sink connector that consumes Orders from Kafka and publishes to AMPSKafkaSinkTest in AMPS.
bookmarkSub/AMPSKafkaMemoryBookmarkSource.jsonSourceA bookmark source using the EPOCH bookmark and a memory bookmark store.
bookmarkSub/AMPSKafkaLoggedBookmarkSource.jsonSourceA bookmark source using the MOST_RECENT bookmark and a logged bookmark store.
queueSub/AMPSKafkaQueueSource.jsonSourceAn AMPS queue subscription that acknowledges messages after Kafka commits them.
dynamicSub/AMPSKafkaDynamicSource.jsonSourceA source connector that subscribes to AMPS topics with a regular expression.
dynamicSub/AMPSKafkaDynamicSink.jsonSinkA sink connector that consumes Kafka topics selected by topics.regex and publishes to matching AMPS topics with useTopicHeader.
failoverTest/AMPSKafkaFailTestSource.jsonSourceA source connector configured with multiple AMPS URIs and a memory bookmark store for failover testing.
failoverTest/AMPSKafkaFailTestSink.jsonSinkA sink connector configured with multiple AMPS URIs and a memory publish store for failover testing.

The example directory also includes aggSub/messages.json, a JSON-lines data file with 15 Orders messages used by the aggregate subscription walkthrough and tests.

JUnit Integration Tests

The connector repository includes JUnit tests in src/test/java/com/crankuptheamps/kafka/AMPSKafkaExampleIntegrationTest.java. These tests use AMPS clients, Kafka, and the Kafka Connect REST API to create the example connectors from src/test/resources/examples/, publish messages, and verify that the expected messages arrive back in AMPS.

TestDemonstrates
basicFlowTestUses AMPSKafkaGenericSource.json and AMPSKafkaGenericSink.json to verify the basic AMPS to Kafka to AMPS path with fixed-size JSON messages.
dynamicSubTestUses dynamicSub/AMPSKafkaDynamicSource.json and dynamicSub/AMPSKafkaDynamicSink.json to verify regular expression topic subscription and sink publication using Kafka topic headers.
deliveryQueueTestUses queueSub/AMPSKafkaQueueSource.json with the generic sink to verify AMPS queue consumption and delivery back to an AMPS sink topic.
bookmarkTestUses bookmarkSub/AMPSKafkaMemoryBookmarkSource.json and bookmarkSub/AMPSKafkaLoggedBookmarkSource.json to verify memory and logged bookmark source configurations.
aggregateTestUses aggSub/AMPSKafkaSource.json, aggSub/AMPSKafkaSink.json, and aggSub/messages.json to verify aggregate subscription filtering, grouping, projection, conflation, and sink delivery.
failoverTestUses failoverTest/AMPSKafkaFailTestSource.json and failoverTest/AMPSKafkaFailTestSink.json to verify that HA source and sink connectors continue delivering messages after the primary AMPS TCP transport is disabled.
info

The JUnit suite requires Kafka, Kafka Connect, and two AMPS servers to be running before mvn test starts. See docs/TEST.md in the connector repository for the test setup, default endpoints, Maven command, and single-test command.