Move a Flink savepoint to a different S3 location


Users of Apache Flink are familiar with creating a savepoint and restarting a job from a savepoint.

The tricky part is moving a savepoint to a different location and then starting a Flink job from the new location. The problem lies in the savepoint's _metadata file, which contains absolute URIs (see the Flink documentation on moving savepoints).

In this article, we go step by step through moving a Flink savepoint from one S3 bucket to another and safely altering (without corrupting) the _metadata file at the destination, so that the Flink job starts smoothly from the new savepoint location. The setup was tested with S3 and the filesystem state backend.

Goal

Suppose we have a Flink cluster up and running, and its flink-conf.yaml contains these options to configure S3 for savepoints and checkpoints:

s3.access-key: ouraccesskey
s3.secret-key: thisisverysecret
s3.endpoint: https://ourcloudprovider.org
s3.path-style-access: true

state.savepoints.dir: s3://old-bucket/old-savepoints
state.checkpoints.dir: s3://old-bucket/old-checkpoints
state.backend: filesystem

We want to change both the bucket name and the location within the bucket:

state.savepoints.dir: s3://new-bucket/new-savepoints
state.checkpoints.dir: s3://new-bucket/new-checkpoints
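Conceptually, relocation is a prefix substitution: every absolute URI under the old savepoint directory should end up at the same relative path under the new one. The following is a minimal Java sketch of that mapping, assuming plain string prefixes; the class and method names are hypothetical and are not taken from any Flink or state-metadata API. Note the '/' boundary check, which keeps a sibling directory such as s3://old-bucket/old-savepoints2 from being matched by accident.

```java
import java.util.Optional;

class SavepointPathMapper {

    // Hypothetical helper: maps a URI under oldDir to the same relative path under newDir.
    // Returns Optional.empty() when the URI is not inside oldDir.
    static Optional<String> relocate(String uri, String oldDir, String newDir) {
        String from = stripTrailingSlash(oldDir);
        String to = stripTrailingSlash(newDir);
        if (uri.equals(from)) {
            return Optional.of(to);
        }
        // Require a '/' right after the prefix so ".../old-savepoints2/..." does not match.
        if (uri.startsWith(from + "/")) {
            return Optional.of(to + uri.substring(from.length()));
        }
        return Optional.empty();
    }

    static String stripTrailingSlash(String s) {
        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        String oldDir = "s3://old-bucket/old-savepoints";
        String newDir = "s3://new-bucket/new-savepoints";
        String savepoint = "s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3";
        // prints s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3
        System.out.println(relocate(savepoint, oldDir, newDir).orElse("<not under old dir>"));
    }
}
```

The savepoint directory name itself (savepoint-b9888f-a23df1784fa3) never changes; only the prefix in front of it does, which is why the same name appears in every command below.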

We will do this in these steps:

  1. Create a savepoint in the current location
  2. Move the savepoint to the new location
  3. Download the _metadata file to our local machine
  4. Alter the _metadata file so that it contains absolute URIs pointing to the new location
  5. Upload the altered _metadata to the new location
  6. Start the job from the relocated savepoint

Requirements

We will need:

  • An S3 CLI. Several options are available, such as the AWS CLI or the MinIO client; the MinIO client lets you configure multiple S3-compatible providers in its config file, which is handy if you are changing providers too.
  • A Java 8 JRE, for example Zulu, OpenJDK or Oracle.

Create a savepoint and move it to new location

On your Flink cluster, issue the standard savepoint command:

./bin/flink savepoint <job_id>
Savepoint completed. Path: s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3
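If you script these steps, the savepoint path from this output has to be carried into every later command. Below is a minimal Java sketch that extracts it, assuming the CLI output contains a line of exactly the form shown above (the surrounding output and its wording may differ between Flink versions). The class and method names are our own, hypothetical ones; compile it with javac from a JDK, since a JRE alone does not include the compiler.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SavepointOutputParser {

    // Assumed output format: "Savepoint completed. Path: <uri>"
    private static final Pattern PATH_LINE =
            Pattern.compile("Savepoint completed\\. Path: (\\S+)");

    // Returns the savepoint URI printed by `flink savepoint`, if the line is present.
    static Optional<String> savepointPath(String cliOutput) {
        Matcher m = PATH_LINE.matcher(cliOutput);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Returns the last path segment, i.e. the savepoint-... directory name.
    static String directoryName(String uri) {
        String trimmed = uri.endsWith("/") ? uri.substring(0, uri.length() - 1) : uri;
        return trimmed.substring(trimmed.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String output = "Savepoint completed. Path: "
                + "s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3\n";
        savepointPath(output).ifPresent(p -> {
            System.out.println(p);                // the full savepoint URI
            System.out.println(directoryName(p)); // savepoint-b9888f-a23df1784fa3
        });
    }
}
```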

Move the savepoint to the new location:

aws s3 --endpoint-url https://ourcloudprovider.org mv s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3 s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3 --recursive

Download the _metadata file to your workstation:

aws s3 --endpoint-url https://ourcloudprovider.org cp s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/_metadata ./old_metadata

We name the file old_metadata on purpose, so that the rewritten file can be created as _metadata later.

Alter the _metadata file for the new location

One option is to open the _metadata file in a text editor of your choice and find and replace all occurrences of the old URI with the new one. Since _metadata is a binary file, this can easily corrupt it, so we will go with the much safer option of altering its contents programmatically.
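Why is find-and-replace risky? Our understanding, which you should verify against your Flink version, is that Flink's metadata serializer writes file paths with Java's DataOutputStream.writeUTF: a two-byte length followed by the string bytes. A plain text replacement changes the string without touching that length, so whenever the old and new URIs differ in length, every field after the path is read from the wrong offset. (In this article's example both prefixes happen to be 30 characters long.) The sketch below is our own heuristic illustration, not how state-metadata-1.0.0.jar works: it assumes ASCII-only paths and that the old prefix never appears by accident elsewhere in the file, and it uses a hypothetical, longer prefix s3://new-bucket/flink/new-savepoints to show the length being corrected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LengthAwareRewrite {

    // Heuristic: finds strings laid out like writeUTF output (2-byte big-endian length,
    // then the bytes) whose content starts with oldPrefix, swaps the prefix and writes
    // a corrected length. Assumes ASCII-only paths.
    static byte[] rewrite(byte[] in, String oldPrefix, String newPrefix) {
        byte[] oldB = oldPrefix.getBytes(StandardCharsets.US_ASCII);
        byte[] newB = newPrefix.getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream(in.length);
        int i = 0;
        while (i < in.length) {
            int start = i + 2; // string content would begin after the 2 length bytes
            if (start + oldB.length <= in.length && startsWith(in, start, oldB)) {
                int len = ((in[i] & 0xFF) << 8) | (in[i + 1] & 0xFF);
                if (len >= oldB.length && start + len <= in.length) {
                    int newLen = len - oldB.length + newB.length;
                    if (newLen > 0xFFFF) {
                        throw new IllegalArgumentException("rewritten path longer than 65535 bytes");
                    }
                    out.write(newLen >>> 8);  // high byte of the corrected length
                    out.write(newLen & 0xFF); // low byte of the corrected length
                    out.write(newB, 0, newB.length);
                    out.write(in, start + oldB.length, len - oldB.length); // unchanged suffix
                    i = start + len;
                    continue;
                }
            }
            out.write(in[i]); // every other byte is copied as-is
            i++;
        }
        return out.toByteArray();
    }

    static boolean startsWith(byte[] data, int offset, byte[] prefix) {
        for (int k = 0; k < prefix.length; k++) {
            if (data[offset + k] != prefix[k]) return false;
        }
        return true;
    }

    // Synthetic stand-in for a metadata record: a long, a writeUTF path, a marker byte.
    static byte[] sample(String path) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(buf);
            dos.writeLong(42L);
            dos.writeUTF(path);
            dos.writeByte(7);
            dos.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the record back; throws if the length prefix no longer matches the content.
    static String readPath(byte[] data) {
        try {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
            dis.readLong();
            String path = dis.readUTF();
            if (dis.readByte() != 7) throw new IllegalStateException("next field misaligned");
            return path;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String oldPath = "s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3/44c90045";
        byte[] patched = rewrite(sample(oldPath),
                "s3://old-bucket/old-savepoints",
                "s3://new-bucket/flink/new-savepoints"); // hypothetical longer prefix
        // prints s3://new-bucket/flink/new-savepoints/savepoint-b9888f-a23df1784fa3/44c90045
        System.out.println(readPath(patched));
    }
}
```

The same record rewritten with a naive string replacement keeps the old length of 69 bytes, so readPath stops three characters short of the end of the new path and then finds 'c' where the marker byte 7 should be. A tool that parses the metadata format, rather than pattern-matching bytes, avoids such accidental-match risks entirely.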

We have prepared a small Java app for this job: you can download state-metadata-1.0.0.jar. The source code of the project is available in the flink-state-metadata GitHub repository.

Given the old savepoint location s3://old-bucket/old-savepoints and the new location s3://new-bucket/new-savepoints, we run the program as follows, passing the full savepoint paths:

java -jar state-metadata-1.0.0.jar --input.file old_metadata --output.file _metadata \
  --input.uri s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3 \
  --output.uri s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3

The URIs in _metadata are now updated, which you can check by looking into the file:

head _metadata

<some binary garbage>
   s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/44c90045-8c15-41d0-8bfd-26389b951243 ...

<some binary garbage>
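`head` only shows the beginning of a binary file. To check the whole file, you can count how often the old and new prefixes occur in it. The following is a minimal Java sketch of such a check; the class name MetadataUriCheck is hypothetical and not part of state-metadata, and the byte-level search assumes ASCII URIs, as in this article.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

class MetadataUriCheck {

    // Counts (possibly overlapping) occurrences of needle in haystack.
    static int count(byte[] haystack, byte[] needle) {
        int n = 0;
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            boolean match = true;
            for (int k = 0; k < needle.length && match; k++) {
                match = haystack[i + k] == needle[k];
            }
            if (match) n++;
        }
        return n;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: java MetadataUriCheck <file> <old-uri-prefix> <new-uri-prefix>");
            System.exit(2);
        }
        byte[] data = Files.readAllBytes(Paths.get(args[0]));
        int stale = count(data, args[1].getBytes(StandardCharsets.UTF_8));
        int fresh = count(data, args[2].getBytes(StandardCharsets.UTF_8));
        System.out.println("old URI occurrences: " + stale);
        System.out.println("new URI occurrences: " + fresh);
        if (stale > 0) {
            System.exit(1); // something still points at the old location
        }
    }
}
```

Usage, after compiling with javac:

java MetadataUriCheck _metadata s3://old-bucket/old-savepoints s3://new-bucket/new-savepoints

After a successful rewrite we would expect zero old occurrences; running it against old_metadata first gives a baseline count to compare with.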

Upload the _metadata and start from relocated savepoint

Upload the altered _metadata file:

aws s3 --endpoint-url https://ourcloudprovider.org cp ./_metadata s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/_metadata 

Remember to update flink-conf.yaml as well:

state.savepoints.dir: s3://new-bucket/new-savepoints
state.checkpoints.dir: s3://new-bucket/new-checkpoints

Now we can start the job from the relocated savepoint on our Flink cluster:

./bin/flink ... --fromSavepoint s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3

It should load the savepoint and start checkpointing to the new bucket.

Conclusion

All the source code is available in the flink-state-metadata GitHub repository.

Let me know if you have any questions.