Coming from Kafka Streams, continuous delivery (CD) is quite an easy task that requires almost no effort compared to Apache Flink. The state of a Kafka Streams application is stored in Kafka itself, so after a redeployment the application can rebuild its state from so-called changelog topics. The flip side is that Kafka Streams is bound to Apache Kafka as both source and sink.

Apache Flink, on the other hand, is free to choose from a variety of source systems, e.g. Kafka, Pulsar, RabbitMQ and many others, to consume from and build a streaming application. It therefore cannot rely on changelog topics the way Kafka Streams does. Instead, Apache Flink offers the following state backends for stateful stream processing out of the box (a short selection sketch follows the list):

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend
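
To illustrate the choice, this is roughly how each backend is selected in code. This is a minimal sketch against the pre-1.13 state backend API used throughout this article; the S3 URI is a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keeps all state on the JobManager heap; only suitable for local testing.
env.setStateBackend(new MemoryStateBackend());

// Keeps working state on the TaskManager heap and checkpoints to a (distributed) filesystem.
env.setStateBackend(new FsStateBackend("s3://bucket_name/checkpoints"));

// Keeps state in an embedded RocksDB instance; needs the flink-statebackend-rocksdb
// dependency, and the constructor declares a checked IOException.
env.setStateBackend(new RocksDBStateBackend("s3://bucket_name/checkpoints"));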

For more details and an in-depth description of each state backend, including its limitations and use cases, please refer to the official documentation. In this article I'll be using the FsStateBackend for the examples.

Configure checkpointing with FsStateBackend

Before starting to build up the state of a stateful Flink application, you have to decide where the state is saved, or rather: you have to choose one of the three available state backend implementations or build a custom state backend. I have configured the FsStateBackend as follows in the Flink application:

// Read CLI arguments (e.g. the S3 URI for the checkpoint directory).
ParameterTool parameter = ParameterTool.fromArgs(args);
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 60 seconds.
executionEnvironment.enableCheckpointing(Duration.ofSeconds(60).toMillis());
// Keep the externalized checkpoint when the job is cancelled, so a new deployment can restore from it.
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
executionEnvironment.setStateBackend(new FsStateBackend(parameter.get("s3-uri")));
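
Depending on state size and checkpoint duration, the checkpointing behaviour can be tuned a bit further. The following lines are a sketch that continues the snippet above; the concrete values are assumptions and need to be adapted per application:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
// Exactly-once is the default mode; stated explicitly here for clarity.
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Give the pipeline some breathing room between two checkpoints.
checkpointConfig.setMinPauseBetweenCheckpoints(Duration.ofSeconds(30).toMillis());
// Fail a checkpoint that takes longer than 10 minutes instead of letting them pile up.
checkpointConfig.setCheckpointTimeout(Duration.ofMinutes(10).toMillis());
// Never run more than one checkpoint at a time.
checkpointConfig.setMaxConcurrentCheckpoints(1);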

In order to use the FsStateBackend with AWS S3 for your state, you have to add the following dependency:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-s3-fs-hadoop</artifactId>
	<version>${flink.version}</version>
</dependency>
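
Note that newer Flink versions load filesystem implementations as plugins instead of from the classpath. In that case the S3 filesystem jar, which already ships with the Flink distribution, is copied into the plugins directory of the image rather than bundled via Maven. A hypothetical Dockerfile sketch (the version tag and paths are assumptions based on the official image layout):

FROM flink:1.11
# The distribution ships the S3 filesystem jar under /opt/flink/opt;
# activating it means copying it into its own plugins subdirectory.
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/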

Configure the application for continuous delivery

After setting up the FsStateBackend for the stateful streaming application, the main question is how to build a seamless continuous delivery pipeline in which the application starts from the latest checkpoint of the previously deployed version (job id). To my surprise, I didn't find many resources on the internet, so I looked around and asked the community. I got the following answer, along with pointers to some tools that could be helpful for a seamless deployment of Flink applications.

… but the deployment process involves several steps, any of which might potentially fail. So it’s a bit complex to fully automate.

The mentioned tools would be a bigger time investment and would need further evaluation before being used for a project in a larger corporation. So we had to come up with something else to make a seamless deployment work in Kubernetes, where all our Apache Flink applications are running.

After bundling the application in an official Flink Docker image, the application can be started with the following arguments passed to a Kubernetes deployment:

containers:
    - name: jobmanager
      image: your_awesome_flink_app:<TAG>
      args:
         - "standalone-job"
         - "--job-classname"
         - "a.b.c.MainClassOfStreamingApp"
         - "--fromSavepoint"
         - "s3://bucket_name/dir_were_to_store_the_state_v1/last-state/_metadata"
         - "--s3-uri"
         - "s3://bucket_name/dir_were_to_store_the_state_v1"
         - "-Ds3.access-key=$(AWS_ACCESS_KEY_ID)"
         - "-Ds3.secret-key=$(AWS_SECRET_KEY)"

The important part is the --fromSavepoint argument and the last-state directory in S3, which is created, or rather copied over, by a Kubernetes cronjob that runs every minute. The following bash script for the cronjob is the heart of the setup: it enables continuous delivery without changing the path of the --fromSavepoint argument to the old job id, which is randomly generated for each jobmanager.

#!/usr/bin/env sh

APP_NAME="$1"
JOBMANAGER_URL="$2"
shift;

if [ "${APP_NAME}" = "--help" -o "${APP_NAME}" = "" -o "${JOBMANAGER_URL}" = "" ]; then
    echo "Usage: <app-name> <jobmanager-url>

    app-name: subfolder of the bucket where the state needs to be saved. This dir needs to be created manually at first
    jobmanager-url: k8s service url of the jobmanager without port
    "
    exit 1
else
    set -e

    # (1) looking for the job id
    RUNNING_JOBS=$(curl -s $JOBMANAGER_URL:8081/jobs)
    JOBID=$(echo $RUNNING_JOBS | jq -r '.jobs | .[0] | select(.status=="RUNNING") | .id')
    if [ -z "$JOBID" ]; then
      echo "No Flink job id found"
      exit 1
    fi
    echo "Looking for $JOBID job id state"

    # (2) find the newest checkpoint _metadata of the running job, excluding last-state itself
    LAST_METADATA=$(aws s3 ls s3://$AWS_BUCKET_NAME/$APP_NAME/$JOBID --recursive | sort | grep _metadata | grep -v "last-state" | tail -n1 | awk '{print $4}')
    echo $LAST_METADATA
    # (3) copy the latest checkpoint metadata to the stable last-state location
    # (note: "-t" is BusyBox timeout syntax; GNU coreutils expects "timeout 60 ...")
    timeout -t 60 aws s3 cp s3://$AWS_BUCKET_NAME/$LAST_METADATA s3://$AWS_BUCKET_NAME/$APP_NAME/last-state/_metadata
    exit $?
fi

exec "$@"

(1) Apache Flink’s JobManager REST API is used to get all running jobs and select the first job’s id (a limitation of this script).
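
For reference, the /jobs endpoint of the JobManager returns a structure like the following (the job id here is made up), which is what the jq expression above picks apart:

{
  "jobs": [
    { "id": "5e20cb6b0f357591171dfcca2eea09de", "status": "RUNNING" }
  ]
}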

(2) This job id is used to find the _metadata file of the latest checkpoint of the currently running Flink job.
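
Flink writes checkpoints below the configured checkpoint directory as <job-id>/chk-<n>/_metadata, so the listing the pipeline above sorts through looks roughly like this (dates, sizes and ids are made up):

2021-01-18 10:01:02       1764 dir_where_to_store_the_state_v1/5e20cb6b0f357591171dfcca2eea09de/chk-41/_metadata
2021-01-18 10:02:02       1764 dir_where_to_store_the_state_v1/5e20cb6b0f357591171dfcca2eea09de/chk-42/_metadata

Since each line of aws s3 ls --recursive starts with the modification timestamp, the plain sort | tail -n1 picks the newest _metadata key, here the one of chk-42.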

The last step (3) copies the found metadata file to the last-state directory, from which the application restores during a restart or redeployment.
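
The same restore can also be tested manually: outside of the standalone-job setup, the Flink CLI accepts a savepoint or retained-checkpoint path via -s, which is handy to verify that the copied _metadata is actually restorable (bucket, class and jar names are the placeholders from above):

flink run -s s3://bucket_name/dir_where_to_store_the_state_v1/last-state/_metadata \
    -c a.b.c.MainClassOfStreamingApp your_awesome_flink_app.jar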

The shown script is bundled into a container as a docker-entrypoint file, where the AWS CLI, curl and jq need to be available. This container is then started as a Kubernetes cronjob with the following arguments:

containers:
   - name: aws-container
     image: image-with-aws-jq-and-curl-and-the-script:latest
     args:
       - "dir_where_to_store_the_state_v1"
       - "k8s-service-of-flink-jobmanager"

Conclusion

In this blog post the challenges of continuous delivery for a stateful streaming application in Apache Flink have been discussed, and a solution that worked for my client has been shown. I’m quite interested in feedback from the Flink community about the pitfalls this approach may lead to and the scenarios in which it may not work.