Your First Flink Job with the Connector
This chapter provides a basic walkthrough of using the connector in a Maven project. For this example, we will create a simple Maven project manually.
Initialize the Maven Project
First, create a new directory for the Maven project. This project will produce the JAR that we will submit to the Flink cluster.
mkdir first-program
Now, enter the newly created directory.
cd first-program
Create a pom.xml and copy the following into the file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>example</groupId>
<artifactId>first-program</artifactId>
<version>1.0.0</version>
<name>first-program</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>17</maven.compiler.release>
<flink.version>2.2.0</flink.version>
<amps.client.version>5.3.5.0</amps.client.version>
<amps.flink.connector.version>1.2.0-2.2</amps.flink.connector.version>
</properties>
<dependencies>
<dependency>
<groupId>com.crankuptheamps</groupId>
<artifactId>amps-client</artifactId>
<version>${amps.client.version}</version>
</dependency>
<dependency>
<groupId>com.crankuptheamps.flink</groupId>
<artifactId>flink-connector-amps</artifactId>
<version>${amps.flink.connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/DEPENDENCIES</exclude>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>example.FirstProgram</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
</plugin>
</plugins>
</build>
</project>
Create a directory for the source code.
mkdir -p src/main/java/example
Create a file that will submit a job to Flink:
touch src/main/java/example/FirstProgram.java
Copy the following into the file:
package example;
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageHandler;
import com.crankuptheamps.flink.source.AMPSSource;
import com.crankuptheamps.flink.sink.AMPSSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Example Flink job that replays JSON messages from an AMPS topic,
 * appends a field to each message, and publishes the results back
 * to AMPS. A walkthrough of this code appears later in the chapter.
 */
public class FirstProgram {
    public static void main(String[] args) throws Exception {
        // Connection URI used by both the demo client and the connectors.
        String uri = "tcp://localhost:9006/amps/json";
        // Topic (backed by a transaction log) that the AMPS Source replays from.
        String initTopic = "initial-topic";
        // Topic that the AMPS Sink publishes the modified messages to.
        String modifiedTopic = "modified-topic";
        // Number of messages published up front and consumed by the source.
        int pubCount = 100;
        // try-with-resources ensures the client is closed on exit.
        try (Client client = new Client("client");) {
            client.connect(uri);
            client.logon();
            // Seed the initial topic with pubCount JSON messages.
            for (int i = 0; i < pubCount; i++) {
                client.publish(initTopic, String.format("{\"id\":%d}", i));
            }
            client.publishFlush();
            // Print each modified message as it arrives back from Flink.
            client.subscribe(new MessageHandler() {
                @Override
                public void invoke(Message message) {
                    System.out.println(message.getData());
                }
            }, modifiedTopic, 0);
            // Bookmark "0" replays from the first message in the topic;
            // topN bounds the source so the job finishes after pubCount messages.
            AMPSSource<String> source = AMPSSource.<String>builder()
                .setUri(uri)
                .setTopic(initTopic)
                .setBookmark("0")
                .setTopN(pubCount)
                .setDeserializationSchema(new SimpleStringSchema())
                .build();
            AMPSSink<String> sink = AMPSSink.<String>builder()
                .setUri(uri)
                .setTopic(modifiedTopic)
                .setSerializationSchema(new SimpleStringSchema())
                .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Job graph: AMPS Source -> process (append a "num" field) -> AMPS Sink.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "AMPS Source")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value,
                            ProcessFunction<String, String>.Context ctx,
                            Collector<String> out) throws Exception {
                        // Slow the stream down so console output is readable.
                        Thread.sleep(200);
                        // Drop the trailing '}' so a new field can be appended.
                        String outStr = value.substring(0, value.length() - 1);
                        out.collect(outStr + ",\"num\":" + Math.random() + "}");
                    }
                })
                .sinkTo(sink);
            // Synchronous: returns once the bounded job completes.
            env.execute("First Program with AMPS Flink Connectors");
            // Give the subscriber time to print the last messages.
            Thread.sleep(1000);
        }
    }
}
Build and package the job. This will create the JAR target/first-program-1.0.0.jar that should be submitted to Flink.
mvn clean package
Set Up AMPS and Flink
Create an AMPS configuration file in the first-program directory.
touch amps-config.xml
Copy the following AMPS configuration into amps-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<AMPSConfig>
<Name>AMPS-First-Connector-Program</Name>
<Transports>
<Transport>
<Name>any-tcp</Name>
<Type>tcp</Type>
<InetAddr>9006</InetAddr>
<Protocol>amps</Protocol>
</Transport>
<Transport>
<Name>any-ws</Name>
<Type>tcp</Type>
<InetAddr>9007</InetAddr>
<Protocol>websocket</Protocol>
</Transport>
</Transports>
<Admin>
<InetAddr>8085</InetAddr>
<SQLTransport>any-ws</SQLTransport>
<FileName>./amps-files/amps-flink/stats.db</FileName>
<WWWAuthenticate>Basic realm="AMPS Admin"</WWWAuthenticate>
</Admin>
<Logging>
<Target>
<Protocol>stdout</Protocol>
<Level>error</Level>
<IncludeErrors>00-0015</IncludeErrors>
</Target>
</Logging>
<TransactionLog>
<JournalDirectory>./amps-files/amps-flink/journals</JournalDirectory>
<JournalSize>10MB</JournalSize>
<Topic>
<Name>initial-topic</Name>
<MessageType>json</MessageType>
</Topic>
</TransactionLog>
</AMPSConfig>
Open a new terminal, navigate to the first-program directory, and start an AMPS instance using the configuration file.
/path/to/amps/AMPS-{amps_version}-Release-Linux/bin/ampServer amps-config.xml
Open a new terminal and navigate to the directory that contains Flink.
cd /path/to/flink-2.2.0
Start the Flink cluster.
bin/start-cluster.sh
Submit the Job to Flink
In the terminal in the Flink directory, submit the job to Flink.
bin/flink run /path/to/first-program/target/first-program-1.0.0.jar
Examining the Code for the Job
Most of the steps above were about setting up a basic Maven project that will create the JAR for Flink. Now, let's take a closer look at the job we are submitting to Flink.
package example;
/*
 * These imports are used to publish the initial messages to AMPS and
 * to print the messages that were modified by Flink and published
 * back to AMPS by the connectors. They are not necessary for a
 * real job; they are included only for demonstration purposes.
 */
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageHandler;
/*
* These imports are for the connectors.
*/
import com.crankuptheamps.flink.source.AMPSSource;
import com.crankuptheamps.flink.sink.AMPSSink;
/*
* These imports are for creating and submitting the Flink job.
*/
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Example Flink job that replays JSON messages from an AMPS topic,
 * appends a field to each message, and publishes the results back to AMPS.
 */
