Skip to main content

Builder

The builder pattern is used to construct new AMPS Source instances.

Required

The following methods in the builder are required to construct a valid AMPS Source instance.

Connection to AMPS

One of the following must be used for the AMPS Source to connect to AMPS.

MethodDescription
setUriSets the URI that will be used to connect to AMPS.
setServerChooserSupplierSets the supplier used to get a server chooser to connect to AMPS. The supplier must be serializable.

Topic from AMPS

One of the following must be used to determine what topic the AMPS Source should read from.

MethodDescription
setTopicSets the topic that will be used by the subscribe command.
setAMPSSplitsSets an AMPSSplit collection that will be used to create subscriptions.

Deserializing Data from AMPS

The following method is overloaded to allow either a DeserializationSchema or an AMPSDeserializationSchema to be used by an AMPS Source.

MethodDescription
setDeserializationSchemaSets the deserialization schema that will be used to deserialize messages from AMPS.

Building the AMPS Source

The following method must be used to build the AMPS Source based on the set builder methods.

MethodDescription
buildReturns the constructed AMPS Source.

Optional - AMPS Client

The following methods in the builder are optional for functionality related to the AMPS client.

MethodDescription
setClientNameSets the base client name.
setContentFilterSets the content filter used by all clients from this source. This can be used to provide a filter that all clients should use in addition to any filter provided by a split.
setOptionsSets additional options.
setBookmarkStoreFunctionSets the SerializableFunction that gets each client a bookmark store. The parameter is the client name that an individual client of the source will use.

This should only be used if checkpointing is enabled or discardAfterEmit is true.
setBookmarkSets the starting bookmark.

Only valid for topics with a transaction log.
setSplitsSets the splits for the source using a collection of strings that are content filters that will shard the topic.

This can be used instead of setAMPSSplits to use content filters on the topic set by setTopic.
setQueueSemanticsSets the queue semantics if the source is subscribing to a queue.

Valid semantics are:
  • "at-least-once"
  • "at-most-once"
Only valid for queues.
setAckBatchSizeSets how many acknowledgement messages will be batched.

Only valid for queues.
setAckTimeoutSets how long acknowledgement messages will be held.

Only valid for queues.
setTopNSets the amount of messages that should be received from AMPS.

Only valid for SOW queries and topics with a transaction log.
setSkipNSets the amount of messages in a SOW query that should be skipped. If "skip_n=n" is set using options, then this value will be ignored.

Only valid for SOW queries with a set topN.
setSubscribeCommandSets the subscription command for the source.

Valid commands include:
  • "subscribe" (Default)
  • "sow"
  • "sow_and_subscribe"
  • "delta_subscribe" (May require a custom deserialization schema)
  • "sow_and_delta_subscribe" (May require a custom deserialization schema)
setExceptionListenerSupplierSets the supplier that gets each client an exception listener. The supplier must be serializable.
setBatchSizeSets the batch size for SOW queries.

Only valid for SOW queries.
setOrderBySets how SOW results should be ordered.

Only valid for SOW queries.
setReconnectDelayStrategySupplierSets the supplier that gets each client a ReconnectDelayStrategy. Default uses exponential delay. The supplier must be serializable.
setHeartbeatSets the heartbeat interval in seconds for the clients.
setMaxBacklogSets the max backlog when subscribing to a message queue. If "max_backlog=n" is set using options, then this value will be ignored.

Only valid for queues.
setPruneIntervalSets the milliseconds that must pass before a LoggedBookmarkStore is pruned. Default is 30_000L or 30 seconds. This field only has an effect if there is a bookmark store defined, and it is a LoggedBookmarkStore.
setHeaderKeysSets the headers that should be preserved from a message from AMPS. The headers will be preserved in an AMPSMessage and can only be accessed using the .getHeader(AMPSSourceHeaderKeys) method in an AMPSDeserializationSchema.
setConnectorInitializerSets the ConnectorInitializer that will run its init(HAClient) method before the reader connects to AMPS. This can be used to execute code such as setting up an SSLContext for the AMPS clients.

Optional - Other

The following methods in the builder are optional for additional functionality not directly related to the AMPS client.

MethodDescription
setDeliveryGuaranteeSets the source's delivery guarantee by internally setting up a Memory Bookmark Store with a Recovery Point Adapter. Default is NONE.

This should only be set to AT_LEAST_ONCE or EXACTLY_ONCE if checkpointing is enabled or discardAfterEmit is true.
setInternalBufferSizeSets the buffer size of the queue that each client uses to buffer messages from AMPS. Default is 1000. This should be adjusted based on the application the source is being used for. For example, applications with large message sizes may want to use a smaller buffer size to avoid excessive memory use on buffering messages.
setConfigurationSets the Flink Configuration used to supply SourceReaderOptions.
setSleepMillisAfterBlockSets the amount of milliseconds to sleep for after blocking for a message. Default is 0. This can be used to allow some messages to buffer in the queue before handing them to Flink.
setDiscardAfterEmitSets the flag for if bookmarks should be discarded after a record is emitted rather than on checkpoint completion. Default is false. If the checkpoint interval for an application is high, this may help reduce memory usage for the source's bookmark store.
setUseSuffixSets the flag for if a suffix should be appended to the client name. Default is false, but it will be set to true if parallelism is greater than 1 and there are multiple splits.