Checkpointing
Flink's checkpointing provides both fault tolerance and delivery guarantees. The AMPS Source uses checkpointing for fault tolerance, for delivery guarantees, and for acknowledging messages from "at-least-once" queues.
Topics with a Transaction Log
To work with Flink's checkpointing, the AMPS Source needs to be able to record its state and replay messages. The source combines AMPS bookmarks with Flink's checkpointing to provide an at-least-once delivery guarantee: if the source subscribes to a topic with a transaction log and checkpointing is enabled, every message is delivered at least once.
In short, the source stores the most recent bookmark with each checkpoint. If a failure occurs, the subscription resumes from that stored bookmark, and the messages that followed it are replayed from the transaction log. This is what provides fault tolerance for the AMPS Source.
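The resume-from-bookmark behavior can be illustrated with a minimal, self-contained sketch. It is not the AMPS Source's implementation: a `List` stands in for the transaction log, each message's index stands in for its bookmark, and the class and method names (`BookmarkReplaySketch`, `consume`, `checkpoint`, `recover`) are hypothetical. It shows why the guarantee is at-least-once: messages emitted after the last checkpoint are replayed after a failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the AMPS Source implementation. A List plays the role of
// an AMPS transaction log, and a message's index plays the role of its bookmark.
public class BookmarkReplaySketch {

    // Simulated transaction log; the position of each message acts as its bookmark.
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3", "m4");

    // Most recent bookmark seen by the source (-1 means nothing processed yet).
    private int latestBookmark = -1;

    // Bookmark stored by the last completed checkpoint.
    private int checkpointedBookmark = -1;

    // Records emitted downstream, including replayed duplicates.
    final List<String> emitted = new ArrayList<>();

    // Deliver up to `count` messages that follow the most recent bookmark.
    void consume(int count) {
        int start = latestBookmark + 1;
        for (int i = start; i < Math.min(start + count, LOG.size()); i++) {
            emitted.add(LOG.get(i));
            latestBookmark = i; // remember the most recent bookmark
        }
    }

    // On checkpoint, store the most recent bookmark as the source's state.
    void checkpoint() {
        checkpointedBookmark = latestBookmark;
    }

    // On failure, resume the subscription from the stored bookmark.
    void recover() {
        latestBookmark = checkpointedBookmark;
    }

    public static List<String> run() {
        BookmarkReplaySketch source = new BookmarkReplaySketch();
        source.consume(2);   // emits m0, m1
        source.checkpoint(); // stores the bookmark of m1
        source.consume(2);   // emits m2, m3, then a failure occurs
        source.recover();    // resume after m1
        source.consume(3);   // replays m2, m3, then emits m4
        return source.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `main` prints `[m0, m1, m2, m3, m2, m3, m4]`: `m2` and `m3` are delivered twice, which an at-least-once guarantee permits, but no message is skipped.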
An AMPS Source that works with Flink's checkpointing can be constructed similar to the following:
AMPSSource<String> source = AMPSSource.<String>builder()
    .setUri(uri)
    .setTopic(topic)
    .setDeserializationSchema(new SimpleStringSchema())
    .setBookmark("0") // EPOCH bookmark
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // Makes the source use a BookmarkStore to work with Flink's checkpointing
    .build();
The example above uses a Memory Bookmark Store that the source sets up internally to coordinate bookmarks with Flink's checkpoints. Any implementation of a Bookmark Store can be used. Refer to the builder methods for an overview of all options available when constructing an AMPS Source.
By default, an AMPS Source discards bookmarks when a checkpoint completes. Checkpointing should therefore be enabled whenever a Bookmark Store is used; otherwise, bookmarks are never discarded, and the JVM eventually runs out of memory. Alternatively, an AMPS Source can be configured to discard bookmarks when the source emits a record to Flink.
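To see why bookmarks pile up without checkpointing, consider a hypothetical in-memory store (not the connector's Memory Bookmark Store). It keeps the bookmark of each emitted record until a completed checkpoint covers it, and has an optional discard-on-emit mode. Bookmarks are modeled as increasing `long` values, and the method names only mirror Flink's snapshot and checkpoint-complete callbacks; all names here are assumptions of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT the connector's Memory Bookmark Store: bookmarks stay in
// memory until a completed checkpoint covers them (or are dropped on emit if configured).
public class BookmarkDiscardSketch {

    private final boolean discardOnEmit;
    private final Deque<Long> pending = new ArrayDeque<>();            // bookmarks not yet discarded
    private final Map<Long, Long> bookmarkPerCheckpoint = new HashMap<>(); // checkpoint id -> last bookmark it covers
    private long latestBookmark = -1;

    BookmarkDiscardSketch(boolean discardOnEmit) {
        this.discardOnEmit = discardOnEmit;
    }

    // Simulates the source emitting a record with the given bookmark to Flink.
    void onEmit(long bookmark) {
        latestBookmark = bookmark;
        if (!discardOnEmit) {
            pending.addLast(bookmark); // retained until a checkpoint completes
        }
    }

    // Simulates Flink snapshotting the source: record which bookmarks the checkpoint covers.
    void snapshotState(long checkpointId) {
        bookmarkPerCheckpoint.put(checkpointId, latestBookmark);
    }

    // Simulates checkpoint completion: every bookmark it covers can be discarded.
    void notifyCheckpointComplete(long checkpointId) {
        Long covered = bookmarkPerCheckpoint.remove(checkpointId);
        if (covered == null) {
            return;
        }
        while (!pending.isEmpty() && pending.peekFirst() <= covered) {
            pending.removeFirst();
        }
    }

    // Emits `messages` records, completing a checkpoint every `interval` records (0 = never).
    public static int retainedAfter(int messages, int interval, boolean discardOnEmit) {
        BookmarkDiscardSketch store = new BookmarkDiscardSketch(discardOnEmit);
        long checkpointId = 0;
        for (long b = 0; b < messages; b++) {
            store.onEmit(b);
            if (interval > 0 && (b + 1) % interval == 0) {
                store.snapshotState(++checkpointId);
                store.notifyCheckpointComplete(checkpointId);
            }
        }
        return store.pending.size();
    }

    public static void main(String[] args) {
        System.out.println(retainedAfter(10_000, 0, false));     // no checkpoints: nothing discarded
        System.out.println(retainedAfter(10_000, 1_000, false)); // each checkpoint discards its bookmarks
        System.out.println(retainedAfter(10_000, 0, true));      // bookmarks discarded on emit
    }
}
```

Without checkpoints all 10,000 bookmarks stay in memory, and the count keeps growing with every record; with periodic checkpoints, or with discard-on-emit, the store is emptied.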
Message Queues
The AMPS Source supports both "at-least-once" and "at-most-once" queues. Because the two queue semantics differ, the source acknowledges messages at different times for each.
For an "at-most-once" queue, each message is acknowledged when the AMPS Source emits it to Flink. Checkpointing is therefore not required for an "at-most-once" queue to function properly.
"At-least-once" queues, by contrast, need Flink's checkpointing to guarantee that each message is processed at least once. Instead of discarding bookmarks, the source acknowledges messages when a checkpoint completes. If a checkpoint fails to complete or a failure occurs, the messages the source has received but not yet acknowledged are canceled and returned to the queue in AMPS.
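The difference in acknowledgement timing can be sketched with a small simulation. It is hypothetical, not AMPS Source or AMPS server code: a `Deque` stands in for the AMPS queue, a checkpoint's snapshot and completion happen at the same instant, and canceled messages go back to the front of the queue in their original order, which is a simplification of this sketch rather than documented AMPS behavior.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of when queue messages are acknowledged; not AMPS code.
public class QueueAckSketch {

    public enum Semantics { AT_MOST_ONCE, AT_LEAST_ONCE }

    public final Deque<String> queue = new ArrayDeque<>(); // messages still available in the queue
    public final List<String> unacked = new ArrayList<>(); // received by the source, not yet acknowledged
    public final List<String> acked = new ArrayList<>();   // acknowledged, so never redelivered
    private final Semantics semantics;

    QueueAckSketch(Semantics semantics, List<String> messages) {
        this.semantics = semantics;
        queue.addAll(messages);
    }

    // The source takes the next message from the queue and emits it to Flink.
    void emitNext() {
        String msg = queue.pollFirst();
        if (msg == null) {
            return;
        }
        if (semantics == Semantics.AT_MOST_ONCE) {
            acked.add(msg);   // acknowledged as soon as it is emitted
        } else {
            unacked.add(msg); // waits for the next completed checkpoint
        }
    }

    // At-least-once: a completed checkpoint acknowledges the messages emitted before it.
    void onCheckpointComplete() {
        acked.addAll(unacked);
        unacked.clear();
    }

    // Failure or failed checkpoint: unacknowledged messages are canceled and returned.
    void onFailure() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            queue.addFirst(unacked.get(i)); // keeps the original order at the front
        }
        unacked.clear();
    }

    public static QueueAckSketch scenario(Semantics s) {
        QueueAckSketch src = new QueueAckSketch(s, List.of("a", "b", "c", "d"));
        src.emitNext();             // a
        src.emitNext();             // b
        src.onCheckpointComplete(); // the checkpoint covers a and b
        src.emitNext();             // c
        src.onFailure();            // failure before the next checkpoint completes
        return src;
    }

    public static void main(String[] args) {
        for (Semantics s : Semantics.values()) {
            QueueAckSketch src = scenario(s);
            System.out.println(s + ": acked=" + src.acked + ", still queued=" + src.queue);
        }
    }
}
```

In the at-least-once run, `c` is returned to the queue and will be delivered again. In the at-most-once run, `c` was acknowledged on emit, so it is not redelivered after the failure.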
An AMPS Source that subscribes to a queue can be constructed similar to the following:
AMPSSource<String> source = AMPSSource.<String>builder()
    .setUri(uri)
    .setTopic(topic)
    .setDeserializationSchema(new SimpleStringSchema())
    .setQueueSemantics("at-least-once") // Required
    .setMaxBacklog(20) // Optional
    .setAckBatchSize(5) // Optional
    .setAckTimeout(5000) // Optional
    .build();
Checkpointing must be enabled if the source subscribes to an "at-least-once" queue. Otherwise, the source never acknowledges the messages it receives, so it stops receiving new messages once the number of unacknowledged messages reaches the max backlog.
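The stall can be approximated with a rough model. It simplifies in two ways: a completed checkpoint acknowledges every outstanding message at once, and checkpoints complete after a fixed number of deliveries instead of on Flink's time-based interval. The class `MaxBacklogSketch` and its method `receivedMessages` are hypothetical.

```java
// Hypothetical model of max backlog flow control; not the AMPS server's algorithm.
public class MaxBacklogSketch {

    // Returns how many of `available` queue messages the source receives.
    // `checkpointEvery` = 0 models checkpointing being disabled.
    public static int receivedMessages(int available, int maxBacklog, int checkpointEvery) {
        int received = 0;
        int unacked = 0; // delivered to the source but not yet acknowledged
        while (received < available && unacked < maxBacklog) {
            received++;
            unacked++;
            if (checkpointEvery > 0 && unacked == checkpointEvery) {
                unacked = 0; // a completed checkpoint acknowledges all outstanding messages
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedMessages(100, 20, 0));  // checkpointing disabled
        System.out.println(receivedMessages(100, 20, 10)); // checkpoint every 10 messages
    }
}
```

With the max backlog of 20 from the example above and checkpointing disabled, the model receives 20 of 100 available messages and then stalls; with a checkpoint completing every 10 messages, all 100 are received.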