public class FirstProgram {
    public static void main(String[] args) throws Exception {
        /*
         * The connection URI the client and connectors use to connect to AMPS.
         */
        String uri = "tcp://localhost:9006/amps/json";
        /*
         * The initial topic with a transaction log the client publishes to
         * and the AMPS Source subscribes to.
         */
        String initTopic = "initial-topic";
        /*
         * The modified ad hoc topic the AMPS Sink publishes to and the
         * client subscribes to.
         */
        String modifiedTopic = "modified-topic";
        /*
         * The number of messages the client will publish and the AMPS Source
         * will read from AMPS.
         */
        int pubCount = 100;
        /*
         * try-with-resources block to create and automatically close the client.
         */
        try (Client client = new Client("client");) {
            /*
             * Connect the client to AMPS and log on.
             */
            client.connect(uri);
            client.logon();
            /*
             * Publish the initial messages to AMPS.
             */
            for (int i = 0; i < pubCount; i++) {
                client.publish(initTopic, String.format("{\"id\":%d}", i));
            }
            client.publishFlush();
            /*
             * Subscribe to the modified ad hoc topic in order to print
             * the messages modified by Flink.
             */
            client.subscribe(new MessageHandler() {
                @Override
                public void invoke(Message message) {
                    System.out.println(message.getData());
                }
            }, modifiedTopic, 0);
            /*
             * Create an AMPS Source that will subscribe to the initial topic.
             * The bookmark "0" is the EPOCH bookmark, which starts the replay
             * from the first message in the specified topic. The topN setting
             * tells the source to only receive pubCount messages from AMPS
             * before considering the job finished. The URI, topic,
             * deserialization schema, and build() are all required to
             * create a valid AMPS Source. The bookmark and topN are optional.
             */
            AMPSSource<String> source = AMPSSource.<String>builder()
                .setUri(uri)
                .setTopic(initTopic)
                .setBookmark("0")
                .setTopN(pubCount)
                .setDeserializationSchema(new SimpleStringSchema())
                .build();
            /*
             * Create an AMPS Sink that will publish to the modified topic.
             * The URI, topic, serialization schema, and build() are all
             * required to create a valid AMPS Sink.
             */
            AMPSSink<String> sink = AMPSSink.<String>builder()
                .setUri(uri)
                .setTopic(modifiedTopic)
                .setSerializationSchema(new SimpleStringSchema())
                .build();
            /*
             * The Stream Execution Environment is used to create the job
             * and submit it to Flink.
             */
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            /*
             * Parallelism is set to 1 since this example does not use parallelism.
             */
            env.setParallelism(1);
            /*
             * Create the job graph. First, we use the AMPS Source to receive data
             * from AMPS and create a Data Stream of Strings. Then, we use a Process
             * Function to add a new field to the Strings. Finally, we sink the
             * Strings back to AMPS using the AMPS Sink. In this example, all three
             * steps are chained together.
             */
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "AMPS Source")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value,
                            ProcessFunction<String, String>.Context ctx,
                            Collector<String> out) throws Exception {
                        /*
                         * Sleep for a short duration to avoid cluttering the
                         * console with messages.
                         */
                        Thread.sleep(200);
                        /*
                         * Remove the trailing '}' to allow a new field to be added.
                         */
                        String outStr = value.substring(0, value.length() - 1);
                        /*
                         * Collect the String plus a new field 'num'.
                         */
                        out.collect(outStr + ",\"num\":" + Math.random() + "}");
                    }
                })
                .sinkTo(sink);
            /*
             * Submit the job to Flink. This method is synchronous and will wait
             * until the job completes before resuming the rest of the Java
             * program. Since we are using topN with a bookmark subscription,
             * the AMPS Source is BOUNDED and the job will be finished once
             * pubCount messages are received by the AMPS Source.
             */
            env.execute("First Program with AMPS Flink Connectors");
            /*
             * Sleep for a short duration to make sure all messages are printed
             * to the console.
             */
            Thread.sleep(1000);
        }
    }
}
In this example, we are using a simple AMPS Source and AMPS Sink, and the data type we are working with is just a String. Much more customization is available for the connector such as using user-defined data types and serialization schemas, which can make processing the stream data much more convenient when compared to working with Strings. See the examples for several jobs utilizing different aspects of the connector.