Data can be stored in many different systems, such as filesystems, object stores, relational database systems, key-value stores, search indexes, event logs, message queues, and so on. Each class of systems has been designed for specific access patterns and excels at serving a certain purpose. Consequently, today’s data infrastructures often consist of many different storage systems. Before adding a new component into the mix, a logical question to ask should be, “How well does it work with the other components in my stack?”
Adding a data processing system, such as Apache Flink, requires careful consideration because Flink does not include its own storage layer but relies on external storage systems to ingest and persist data. Hence, it is important for data processors like Flink to provide a well-equipped library of connectors to read data from and write data to external systems as well as an API to implement custom connectors. However, just being able to read or write data to external datastores is not sufficient for a stream processor that wants to provide meaningful consistency guarantees in the case of failure.
In this chapter, we discuss how source and sink connectors affect the consistency guarantees of Flink streaming applications and present Flink’s most popular connectors to read and write data. You will learn how to implement custom source and sink connectors and how to implement functions that send asynchronous read or write requests to external datastores.
In “Checkpoints, Savepoints, and State Recovery”, you learned that Flink’s checkpointing and recovery mechanism periodically takes consistent checkpoints of an application’s state. In case of a failure, the state of the application is restored from the latest completed checkpoint and processing continues. However, being able to reset the state of an application to a consistent point is not sufficient to achieve satisfying processing guarantees for an application. Instead, the source and sink connectors of an application need to be integrated with Flink’s checkpointing and recovery mechanism and provide certain properties to be able to give meaningful guarantees.
In order to provide exactly-once state consistency for an application,1 each source connector of the application needs to be able to set its read positions to a previously checkpointed position. When taking a checkpoint, a source operator persists its reading positions and restores these positions during recovery. Examples of source connectors that support the checkpointing of reading positions are file-based sources that store the reading offsets in the byte stream of the file and a Kafka source that stores the reading offsets of the topic partitions it consumes. If an application ingests data from a source connector that is not able to store and reset a reading position, it might suffer from data loss in the case of a failure and only provide at-most-once guarantees.
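To make the idea of a resettable source concrete, here is a minimal sketch in plain Scala with no Flink dependencies. The hypothetical `ResettableSource` reads from an in-memory sequence, hands out its offset when a checkpoint is taken, and rewinds to that offset on recovery; real Flink sources do this through Flink's state interfaces, which are not modeled here.

```scala
// Hypothetical, Flink-free model of a resettable source: the reading position
// is part of the checkpointed state and is restored during recovery.
final class ResettableSource(records: IndexedSeq[String]) {
  private var offset: Int = 0

  // Emits the next record, if any, and advances the reading position.
  def poll(): Option[String] =
    if (offset < records.length) {
      val record = records(offset)
      offset += 1
      Some(record)
    } else None

  // Called when a checkpoint is taken: returns the position to persist.
  def snapshotOffset(): Int = offset

  // Called during recovery: rewinds to the checkpointed position.
  def restoreOffset(checkpointed: Int): Unit = {
    offset = checkpointed
  }
}
```

If the source emits "a" and "b", is checkpointed at offset 2, emits "c", and then fails, restoring offset 2 makes it emit "c" again. Nothing is lost, but "c" is replayed, which is exactly the duplicate problem the sinks discussed next have to handle.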
The combination of Flink’s checkpointing and recovery mechanism and resettable source connectors guarantees that an application will not lose any data. However, the application might emit results twice because all results that have been emitted after the last successful checkpoint (the one to which the application falls back in the case of a recovery) will be emitted again. Therefore, resettable sources and Flink’s recovery mechanism are not sufficient to provide end-to-end exactly-once guarantees even though the application state is exactly-once consistent.
An application that aims to provide end-to-end exactly-once guarantees requires special sink connectors. There are two techniques that sink connectors can apply in different situations to achieve exactly-once guarantees: idempotent writes and transactional writes.
An idempotent operation can be performed several times but will only result in a single change. For example, repeatedly inserting the same key-value pair into a hashmap is an idempotent operation because the first insert operation adds the value for the key into the map and all following insertions will not change the map since it already contains the key-value pair. On the other hand, an append operation is not an idempotent operation, because appending an element multiple times results in multiple appends. Idempotent write operations are interesting for streaming applications because they can be performed multiple times without changing the results. Hence, they can to some extent mitigate the effect of replayed results as caused by Flink’s checkpointing mechanism.
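The difference between an idempotent upsert and a non-idempotent append can be shown by replaying the same writes twice. The sketch below uses hypothetical in-memory stand-ins for a key-value store and an append-only log.

```scala
import scala.collection.mutable

// Replaying the same writes against an idempotent upsert target and an
// append-only target (both hypothetical, in-memory stand-ins).
object IdempotenceDemo {
  val writes: Seq[(String, Int)] = Seq("a" -> 1, "b" -> 2)

  // Upsert: writing the same key-value pair again leaves the map unchanged.
  def upsertAll(target: mutable.Map[String, Int], ws: Seq[(String, Int)]): Unit =
    ws.foreach { case (k, v) => target(k) = v }

  // Append: every write adds a new entry, so a replay creates duplicates.
  def appendAll(target: mutable.ArrayBuffer[(String, Int)], ws: Seq[(String, Int)]): Unit =
    ws.foreach(w => target += w)
}
```

After writing `writes` twice, the map still holds two entries, while the append-only buffer holds four.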
It should be noted that an application that relies on idempotent sinks to achieve exactly-once results must guarantee that it overwrites previously written results while it replays. For example, an application with a sink that upserts into a key-value store must ensure that it deterministically computes the keys that are used to upsert. Moreover, applications that read from the sink system might observe unexpected results during the time when an application recovers. When the replay starts, previously emitted results might be overwritten by earlier results. Hence, an application that consumes the output of the recovering application might witness a jump back in time, e.g., read a smaller count than before. Also, the overall result of the streaming application will be in an inconsistent state while the replay is in progress because some results will be overwritten while others are not. Once the replay completes and the application is past the point at which it previously failed, the result will be consistent again.
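The "jump back in time" can be reproduced with a small simulation. In this hypothetical sketch, an operator keeps a running count for a single key and upserts it into a store after every event; after a failure, the count is reset to its checkpointed value and the events since the checkpoint are replayed.

```scala
import scala.collection.mutable

// Hypothetical simulation: a running count for key "k" is upserted into a
// key-value store after every event. After a failure, the operator state is
// reset to the last checkpoint and the events since then are replayed.
object ReplayDemo {
  // Returns every value a reader of the store would observe for key "k".
  def observedCounts(events: Int, checkpointAfter: Int): Seq[Long] = {
    val store = mutable.Map.empty[String, Long]
    val observed = mutable.ArrayBuffer.empty[Long]
    var count = 0L        // operator state
    var checkpointed = 0L // state captured by the last checkpoint

    def process(): Unit = {
      count += 1
      store("k") = count // idempotent upsert with a deterministic key
      observed += store("k")
    }

    (1 to events).foreach { i =>
      process()
      if (i == checkpointAfter) checkpointed = count
    }
    // Failure: restore the checkpointed state and replay the later events.
    count = checkpointed
    (checkpointAfter + 1 to events).foreach(_ => process())
    observed.toList
  }
}
```

With five events and a checkpoint after the third, a reader observes 1, 2, 3, 4, 5, then 4 (the jump back) and finally 5 again, at which point the result is consistent.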
The second approach to achieve end-to-end exactly-once consistency is based on transactional writes. The idea here is to only write those results to an external sink system that have been computed before the last successful checkpoint. This behavior guarantees end-to-end exactly-once because in case of a failure, the application is reset to the last checkpoint and no results have been emitted to the sink system after that checkpoint. By only writing data once a checkpoint is completed, the transactional approach does not suffer from the replay inconsistency of the idempotent writes. However, it adds latency because results only become visible when a checkpoint completes.
Flink provides two building blocks to implement transactional sink connectors—a generic write-ahead-log (WAL) sink and a two-phase-commit (2PC) sink. The WAL sink writes all result records into application state and emits them to the sink system once it receives the notification that a checkpoint was completed. Since the sink buffers records in the state backend, the WAL sink can be used with any kind of sink system. However, it cannot provide bulletproof exactly-once guarantees,2 adds to the state size of an application, and the sink system has to deal with a spiky writing pattern.
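The WAL behavior can be sketched as follows. This is not Flink's write-ahead-log sink; it is a hypothetical model whose method names only mirror Flink's checkpoint hooks. Records are buffered per checkpoint in (simulated) state and emitted in one burst when the checkpoint is reported complete, which is where the spiky writing pattern comes from.

```scala
import scala.collection.mutable

// Hypothetical model of a write-ahead-log sink: records are buffered per
// checkpoint in (simulated) operator state and only written to the external
// system once the checkpoint is reported as complete.
final class WalSinkModel(external: mutable.ArrayBuffer[String]) {
  private var current = mutable.ArrayBuffer.empty[String]
  private val pending = mutable.Map.empty[Long, List[String]]

  def invoke(record: String): Unit = {
    current += record
  }

  // Checkpoint barrier: seal the buffered records under this checkpoint ID.
  def snapshotState(checkpointId: Long): Unit = {
    pending(checkpointId) = current.toList
    current = mutable.ArrayBuffer.empty[String]
  }

  // Completion notification: emit all sealed records up to this checkpoint.
  def notifyCheckpointComplete(checkpointId: Long): Unit = {
    val ready = pending.keys.filter(_ <= checkpointId).toList.sorted
    ready.foreach { id =>
      external ++= pending(id)
      pending -= id
    }
  }
}
```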
In contrast, the 2PC sink requires a sink system that offers transactional support or exposes building blocks to emulate transactions. For each checkpoint, the sink starts a transaction and appends all received records to the transaction, writing them to the sink system without committing them. When it receives the notification that a checkpoint completed, it commits the transaction and materializes the written results. The mechanism relies on the sink's ability to commit, after recovering from a failure, a transaction that was opened before a completed checkpoint.
The 2PC protocol piggybacks on Flink’s existing checkpointing mechanism. The checkpoint barriers are notifications to start a new transaction, the notifications of all operators about the success of their individual checkpoint are their commit votes, and the messages of the JobManager that notify about the success of a checkpoint are the instructions to commit the transactions. In contrast to WAL sinks, 2PC sinks can achieve exactly-once output depending on the sink system and the sink’s implementation. Moreover, a 2PC sink continuously writes records to the sink system compared to the spiky writing pattern of a WAL sink.
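The protocol can be sketched with a hypothetical in-memory store that keeps uncommitted writes invisible. Flink's own building block for this pattern is `TwoPhaseCommitSinkFunction`; the sketch below does not use it, and all class and method names are illustrative. Records reach the store continuously, but inside an open transaction; the barrier pre-commits the transaction and opens a new one, and the completion notification commits it.

```scala
import scala.collection.mutable

// Hypothetical transactional store: writes stay invisible until commit.
final class TxnStore {
  private val uncommitted = mutable.Map.empty[Long, mutable.ArrayBuffer[String]]
  val committed = mutable.ArrayBuffer.empty[String]

  def begin(txnId: Long): Unit = {
    uncommitted(txnId) = mutable.ArrayBuffer.empty[String]
  }
  def write(txnId: Long, record: String): Unit = {
    uncommitted(txnId) += record
  }
  def commit(txnId: Long): Unit =
    uncommitted.remove(txnId).foreach(buffered => committed ++= buffered)
}

// Hypothetical two-phase-commit sink driven by checkpoint events.
final class TwoPhaseCommitSinkModel(store: TxnStore) {
  private var txnId = 0L
  private val preCommitted = mutable.Map.empty[Long, Long] // checkpointId -> txnId
  store.begin(txnId)

  // Records are written continuously, but into the open transaction.
  def invoke(record: String): Unit = store.write(txnId, record)

  // Checkpoint barrier: pre-commit (vote on) the current txn, start a new one.
  def snapshotState(checkpointId: Long): Unit = {
    preCommitted(checkpointId) = txnId
    txnId += 1
    store.begin(txnId)
  }

  // JobManager notification: commit every txn up to this checkpoint.
  def notifyCheckpointComplete(checkpointId: Long): Unit = {
    val done = preCommitted.keys.filter(_ <= checkpointId).toList.sorted
    done.foreach { cp =>
      store.commit(preCommitted(cp))
      preCommitted -= cp
    }
  }
}
```

A real implementation must also persist the IDs of pre-committed transactions in its checkpoint so that they can still be committed after a recovery, which this model omits.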
Table 8-1 shows the end-to-end consistency guarantees for different types of source and sink connectors that can be achieved in the best case; depending on the implementation of the sink, the actual consistency might be worse.
| Sink type | Nonresettable source | Resettable source |
|---|---|---|
| Any sink | At-most-once | At-least-once |
| Idempotent sink | At-most-once | Exactly-once* (temporary inconsistencies during recovery) |
| WAL sink | At-most-once | At-least-once |
| 2PC sink | At-most-once | Exactly-once |
Apache Flink provides connectors to read data from and write data to a variety of storage systems. Message queues and event logs, such as Apache Kafka, Kinesis, or RabbitMQ, are common sources to ingest data streams. In batch processing-dominated environments, data streams are also often ingested by monitoring a filesystem directory and reading files as they appear.
On the sink side, data streams are often produced into message queues to make the events available to subsequent streaming applications, written to filesystems for archiving or making the data available for offline analytics or batch applications, or inserted into key-value stores or relational database systems, like Cassandra, ElasticSearch, or MySQL, to make the data searchable and queryable, or to serve dashboard applications.
Unfortunately, there are no standard interfaces for most of these storage systems, except JDBC for relational DBMS. Instead, every system features its own connector library with a proprietary protocol. As a consequence, processing systems like Flink need to maintain several dedicated connectors to be able to read events from and write events to the most commonly used message queues, event logs, filesystems, key-value stores, and database systems.
Flink provides connectors for Apache Kafka, Kinesis, RabbitMQ, Apache NiFi, various filesystems, Cassandra, ElasticSearch, and JDBC. In addition, the Apache Bahir project provides additional Flink connectors for ActiveMQ, Akka, Flume, Netty, and Redis.
In order to use a provided connector in your application, you need to add its dependency to the build file of your project. We explained how to add connector dependencies in “Including External and Flink Dependencies”.
In the following section, we discuss the connectors for Apache Kafka, file-based sources and sinks, and Apache Cassandra. These are the most widely used connectors and they also represent important types of source and sink systems. You can find more information about the other connectors in the documentation for Apache Flink or Apache Bahir.
Apache Kafka is a distributed streaming platform. Its core is a distributed publish-subscribe messaging system that is widely adopted to ingest and distribute event streams. We briefly explain the main concepts of Kafka before we dive into the details of Flink’s Kafka connector.
Kafka organizes event streams as so-called topics. A topic is an event log that guarantees that events are read in the same order in which they were written. In order to scale writing to and reading from a topic, it can be split into partitions that are distributed across a cluster. The ordering guarantee is limited to a partition—Kafka does not provide ordering guarantees when reading from different partitions. The reading position in a Kafka partition is called an offset.
Flink provides source connectors for all common Kafka versions. Through Kafka 0.11, the API of the client library evolved and new features were added. For instance, Kafka 0.10 added support for record timestamps. Since release 1.0, the API has remained stable. Flink provides a universal Kafka connector that works for all Kafka versions since 0.11. Flink also features version-specific connectors for the Kafka versions 0.8, 0.9, 0.10, and 0.11. For the remainder of this section, we focus on the universal connector and for the version-specific connectors, we refer you to Flink’s documentation.
The dependency for the universal Flink Kafka connector is added to a Maven project as shown in the following:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.7.1</version>
</dependency>
The Flink Kafka connector ingests event streams in parallel. Each parallel source task can read from one or more partitions. For each of its partitions, a task tracks the current reading offset and includes it in its checkpoint data. When recovering from a failure, the offsets are restored and the source instance continues reading from the checkpointed offset. The Flink Kafka connector does not rely on Kafka’s own offset-tracking mechanism, which is based on so-called consumer groups. Figure 8-1 shows the assignment of partitions to source instances.
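How partitions end up on source tasks can be sketched with a simple round-robin rule. This is a simplification under stated assumptions: Flink's actual assignment also takes the topic into account, and the function name is hypothetical. The sketch shows that one task may own several partitions and that tasks beyond the number of partitions stay idle.

```scala
// Hypothetical sketch: spread Kafka partitions over parallel source tasks by
// partition number modulo parallelism. Flink's real assignment also factors
// in the topic; plain round-robin is a simplification.
object KafkaSourceModel {
  // Returns task index -> partitions read by that task. Tasks that receive
  // no partition do not appear in the map and stay idle.
  def assign(numPartitions: Int, parallelism: Int): Map[Int, List[Int]] =
    (0 until numPartitions).toList.groupBy(_ % parallelism)
}
```

Each task would then checkpoint one offset per partition it owns, as described above.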
A Kafka source connector is created as shown in Example 8-1.
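import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Minimal configuration of the internal Kafka client
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// Read the Kafka topic "topic" as a stream of strings
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))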
The constructor takes three arguments. The first argument defines the topics to read from. This can be a single topic, a list of topics, or a regular expression that matches all topics to read from. When reading from multiple topics, the Kafka connector treats all partitions of all topics the same and multiplexes their events into a single stream.
The second argument is a DeserializationSchema or KeyedDeserializationSchema. Kafka messages are stored as raw byte messages and need to be deserialized into Java or Scala objects. The SimpleStringSchema, which is used in Example 8-1, is a built-in DeserializationSchema that simply deserializes a byte array into a String. In addition, Flink provides implementations for Apache Avro and text-based JSON encodings. DeserializationSchema and KeyedDeserializationSchema are public interfaces so you can always implement custom deserialization logic.
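The heart of custom deserialization logic is a function from a message's raw bytes to a typed object. The sketch below keeps only that conversion; a real schema would implement Flink's `DeserializationSchema` interface, which additionally requires `isEndOfStream()` and `getProducedType()`. The `SensorReading` type and the `"sensorId,temperature"` payload format are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical record type and payload format ("sensorId,temperature").
final case class SensorReading(id: String, temperature: Double)

// Core of a custom deserializer: raw Kafka message bytes -> typed object.
object SensorReadingDeserializer {
  def deserialize(message: Array[Byte]): SensorReading = {
    val fields = new String(message, StandardCharsets.UTF_8).split(",", 2)
    require(fields.length == 2, s"malformed message: ${fields.mkString(",")}")
    SensorReading(fields(0).trim, fields(1).trim.toDouble)
  }
}
```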
The third parameter is a Properties object that configures the Kafka client that is internally used to connect to and read from Kafka. A minimum Properties configuration consists of two entries, "bootstrap.servers" and "group.id". Consult the Kafka documentation for additional configuration properties.
In order to extract event-time timestamps and generate watermarks, you can provide an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks to a Kafka consumer by calling FlinkKafkaConsumer.assignTimestampsAndWatermarks().3 An assigner is applied to each partition to leverage the per-partition ordering guarantees, and the source instance merges the partition watermarks according to the watermark propagation protocol (see “Watermark Propagation and Event Time”).
Note that the watermarks of a source instance cannot make progress if a partition becomes inactive and does not provide messages. As a consequence, a single inactive partition causes the whole application to stall because the application’s watermarks do not make progress.
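The merge rule explains the stall. A source instance can only advance its watermark to the minimum of its partition watermarks, so a single partition that stops advancing holds the whole instance back. A minimal sketch, with a hypothetical object name:

```scala
// Hypothetical sketch of how a source instance combines the watermarks of
// its partitions: it can only advance to the minimum across all partitions.
object PartitionWatermarks {
  def sourceWatermark(partitionWatermarks: Map[Int, Long]): Long =
    if (partitionWatermarks.isEmpty) Long.MinValue
    else partitionWatermarks.values.min
}
```

With partition watermarks of 100 and 250, the instance emits 100. If partition 1 goes idle at 10 while partition 0 advances to 1000, the instance is stuck at 10.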
As of version 0.10.0, Kafka supports message timestamps. When reading from Kafka version 0.10 or later, the consumer will automatically extract the message timestamp as an event-time timestamp if the application runs in event-time mode. In this case, you still need to generate watermarks and should apply an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks that forwards the previously assigned Kafka timestamp.
There are a few more notable configuration options. You can configure the starting position from which the partitions of a topic are initially read. Valid options are:
The last reading position known by Kafka for the consumer group that was configured via the group.id parameter. This is the default behavior:
FlinkKafkaConsumer.setStartFromGroupOffsets()
The earliest offset of each individual partition:
FlinkKafkaConsumer.setStartFromEarliest()
The latest offset of each individual partition:
FlinkKafkaConsumer.setStartFromLatest()
All records with a timestamp greater than or equal to a given timestamp (requires Kafka 0.10.x or later):
FlinkKafkaConsumer.setStartFromTimestamp(long)
Specific reading positions for all partitions as provided by a Map object:
FlinkKafkaConsumer.setStartFromSpecificOffsets(Map)
Note that this configuration only affects the first reading positions. In the case of a recovery or when starting from a savepoint, an application will start reading from the offsets stored in the checkpoint or savepoint.
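The precedence rule can be sketched as a small function. Everything here is hypothetical: the names, the subset of modes (timestamp and specific offsets are omitted), and the fallback when the consumer group has no committed offset, which in practice is decided by the Kafka client's `auto.offset.reset` property.

```scala
// Hypothetical model of choosing the initial offset of one partition.
// A checkpointed or savepointed offset always wins; the configured start
// mode only matters on the very first start.
sealed trait StartMode
case object GroupOffsets extends StartMode
case object Earliest extends StartMode
case object Latest extends StartMode

object StartPosition {
  def resolve(
      mode: StartMode,
      restored: Option[Long],       // offset from a checkpoint or savepoint
      committedGroup: Option[Long], // offset Kafka stores for group.id
      earliest: Long,
      latest: Long): Long =
    restored.getOrElse(mode match {
      case GroupOffsets => committedGroup.getOrElse(latest) // assumed fallback
      case Earliest     => earliest
      case Latest       => latest
    })
}
```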
A Flink Kafka consumer can be configured to automatically discover new topics that match the regular expression or new partitions that were added to a topic. These features are disabled by default and can be enabled by adding the parameter flink.partition-discovery.interval-millis with a nonnegative value to the Properties object.
Flink provides sink connectors for all Kafka versions since 0.8. Through Kafka 0.11, the API of the client library evolved and new features were added, such as record timestamp support with Kafka 0.10 and transactional writes with Kafka 0.11. Since release 1.0, the API has remained stable. Flink provides a universal Kafka connector that works for all Kafka versions since 0.11. Flink also features version-specific connectors for Kafka versions 0.8, 0.9, 0.10, and 0.11. For the remainder of this section, we focus on the universal connector and refer you to Flink’s documentation for the version-specific connectors. The dependency for Flink’s universal Kafka connector is added to a Maven project as shown in the following:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.7.1</version>
</dependency>
A Kafka sink is added to a DataStream application as shown in Example 8-2.
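import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer[String](
  "localhost:9092",         // broker list
  "topic",                  // target topic
  new SimpleStringSchema()) // serialization schema

stream.addSink(myProducer)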
The constructor used in Example 8-2 receives three parameters. The first parameter is a comma-separated string of Kafka broker addresses. The second is the name of the topic to which the data is written, and the last is a SerializationSchema that converts the input types of the sink (String in Example 8-2) into a byte array. SerializationSchema is the counterpart of the DeserializationSchema that we discussed in the Kafka source section.
FlinkKafkaProducer provides more constructors with different combinations of arguments as follows:
Similar to the Kafka source connector, you can pass a Properties object to provide custom options to the internal Kafka client. When using Properties, the list of brokers has to be provided as a "bootstrap.servers" property. Have a look at the Kafka documentation for a comprehensive list of parameters.
You can specify a FlinkKafkaPartitioner to control how records are mapped to Kafka partitions. We will discuss this feature in more depth later in this section.
Instead of using a SerializationSchema to convert records into byte arrays, you can also specify a KeyedSerializationSchema, which serializes a record into two byte arrays: one for the key and one for the value of a Kafka message. Moreover, KeyedSerializationSchema also exposes more Kafka-specific functionality, such as overriding the target topic to write to multiple topics.
The consistency guarantees that Flink’s Kafka sink provides depend on its configuration. The Kafka sink provides at-least-once guarantees under the following conditions:
Flink’s checkpointing is enabled and all sources of the application are resettable.
The sink connector throws an exception if a write does not succeed, causing the application to fail and recover. This is the default behavior. The internal Kafka client can be configured to retry writes before declaring them failed by setting the retries property to a value larger than zero (the default is zero). You can also configure the sink to only log write failures by calling setLogFailuresOnly(true) on the sink object. Note that this will void any output guarantees of the application.
The sink connector waits for Kafka to acknowledge in-flight records before completing its checkpoint. This is the default behavior. By calling setFlushOnCheckpoint(false) on the sink object, you can disable this waiting. However, this will also disable any output guarantees.
Kafka 0.11 introduced support for transactional writes. Due to this feature, Flink’s Kafka sink is also able to provide exactly-once output guarantees given that the sink and Kafka are properly configured. Again, a Flink application must enable checkpointing and consume from resettable sources. Moreover, FlinkKafkaProducer provides a constructor with a Semantic parameter that controls the consistency guarantees provided by the sink. Possible consistency values are:
Semantic.NONE, which provides no guarantees: records might be lost or written multiple times.
Semantic.AT_LEAST_ONCE, which guarantees that no write is lost but might be duplicated. This is the default setting.
Semantic.EXACTLY_ONCE, which builds on Kafka’s transactions to write each record exactly once.
There are a few things to consider when running a Flink application with a Kafka sink that operates in exactly-once mode, and it helps to roughly understand how Kafka processes transactions. In a nutshell, Kafka’s transactions work by appending all messages to the log of a partition and marking messages of open transactions as uncommitted. Once a transaction is committed, the markers are changed to committed. A consumer that reads from a topic can be configured with an isolation level (via the isolation.level property) to declare whether it can read uncommitted messages (read_uncommitted, the default) or not (read_committed). If the consumer is configured to read_committed, it stops consuming from a partition once it encounters an uncommitted message and resumes when the message is committed. Hence, open transactions can block consumers from reading a partition and introduce significant delays. Kafka guards against this by rejecting and closing transactions after a timeout interval, which is configured with the transaction.timeout.ms property.
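The blocking effect of an open transaction on a read_committed consumer can be modeled with a toy partition log. The sketch follows the simplified description above; aborted transactions are not modeled, and all names are hypothetical.

```scala
// Hypothetical model of one Kafka partition with transactional messages.
// A read_committed consumer stops at the first message of a transaction
// that is still open; read_uncommitted returns everything.
final case class Msg(value: String, txn: Option[Int])

object IsolationDemo {
  def readable(log: Seq[Msg], committedTxns: Set[Int], readCommitted: Boolean): Seq[String] =
    if (!readCommitted) log.map(_.value)
    else log.takeWhile(m => m.txn.forall(id => committedTxns.contains(id))).map(_.value)
}
```

In a log holding messages of transaction 1 (committed) and transaction 2 (open), a read_committed consumer cannot get past the first message of transaction 2, even if later messages belong to the committed transaction 1.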
In the context of Flink’s Kafka sink, this is important because transactions that time out (due to long recovery cycles, for example) lead to data loss. So, it is crucial to configure the transaction timeout property appropriately. By default, the Flink Kafka sink sets transaction.timeout.ms to one hour. Since Kafka brokers reject transaction timeouts that exceed their transaction.max.timeout.ms property, which is set to 15 minutes by default, you probably need to increase that property in your Kafka setup. Moreover, the visibility of committed messages depends on the checkpoint interval of a Flink application. Refer to the Flink documentation to learn about a few other corner cases when enabling exactly-once consistency.
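Both constraints can be checked before deployment. The helper below is hypothetical: the upper bound reflects the broker rule described above, and the lower bound (checkpoint interval plus expected recovery time) is a heuristic for transactions that must survive until they are committed.

```scala
import scala.concurrent.duration._

// Hypothetical pre-flight check for the sink's transaction timeout.
object TxnTimeoutCheck {
  def problems(producerTimeout: FiniteDuration,
               brokerMaxTimeout: FiniteDuration,
               checkpointInterval: FiniteDuration,
               expectedRecoveryTime: FiniteDuration): List[String] = {
    // Brokers reject producer timeouts above transaction.max.timeout.ms.
    val tooLarge =
      if (producerTimeout > brokerMaxTimeout) List("producer timeout exceeds broker maximum")
      else Nil
    // Heuristic: a transaction must outlive a checkpoint plus a recovery.
    val tooSmall =
      if (producerTimeout < checkpointInterval + expectedRecoveryTime)
        List("transactions may time out before they are committed")
      else Nil
    tooLarge ++ tooSmall
  }
}
```

With the defaults mentioned above (one hour on the Flink side, 15 minutes on the broker), the check reports that the producer timeout exceeds the broker maximum.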
The default configuration of a Kafka cluster can still lead to data loss, even after a write is acknowledged. You should carefully revise the configuration of your Kafka setup, paying special attention to the following parameters:
acks
log.flush.interval.messages
log.flush.interval.ms
log.flush.*
We refer you to the Kafka documentation for details about its configuration parameters and guidelines for a suitable configuration.
When writing messages to a Kafka topic, a Flink Kafka sink task can choose to which partition of the topic to write. A FlinkKafkaPartitioner can be passed to some constructors of the Flink Kafka sink. If not specified, the default partitioner maps each sink task to a single Kafka partition: all records emitted by the same sink task are written to the same partition, and a single partition may contain the records of multiple sink tasks if there are more tasks than partitions. If the number of partitions is larger than the number of subtasks, the default configuration results in empty partitions, which can cause problems for Flink applications consuming the topic in event-time mode because the watermarks of empty partitions do not advance.
By providing a custom FlinkKafkaPartitioner, you can control how records are routed to topic partitions. For example, you can create a partitioner based on a key attribute of the records or a round-robin partitioner for even distribution. There is also the option to delegate the partitioning to Kafka based on the message key. This requires a KeyedSerializationSchema to extract the message keys and setting the FlinkKafkaPartitioner parameter to null to disable the default partitioner.
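The two partitioning strategies mentioned above boil down to small functions over the array of available partitions. In a real implementation, this logic would go into the `partition()` method of a `FlinkKafkaPartitioner` subclass, which receives the record and that array; the sketch below is standalone and its names are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical partitioning functions over the available partition IDs.
object CustomPartitioning {
  // Key-based: records with the same key always go to the same partition.
  def byKey(key: String, partitions: Array[Int]): Int =
    partitions(Math.floorMod(key.hashCode, partitions.length))

  // Round-robin: spread records evenly, regardless of their content.
  final class RoundRobin {
    private val next = new AtomicInteger(0)
    def partition(partitions: Array[Int]): Int =
      partitions(Math.floorMod(next.getAndIncrement(), partitions.length))
  }
}
```

`Math.floorMod` keeps the index nonnegative even for negative hash codes.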
Finally, Flink’s Kafka sink can be configured to write message timestamps as supported since Kafka 0.10. Writing the event-time timestamp of a record to Kafka is enabled by calling setWriteTimestampToKafka(true) on the sink object.
Filesystems are commonly used to store large amounts of data in a cost-efficient way. In big data architectures they often serve as data source and data sink for batch processing applications. In combination with advanced file formats, such as Apache Parquet or Apache ORC, filesystems can efficiently serve analytical query engines such as Apache Hive, Apache Impala, or Presto. Therefore, filesystems are commonly used to “connect” streaming and batch applications.
Apache Flink features a resettable source connector to ingest data in files as streams. The filesystem source is part of the flink-streaming-java module. Hence, you do not need to add any other dependency to use this feature. Flink supports different types of filesystems, such as local filesystems (including locally mounted NFS or SAN shares), Hadoop HDFS, Amazon S3, and OpenStack Swift FS. Refer to “Filesystem Configuration” to learn how to configure filesystems in Flink. Example 8-3 shows how to ingest a stream by reading text files line-wise.
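import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The path of the input format is null here because readFile() sets it
val lineReader = new TextInputFormat(null)

val lineStream: DataStream[String] = env.readFile[String](
  lineReader,                              // The FileInputFormat
  "hdfs:///path/to/my/data",               // The path to read
  FileProcessingMode.PROCESS_CONTINUOUSLY, // The processing mode
  30000L)                                  // The monitoring interval in ms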
The arguments of the StreamExecutionEnvironment.readFile() method are:
A FileInputFormat, which is responsible for reading the content of the files. We discuss the details of this interface later in this section. The null argument of TextInputFormat in Example 8-3 is the file path, which readFile() sets separately.
The path that should be read. If the path refers to a file, the single file is read. If it refers to a directory, FileInputFormat scans the directory for files to read.
The mode in which the path should be read. The mode can either be PROCESS_ONCE or PROCESS_CONTINUOUSLY. In PROCESS_ONCE mode, the read path is scanned once when the job is started and all matching files are read. In PROCESS_CONTINUOUSLY mode, the path is periodically scanned (after an initial scan) and new and modified files are continuously read.
The interval in milliseconds in which the path is periodically scanned. The parameter is ignored in PROCESS_ONCE mode.
FileInputFormat is a specialized InputFormat to read files from a filesystem.4 A FileInputFormat reads files in two steps. First it scans a filesystem path and creates so-called input splits for all matching files. An input split defines a range on a file, typically via a start offset and a length. After dividing a large file into multiple splits, the splits can be distributed to multiple reader tasks to read the file in parallel. Depending on the encoding of a file, it can be necessary to only generate a single split to read the file as a whole. The second step of a FileInputFormat is to receive an input split, read the file range that is defined by the split, and return all corresponding records.
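The first step, cutting a file into splits, can be sketched as a small planning function. The maximum split size and all names are assumptions; real input formats also handle records that cross split boundaries, which this sketch ignores.

```scala
// Hypothetical sketch of split planning: cut a file into (start, length)
// ranges of at most maxSplitSize bytes, or return one split covering the
// whole file if its encoding cannot be read from an arbitrary offset.
final case class InputSplit(path: String, start: Long, length: Long)

object SplitPlanner {
  def splits(path: String, fileSize: Long, maxSplitSize: Long, splittable: Boolean): Seq[InputSplit] =
    if (!splittable || fileSize <= maxSplitSize) Seq(InputSplit(path, 0L, fileSize))
    else (0L until fileSize by maxSplitSize).map { start =>
      InputSplit(path, start, math.min(maxSplitSize, fileSize - start))
    }
}
```

A 250-byte file with a 100-byte maximum yields splits starting at 0, 100, and 200, with the last one covering the remaining 50 bytes.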
A FileInputFormat used in a DataStream application should also implement the CheckpointableInputFormat interface, which defines methods to checkpoint and reset the current reading position of an InputFormat within a file split. If the FileInputFormat does not implement the CheckpointableInputFormat interface, the filesystem source connector provides only at-least-once guarantees even when checkpointing is enabled, because on recovery the input format starts reading from the beginning of the split that was being processed when the last completed checkpoint was taken.
In version 1.7, Flink provides a few classes that extend FileInputFormat and implement CheckpointableInputFormat. TextInputFormat reads text files line-wise (split by newline characters), subclasses of CsvInputFormat read files with comma-separated values, and AvroInputFormat reads files with Avro-encoded records.
In PROCESS_CONTINUOUSLY mode, the filesystem source connector identifies new files based on their modification timestamp. This means a file is completely reprocessed if it is modified because its modification timestamp changes. This includes modifications due to appending writes. Therefore, a common technique to continuously ingest files is to write them in a temporary directory and atomically move them to the monitored directory once they are finalized. When a file is completely ingested and a checkpoint completed, it can be removed from the directory. Monitoring ingested files by tracking the modification timestamp also has implications if you read from file stores with eventually consistent list operations, such as S3. Since files might not appear in order of their modification timestamps, they may be ignored by the filesystem source connector.
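Both effects, full re-reads of modified files and ignored late-listed files, follow from a simple rule: a file is picked up if its modification timestamp is newer than the largest one seen so far. The monitor below is a hypothetical sketch of that rule, not Flink's implementation.

```scala
// Hypothetical sketch of modification-time-based file discovery: a file is
// (re)read when its modification timestamp is newer than the largest
// timestamp seen in previous scans.
final case class FileStatus(path: String, modificationTime: Long)

final class DirectoryMonitor {
  private var maxSeenModTime = Long.MinValue

  def scan(listing: Seq[FileStatus]): Seq[String] = {
    val fresh = listing.filter(_.modificationTime > maxSeenModTime).sortBy(_.modificationTime)
    if (fresh.nonEmpty) maxSeenModTime = fresh.last.modificationTime
    fresh.map(_.path)
  }
}
```

If file "a" is appended to after being read, its new timestamp causes it to be read again in full. If file "c" only shows up in a listing after a newer file has already been processed, it is skipped.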
Note that in PROCESS_ONCE mode, no checkpoints are taken after the filesystem path is scanned and all splits are created.
If you want to use a filesystem source connector in an event-time application, you should be aware that it can be challenging to generate watermarks since input splits are generated in a single process and round-robin distributed to all parallel readers that process them in order of the modification timestamp of the file. In order to generate satisfying watermarks you need to reason about the smallest timestamp of a record that is included in a split later processed by the task.
Writing a stream into files is a common requirement, for example, to prepare data with low latency for offline ad-hoc analysis. Since most applications can only read files once they are finalized and streaming applications run for long periods of time, streaming sink connectors typically chunk their output into multiple files. Moreover, it is common to organize records into so-called buckets, so that consuming applications have more control over which data to read.
Like the filesystem source connector, Flink’s StreamingFileSink connector is contained in the flink-streaming-java module. Hence, you do not need to add a dependency to your build file to use it.
StreamingFileSink provides end-to-end exactly-once guarantees for an application given that the application is configured with exactly-once checkpoints and all its sources are reset in the case of a failure. We will discuss the recovery mechanism in more detail later in this section. Example 8-4 shows how to create a StreamingFileSink with minimal configuration and append it to a stream.
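import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

val input: DataStream[String] = …

// Row-encoded sink that writes the String representation of each record as UTF-8
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
  .build()

input.addSink(sink)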
When a StreamingFileSink receives a record, the record is assigned to a bucket. A bucket is a subdirectory of the base path that is configured with the builder of StreamingFileSink ("/base/path" in Example 8-4).
The bucket is chosen by a BucketAssigner, which is a public interface and returns for every record a BucketId that determines the directory to which the record will be written. The BucketAssigner can be configured on the builder using the withBucketAssigner() method. If no BucketAssigner is explicitly specified, it uses a DateTimeBucketAssigner that assigns records to hourly buckets based on the processing time when they are written.
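Hourly bucketing by processing time amounts to formatting the current time with an hour-granular pattern. In this hypothetical sketch, the pattern matches the bucket names used in this section (such as "2018-07-22--17"), and the time zone is passed explicitly.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical sketch of hourly bucketing by processing time. The pattern
// matches bucket names such as "2018-07-22--17".
object HourlyBuckets {
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH")

  def bucketId(processingTimeMillis: Long, zone: ZoneId): String =
    format.format(Instant.ofEpochMilli(processingTimeMillis).atZone(zone))
}
```

All records written between 17:00 and 17:59 (in the chosen zone) land in the same bucket directory.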
Each bucket directory contains multiple part files that are concurrently written by multiple parallel instances of the StreamingFileSink. Moreover, each parallel instance chunks its output into multiple part files. The path of a part file has the following format:
[base-path]/[bucket-path]/part-[task-idx]-[id]
For example, given a base path of "/johndoe/demo" and a part prefix of "part", the path "/johndoe/demo/2018-07-22--17/part-4-8" points to the part file with ID 8 that was written by the fifth sink task (task index 4, 0-indexed) to bucket "2018-07-22--17", the 5 p.m. bucket of July 22, 2018.
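To make the naming scheme concrete, here is a small sketch that rebuilds the example path from its parts. The object PartFileNaming and its methods are hypothetical helpers, not Flink API; the "yyyy-MM-dd--HH" pattern is chosen to match the hourly bucket IDs shown above.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helpers that mimic the default naming scheme; not part of Flink's API.
object PartFileNaming {
  // hourly bucket IDs such as "2018-07-22--17"
  private val hourly = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH")

  def bucketId(writeTime: LocalDateTime): String = writeTime.format(hourly)

  // [base-path]/[bucket-path]/part-[task-idx]-[id]
  def partFilePath(basePath: String, bucket: String, taskIdx: Int, fileId: Long): String =
    s"$basePath/$bucket/part-$taskIdx-$fileId"
}
```

A record written at 5:05 p.m. on July 22, 2018, by the task with index 4 into its part file with ID 8 ends up at the path from the example.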
Nonconsecutive file IDs, the last number in a committed file’s name, do not indicate data loss. StreamingFileSink simply increments the file IDs. When discarding pending files, it does not reuse their IDs.
A RollingPolicy determines when a task creates a new part file. You can configure the RollingPolicy with the withRollingPolicy() method on the builder. By default, StreamingFileSink uses a DefaultRollingPolicy, which is configured to roll part files when they exceed 128 MB or are older than 60 seconds. You can also configure an inactivity interval after which a part file is rolled.
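The three rolling conditions can be summarized as a single predicate. The following is a simplified model, not Flink’s DefaultRollingPolicy implementation; RollingModel and its parameter names are hypothetical, and the 60-second default for the inactivity interval is an assumption made for the sketch.

```scala
// Simplified, hypothetical model of the rolling decision; not Flink's DefaultRollingPolicy.
case class RollingModel(
    maxPartSizeBytes: Long = 128L * 1024 * 1024, // 128 MB
    rolloverIntervalMs: Long = 60 * 1000L,       // 60 seconds
    inactivityIntervalMs: Long = 60 * 1000L) {   // assumed default for this sketch

  // roll if the part file is too large, too old, or has not been written to for a while
  def shouldRoll(partSizeBytes: Long, createdAtMs: Long, lastWriteMs: Long, nowMs: Long): Boolean =
    partSizeBytes > maxPartSizeBytes ||
      nowMs - createdAtMs > rolloverIntervalMs ||
      nowMs - lastWriteMs > inactivityIntervalMs
}
```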
StreamingFileSink supports two modes of writing records to part files: row encoding and bulk encoding. In row encoding mode, every record is individually encoded and appended to a part file. In bulk encoding, records are collected and written in batches. Apache Parquet, which organizes and compresses records in a columnar format, is a file format that requires bulk encoding.
Example 8-4 creates a StreamingFileSink with row encoding by providing an Encoder that writes single records to a part file. In Example 8-4, we use a SimpleStringEncoder, which calls the toString() method of the record and writes the String representation of the record to the file. Encoder is a simple interface with a single method that can be easily implemented.
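Because Encoder has only one method, a custom row format takes just a few lines. To keep the sketch runnable without Flink on the classpath, it defines a stand-in trait RecordEncoder whose encode(element, stream) method is shaped like the method of Flink’s Encoder; in a real job you would implement Encoder itself. Reading and CsvReadingEncoder are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

// Stand-in for Flink's Encoder so that the sketch runs without Flink.
trait RecordEncoder[T] {
  def encode(element: T, stream: OutputStream): Unit
}

// hypothetical record type used only in this sketch
case class Reading(id: String, temperature: Double)

// writes each reading as one CSV line
class CsvReadingEncoder extends RecordEncoder[Reading] {
  override def encode(r: Reading, stream: OutputStream): Unit =
    stream.write(s"${r.id},${r.temperature}\n".getBytes(StandardCharsets.UTF_8))
}
```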
A bulk-encoding StreamingFileSink is created as shown in Example 8-5.
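import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// AvroPojo stands for an Avro-generated SpecificRecord class;
// the stream type must match the type handled by the BulkWriter.Factory
val input: DataStream[AvroPojo] = …

// bulk-encoded sink that writes the records as Parquet files
val sink: StreamingFileSink[AvroPojo] = StreamingFileSink
  .forBulkFormat(
    new Path("/base/path"),
    ParquetAvroWriters.forSpecificRecord(classOf[AvroPojo]))
  .build()

input.addSink(sink)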
A StreamingFileSink in bulk-encoding mode requires a BulkWriter.Factory. In Example 8-5 we use a Parquet writer for Avro records. Note that the Parquet writer is contained in the flink-parquet module, which needs to be added as a dependency. As usual, BulkWriter.Factory is an interface that can be implemented for custom file formats, such as Apache ORC.
A StreamingFileSink in bulk-encoding mode cannot choose a RollingPolicy. Bulk-encoding formats can only be combined with the OnCheckpointRollingPolicy, which rolls in-progress part files on every checkpoint.
StreamingFileSink provides exactly-once output guarantees. The sink achieves this with a commit protocol that is based on Flink’s checkpointing mechanism and moves files through three stages: in progress, pending, and finished. While a sink writes to a file, the file is in the in-progress state. When the RollingPolicy decides to roll a file, the file is closed and moved into the pending state by renaming it. Pending files are moved into the finished state (again by renaming) when the next checkpoint completes.
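The lifecycle of a part file can be modeled as a small state machine. The toy model CommitProtocolModel below (a hypothetical name, not Flink code) remembers which files were rolled before each checkpoint and finishes them only when that checkpoint completes; it leaves out the actual renaming and all filesystem interaction.

```scala
import scala.collection.mutable

// Toy model (not Flink code) of the in-progress -> pending -> finished lifecycle.
object PartFileState extends Enumeration {
  val InProgress, Pending, Finished = Value
}

class CommitProtocolModel {
  val states = mutable.Map[String, PartFileState.Value]()
  private var rolledSinceLastCheckpoint = List.empty[String]
  private val pendingPerCheckpoint = mutable.Map[Long, List[String]]()

  def open(file: String): Unit = states(file) = PartFileState.InProgress

  // the rolling policy closes the file and it becomes pending (a rename in the real sink)
  def roll(file: String): Unit = {
    states(file) = PartFileState.Pending
    rolledSinceLastCheckpoint = file :: rolledSinceLastCheckpoint
  }

  // pending files are remembered together with the checkpoint that covers them
  def snapshot(checkpointId: Long): Unit = {
    pendingPerCheckpoint(checkpointId) = rolledSinceLastCheckpoint
    rolledSinceLastCheckpoint = Nil
  }

  // once a checkpoint completes, the pending files it covers are finished
  def checkpointComplete(checkpointId: Long): Unit = {
    val done = pendingPerCheckpoint.keys.filter(_ <= checkpointId).toList
    for (id <- done; f <- pendingPerCheckpoint.remove(id).getOrElse(Nil))
      states(f) = PartFileState.Finished
  }
}
```

A file rolled after a checkpoint barrier stays pending until the following checkpoint completes.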
In some situations, a pending file is never committed. The StreamingFileSink ensures this does not result in data loss. However, these files are not automatically cleaned up.
Before manually deleting a pending file, you need to check whether it is lingering or about to be committed. Once you find a committed file with the same task index and a higher ID, you can safely remove a pending file.
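The cleanup rule can be written as a filter. In this sketch, PartFile and PendingCleanup are hypothetical; it assumes you have already determined the task index, ID, and state of each file, which depends on how your files are named.

```scala
// Hypothetical description of a part file; parsing real file names is left out.
case class PartFile(taskIdx: Int, id: Long, committed: Boolean)

object PendingCleanup {
  // a pending file can be removed once a committed file with the
  // same task index and a higher ID exists
  def safeToDelete(files: Seq[PartFile]): Seq[PartFile] =
    files.filter { p =>
      !p.committed &&
        files.exists(c => c.committed && c.taskIdx == p.taskIdx && c.id > p.id)
    }
}
```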
In the case of a failure, a sink task needs to reset its current in-progress file to its writing offset at the last successful checkpoint. This is done by closing the current in-progress file and discarding the invalid part at the file’s end, for example, by using the filesystem’s truncate operation.
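The truncate step can be illustrated with plain Java NIO. This stdlib-only sketch (TruncateOnRecovery is a hypothetical helper, not the sink’s code) records the file length at a "checkpoint," appends more data, and then cuts the file back to the recorded length, as a recovering sink task would.

```scala
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical helper illustrating the recovery step; not StreamingFileSink code.
object TruncateOnRecovery {
  def append(file: Path, s: String): Unit =
    Files.write(file, s.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND)

  // discard everything written after the last successful checkpoint
  def recover(file: Path, validLength: Long): Unit = {
    val ch = FileChannel.open(file, StandardOpenOption.WRITE)
    try ch.truncate(validLength) finally ch.close()
  }
}
```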
Apache Cassandra is a popular, scalable, and highly available column store database system. Cassandra models datasets as tables of rows that consist of multiple typed columns. One or more columns have to be defined as (composite) primary keys. Each row can be uniquely identified by its primary key. Among other APIs, Cassandra features the Cassandra Query Language (CQL), a SQL-like language to read and write records and create, modify, and delete database objects, such as keyspaces and tables.
Flink provides a sink connector to write data streams to Cassandra. Cassandra’s data model is based on primary keys, and all writes to Cassandra happen with upsert semantics. In combination with exactly-once checkpointing, resettable sources, and deterministic application logic, upsert writes yield eventually exactly-once output consistency. The output is only eventually consistent, because results are reset to a previous version during recovery, meaning consumers might read older results than they read previously. Also, the versions of values for multiple keys might be out of sync.
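The following toy model (plain Scala, neither Flink nor Cassandra API) shows both effects for a running count per key that is written with upserts. After a failure, the job rewinds to a checkpoint and replays its input, so a consumer temporarily sees an older count again, while the final table matches a failure-free run. UpsertReplay is a hypothetical name.

```scala
import scala.collection.mutable

// Toy model: deterministic running counts written with upserts, a failure,
// and a replay from the last checkpoint.
object UpsertReplay {
  def run(input: Seq[String], checkpointAfter: Int): (Map[String, Int], Seq[Int]) = {
    val table = mutable.Map[String, Int]()    // external store with upsert semantics
    val observed = mutable.ArrayBuffer[Int]() // values a consumer reads for key "s1"

    def process(events: Seq[String], counts: mutable.Map[String, Int]): Unit =
      for (k <- events) {
        counts(k) = counts.getOrElse(k, 0) + 1
        table(k) = counts(k) // upsert: overwrite the value of key k
        if (k == "s1") observed += table(k)
      }

    val state = mutable.Map[String, Int]()
    process(input.take(checkpointAfter), state)
    val checkpoint = state.clone()              // consistent snapshot of the counts
    process(input.drop(checkpointAfter), state) // all input processed, then the job fails
    // recovery: restore the checkpoint and replay the input after it
    process(input.drop(checkpointAfter), checkpoint)
    (table.toMap, observed.toSeq)
  }
}
```

Running it on three events for key "s1" with a checkpoint after the first event yields the reads 1, 2, 3, 2, 3: the consumer sees 3 and then the older value 2 again before the table converges to 3.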
In order to prevent temporal inconsistencies during recovery and provide exactly-once output guarantees for applications with nondeterministic application logic, Flink’s Cassandra connector can be configured to leverage a WAL. We will discuss the WAL mode in more detail later in this section. The following code shows the dependency you need to add to the build file of your application in order to use the Cassandra sink connector:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.12</artifactId>
  <version>1.7.1</version>
</dependency>
To illustrate the use of the Cassandra sink connector, we use the simple example of a Cassandra table that holds data about sensor readings and consists of two columns, sensorId and temperature. The CQL statements in Example 8-6 create a keyspace “example” and a table “sensors” in that keyspace.
CREATE KEYSPACE IF NOT EXISTS example
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE IF NOT EXISTS example.sensors (
  sensorId VARCHAR,
  temperature FLOAT,
  PRIMARY KEY(sensorId)
);
Flink provides different sink implementations to write data streams of different data types to Cassandra. Flink’s Java tuples and Row type and Scala’s built-in tuples and case classes are handled differently than user-defined POJO types. We discuss both cases separately. Example 8-7 shows how to create a sink that writes a DataStream of tuples, case classes, or rows into a Cassandra table. In this example, a DataStream[(String, Float)] is written into the “sensors” table.
val readings: DataStream[(String, Float)] = ???

val sinkBuilder: CassandraSinkBuilder[(String, Float)] =
  CassandraSink.addSink(readings)

sinkBuilder
  .setHost("localhost")
  .setQuery(
    "INSERT INTO example.sensors(sensorId, temperature) VALUES (?, ?);")
  .build()
Cassandra sinks are created and configured using a builder that is obtained by calling the CassandraSink.addSink() method with the DataStream object that should be emitted. The method returns the right builder for the data type of the DataStream. In Example 8-7, it returns a builder for a Cassandra sink that handles Scala tuples.
The Cassandra sink builders for tuples, case classes, and rows require the specification of a CQL INSERT query.5 The query is configured using the CassandraSinkBuilder.setQuery() method. During execution, the sink registers the query as a prepared statement and converts the fields of tuples, case classes, or rows into parameters for the prepared statement. The fields are mapped to the parameters based on their position; the first value is converted to the first parameter and so on.
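Position-based mapping means that the field order, not the field names, decides which ? a value fills. The sketch below models this with Scala’s Product interface, which both tuples and case classes implement; PositionalBinding and TempReading are hypothetical and the code does not call the Cassandra driver.

```scala
// Hypothetical model of position-based parameter binding; not connector code.
object PositionalBinding {
  // values in field order: the first value fills the first "?" and so on
  def bindValues(record: Product): Seq[Any] = record.productIterator.toSeq

  // pair each column of the INSERT's column list with the value bound to its "?"
  def assignment(columns: Seq[String], record: Product): Seq[(String, Any)] = {
    val values = bindValues(record)
    require(columns.size == values.size, "expected one ? per field")
    columns.zip(values)
  }
}

case class TempReading(sensorId: String, temperature: Float)
```

A case class whose fields were declared in a different order than the columns of the INSERT query would silently bind values to the wrong columns, which is why the order of the query’s columns must match the field order.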
Since POJO fields do not have a natural order, they need to be treated differently. Example 8-8 shows how to configure a Cassandra sink for a POJO of type SensorReading.
val readings: DataStream[SensorReading] = ???

CassandraSink.addSink(readings)
  .setHost("localhost")
  .build()
As you can see in Example 8-8, we do not specify an INSERT query. Instead, POJOs are handed to Cassandra’s Object Mapper, which automatically maps POJO fields to columns of a Cassandra table. In order for this to work, the POJO class and its fields need to be annotated with Cassandra annotations and provide setters and getters for all fields, as shown in Example 8-9. The default constructor is required by Flink, as mentioned in “Supported Data Types”.
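import com.datastax.driver.mapping.annotations.{Column, Table}
import scala.annotation.meta.field

@Table(keyspace = "example", name = "sensors")
class SensorReading(
    // @field places the annotation on the generated field; by default, Scala
    // attaches annotations on constructor parameters only to the parameter
    @(Column @field)(name = "sensorId") var id: String,
    @(Column @field)(name = "temperature") var temp: Float) {

  // default constructor required by Flink
  def this() = {
    this("", 0.0f)
  }

  def setId(id: String): Unit = this.id = id
  def getId: String = id
  def setTemp(temp: Float): Unit = this.temp = temp
  def getTemp: Float = temp
}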
In addition to the configuration options in Examples 8-7 and 8-8, a Cassandra sink builder provides a few more methods to configure the sink connector:
setClusterBuilder(ClusterBuilder): The ClusterBuilder builds a Cassandra Cluster that manages the connection to Cassandra. Among other options, it can configure the hostnames and ports of one or more contact points; define load balancing, retry, and reconnection policies; and provide access credentials.
setHost(String, [Int]): This method is a shortcut for a simple ClusterBuilder configured with the hostname and port of a single contact point. If no port is configured, Cassandra’s default port 9042 is used.
setQuery(String): This specifies the CQL INSERT query to write tuples, case classes, or rows to Cassandra. No query must be configured when emitting POJOs.
setMapperOptions(MapperOptions): This provides options for Cassandra’s Object Mapper, such as configurations for consistency, time-to-live (TTL), and null field handling. The options are ignored if the sink emits tuples, case classes, or rows.
enableWriteAheadLog([CheckpointCommitter]): This enables the WAL to provide exactly-once output guarantees in the case of nondeterministic application logic. CheckpointCommitter is used to store information about completed checkpoints in an external datastore. If no CheckpointCommitter is configured, the information is written into a specific Cassandra table.
The Cassandra sink connector with WAL is implemented based on Flink’s GenericWriteAheadSink operator. How this operator works, including the role of the CheckpointCommitter, and which consistency guarantees it provides, is described in more detail in “Transactional Sink Connectors”.
The DataStream API provides two interfaces to implement source connectors along with corresponding RichFunction abstract classes:
SourceFunction and RichSourceFunction can be used to define nonparallel source connectors, that is, sources that run with a single task.
ParallelSourceFunction and RichParallelSourceFunction can be used to define source connectors that run with multiple parallel task instances.
With the exception of being nonparallel and parallel, both interfaces are identical. Just like the rich variants of processing functions,6 subclasses of RichSourceFunction and RichParallelSourceFunction can override the open() and close() methods and access a RuntimeContext that provides the number of parallel task instances and the index of the current instance, among other things.
SourceFunction and ParallelSourceFunction define two methods:
void run(SourceContext<T> ctx)
void cancel()
The run() method does the actual work of reading or receiving records and ingesting them into a Flink application. Depending on the system from which the data is received, the data might be pushed or pulled. The run() method is called once by Flink and runs in a dedicated source thread, typically reading or receiving data and emitting records in an endless loop (infinite stream). The task can be explicitly canceled at some point in time or terminated in the case of a finite stream when the input is fully consumed.
The cancel() method is invoked by Flink when the application is canceled and shut down. In order to perform a graceful shutdown, the run() method, which runs in a separate thread, should terminate as soon as the cancel() method is called. Example 8-10 shows a simple source function that counts from 0 to Long.MaxValue.
import org.apache.flink.streaming.api.functions.source.SourceFunction

class CountSource extends SourceFunction[Long] {

  // @volatile because cancel() is called from a different thread than run()
  @volatile var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var cnt: Long = -1
    while (isRunning && cnt < Long.MaxValue) {
      cnt += 1
      ctx.collect(cnt)
    }
  }

  override def cancel(): Unit = isRunning = false
}
Earlier in this chapter we explained that Flink can only provide satisfying consistency guarantees for applications that use source connectors that can replay their output data. A source function can replay its output if the external system that provides the data exposes an API to retrieve and reset a reading offset. Examples of such systems are filesystems that provide the offset of a file stream and a seek method to move a file stream to a specific position, or Apache Kafka, which provides offsets for each partition of a topic and can set the reading position of a partition. A counterexample is a source connector that reads data from a network socket, which immediately discards the delivered data.
A source function that supports output replay needs to be integrated with Flink’s checkpointing mechanism and must persist all current reading positions when a checkpoint is taken. When the application is started from a savepoint or recovers from a failure, the reading offsets are retrieved from the latest checkpoint or savepoint. If the application is started without existing state, the reading offsets must be set to a default value. A resettable source function needs to implement the CheckpointedFunction interface and should store the reading offsets and all related meta information, such as file paths or partition IDs, in operator list state or operator union list state, depending on how the offsets should be distributed to parallel task instances in the case of a rescaled application. See “Scaling Stateful Operators” for details on the distribution behavior of operator list state and union list state.
In addition, it is important to ensure that the SourceFunction.run() method, which runs in a separate thread, does not advance the reading offset and emit data while a checkpoint is taken; in other words, while the CheckpointedFunction.snapshotState() method is called. This is done by guarding the code in run() that advances the reading position and emits records in a block that synchronizes on a lock object, which is obtained from the SourceContext.getCheckpointLock() method. Example 8-11 makes the CountSource of Example 8-10 resettable.
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

class ResettableCountSource
    extends SourceFunction[Long] with CheckpointedFunction {

  // @volatile because cancel() is called from a different thread than run()
  @volatile var isRunning: Boolean = true
  var cnt: Long = _
  var offsetState: ListState[Long] = _

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && cnt < Long.MaxValue) {
      // synchronize data emission and checkpoints
      ctx.getCheckpointLock.synchronized {
        cnt += 1
        ctx.collect(cnt)
      }
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(snapshotCtx: FunctionSnapshotContext): Unit = {
    // remove previous cnt
    offsetState.clear()
    // add current cnt
    offsetState.add(cnt)
  }

  override def initializeState(initCtx: FunctionInitializationContext): Unit = {
    val desc = new ListStateDescriptor[Long]("offset", classOf[Long])
    offsetState = initCtx.getOperatorStateStore.getListState(desc)
    // initialize cnt variable
    val it = offsetState.get()
    cnt = if (null == it || !it.iterator().hasNext) {
      -1L
    } else {
      it.iterator().next()
    }
  }
}
Timestamps and watermarks are another important aspect of source functions. As pointed out in “Event-Time Processing” and “Assigning Timestamps and Generating Watermarks”, the DataStream API provides two options to assign timestamps and generate watermarks. Timestamps and watermarks can be assigned and generated by a dedicated TimestampAssigner (see “Assigning Timestamps and Generating Watermarks” for details) or by a source function.
A source function assigns timestamps and emits watermarks through its SourceContext object. SourceContext provides the following methods:
def collectWithTimestamp(record: T, timestamp: Long): Unit
def emitWatermark(watermark: Watermark): Unit
collectWithTimestamp() emits a record with its associated timestamp and emitWatermark() emits the provided watermark.
Besides removing the need for an additional operator, assigning timestamps and generating watermarks in a source function can be beneficial if one parallel instance of a source function consumes records from multiple stream partitions, such as partitions of a Kafka topic. Typically, external systems, such as Kafka, only guarantee message order within a stream partition. Given the case of a source function operator that runs with a parallelism of two and that reads data from a Kafka topic with six partitions, each parallel instance of the source function will read records from three Kafka topic partitions. Consequently, each instance of the source function multiplexes the records of three stream partitions to emit them. Multiplexing records most likely introduces additional out-of-orderness with respect to the event-time timestamps such that a downstream timestamp assigner might produce more late records than expected.
To avoid such behavior, a source function can generate watermarks for each stream partition independently and always emit the smallest watermark of its partitions as its watermark. This way, it can ensure that the order guarantees on each partition are leveraged and no unnecessary late records are emitted.
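A per-partition watermark generator can be sketched as follows. This is a plain Scala model rather than Flink API (PerPartitionWatermarks is a hypothetical class); it assumes the same bounded out-of-orderness for every partition and lets a partition that has not produced a record yet hold the watermark back.

```scala
import scala.collection.mutable

// Hypothetical model (not Flink API): track the highest timestamp per partition
// and emit the minimum over all partitions minus the out-of-orderness bound.
class PerPartitionWatermarks(partitions: Seq[Int], maxOutOfOrdernessMs: Long) {
  // Long.MinValue until a partition emits its first record
  private val maxTs = mutable.Map(partitions.map(p => p -> Long.MinValue): _*)

  def onRecord(partition: Int, timestamp: Long): Unit =
    maxTs(partition) = math.max(maxTs(partition), timestamp)

  def currentWatermark: Long = {
    val slowest = maxTs.values.min
    // avoid underflow while some partition has not emitted anything yet
    if (slowest == Long.MinValue) Long.MinValue else slowest - maxOutOfOrdernessMs
  }
}
```

Because only the slowest partition determines the watermark, records that are in order within their own partition are never classified as late merely because another partition is ahead.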
Another problem source functions have to deal with is instances that become idle and do not emit any more data. This can be very problematic, because it may prevent the whole application from advancing its watermarks and hence lead to a stalling application. Since watermarks should be data driven, a watermark generator (either integrated in a source function or in a timestamp assigner) will not emit new watermarks if it does not receive input records. If you look at how Flink propagates and updates watermarks (see “Watermark Propagation and Event Time”), you can see that a single operator that does not advance watermarks can grind all watermarks of an application to a halt if the application involves a shuffle operation (keyBy(), rebalance(), etc.).
Flink provides a mechanism to avoid such situations by marking source functions as temporarily idle. While a source is idle, Flink’s watermark propagation mechanism ignores the idle stream partition. The source is automatically set as active as soon as it starts to emit records again. A source function can decide when to mark itself as idle and does so by calling the method SourceContext.markAsTemporarilyIdle().
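The effect of idleness on watermark propagation can be illustrated with a toy model of an operator that combines the watermarks of its input channels. InputWatermarks is hypothetical and simplifies Flink’s behavior; for instance, it just returns Long.MinValue when all inputs are idle, and it treats any new watermark as a sign that the channel is active again.

```scala
// Toy model (not Flink code): an operator's event time is the minimum
// watermark over all input channels that are not marked as idle.
class InputWatermarks(numChannels: Int) {
  private val wms = Array.fill(numChannels)(Long.MinValue)
  private val idle = Array.fill(numChannels)(false)

  def onWatermark(channel: Int, wm: Long): Unit = {
    idle(channel) = false // new progress makes the channel active again
    wms(channel) = math.max(wms(channel), wm)
  }

  def markIdle(channel: Int): Unit = idle(channel) = true

  def combined: Long = {
    val active = wms.indices.filterNot(i => idle(i)).map(i => wms(i))
    if (active.isEmpty) Long.MinValue else active.min
  }
}
```

With two channels where only channel 0 advances, the combined watermark stays at Long.MinValue until channel 1 is marked idle; after that it follows channel 0.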
In Flink’s DataStream API, any operator or function can send data to an external system or application. A DataStream does not have to eventually flow into a sink operator. For instance, you could implement a FlatMapFunction that emits each incoming record via an HTTP POST call and not via its Collector. Nonetheless, the DataStream API provides a dedicated SinkFunction interface and a corresponding RichSinkFunction abstract class.7 The SinkFunction interface provides a single method:
void invoke(IN value, Context ctx)
The Context object of SinkFunction provides access to the current processing time, the current watermark (i.e., the current event time at the sink), and the timestamp of the record.
Example 8-12 shows a simple SinkFunction that writes sensor readings to a socket. Note that you need to start a process that listens on the socket before starting the program. Otherwise, the program fails with a ConnectException because a connection to the socket could not be opened. Run the command nc -l localhost 9191 on Linux to listen on localhost:9191.
import java.io.PrintStream
import java.net.{InetAddress, Socket}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

val readings: DataStream[SensorReading] = ???

// write the sensor readings to a socket
readings.addSink(new SimpleSocketSink("localhost", 9191))
  // set parallelism to 1 because only one thread can write to a socket
  .setParallelism(1)

// -----

class SimpleSocketSink(val host: String, val port: Int)
    extends RichSinkFunction[SensorReading] {

  var socket: Socket = _
  var writer: PrintStream = _

  override def open(config: Configuration): Unit = {
    // open socket and writer
    socket = new Socket(InetAddress.getByName(host), port)
    writer = new PrintStream(socket.getOutputStream)
  }

  override def invoke(value: SensorReading, ctx: SinkFunction.Context[_]): Unit = {
    // write sensor reading to socket
    writer.println(value.toString)
    writer.flush()
  }

  override def close(): Unit = {
    // close writer and socket
    writer.close()
    socket.close()
  }
}
As discussed, the end-to-end consistency guarantees of an application depend on the properties of its sink connectors. In order to achieve end-to-end exactly-once semantics, an application requires either idempotent or transactional sink connectors. The SinkFunction in Example 8-12 neither performs idempotent writes nor features transactional writes. Due to the append-only characteristic of a socket, it is not possible to perform idempotent writes. Since a socket does not have built-in transactional support, transactional writes can only be done using Flink’s generic WAL sink. In the following sections, you will learn how to implement idempotent or transactional sink connectors.
For many applications, the SinkFunction interface is sufficient to implement an idempotent sink connector. This is possible if the following two properties hold:
The result data has a deterministic (composite) key, on which idempotent updates can be performed. For an application that computes the average temperature per sensor and minute, a deterministic key could be the ID of the sensor and the timestamp for each minute. Deterministic keys are important to ensure all writes are correctly overwritten in case of a recovery.
The external system supports updates per key, such as a relational database system or a key-value store.
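As an illustration of the first property, the sketch below derives a deterministic key from the sensor ID and the start of the minute and writes results into a mutable map that stands in for a store with per-key updates. Replaying the same results leaves the store unchanged. IdempotentWrites is a hypothetical helper, not part of any library.

```scala
import scala.collection.mutable

// Hypothetical helper: deterministic keys make replayed writes overwrite earlier ones.
object IdempotentWrites {
  // (sensor ID, start of the minute in epoch milliseconds)
  def minuteKey(sensorId: String, timestampMs: Long): (String, Long) =
    (sensorId, timestampMs - timestampMs % 60000L)

  // upsert every result into the map that plays the role of the external store
  def write(store: mutable.Map[(String, Long), Double], results: Seq[((String, Long), Double)]): Unit =
    results.foreach { case (k, v) => store(k) = v }
}
```

Had the key included something nondeterministic, such as a random UUID, a replay would add new rows instead of overwriting the existing ones.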
Example 8-13 illustrates how to implement and use an idempotent SinkFunction that writes to a JDBC database, in this case an embedded Apache Derby database.
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

val readings: DataStream[SensorReading] = ???

// write the sensor readings to a Derby table
readings.addSink(new DerbyUpsertSink)

// -----

class DerbyUpsertSink extends RichSinkFunction[SensorReading] {

  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // connect to embedded in-memory Derby
    conn = DriverManager.getConnection(
      "jdbc:derby:memory:flinkExample",
      new Properties())
    // prepare insert and update statements
    insertStmt = conn.prepareStatement(
      "INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)")
    updateStmt = conn.prepareStatement(
      "UPDATE Temperatures SET temp = ? WHERE sensor = ?")
  }

  override def invoke(r: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // set parameters for update statement and execute it
    updateStmt.setDouble(1, r.temperature)
    updateStmt.setString(2, r.id)
    updateStmt.execute()
    // execute insert statement if update statement did not update any row
    if (updateStmt.getUpdateCount == 0) {
      // set parameters for insert statement
      insertStmt.setString(1, r.id)
      insertStmt.setDouble(2, r.temperature)
      // execute insert statement
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
Since Apache Derby does not provide a built-in UPSERT statement, the example sink performs UPSERT writes by first trying to update a row and inserting a new row if no row with the given key exists. The Cassandra sink connector follows the same approach when the WAL is not enabled.
Whenever an idempotent sink connector is not suitable, whether because of the characteristics of the application’s output, the properties of the required sink system, or stricter consistency requirements, transactional sink connectors can be an alternative. As described before, transactional sink connectors need to be integrated with Flink’s checkpointing mechanism because they may only commit data to the external system when a checkpoint completes successfully.
In order to ease the implementation of transactional sinks, Flink’s DataStream API provides two templates that can be extended to implement custom sink operators. Both templates implement the CheckpointListener interface to receive notifications from the JobManager about completed checkpoints (see “Receiving Notifications About Completed Checkpoints” for details about the interface):
The GenericWriteAheadSink template collects all outgoing records per checkpoint and stores them in the operator state of the sink task. The state is checkpointed and recovered in the case of a failure. When a task receives a checkpoint completion notification, it writes the records of the completed checkpoints to the external system. The Cassandra sink connector with the WAL enabled is built on this template.
The TwoPhaseCommitSinkFunction template leverages transactional features of the external sink system. For every checkpoint, it starts a new transaction and writes all following records to the sink system in the context of the current transaction. The sink commits a transaction when it receives the completion notification of the corresponding checkpoint.
In the following, we describe both templates and their consistency guarantees.
GenericWriteAheadSink eases the implementation of sink operators with improved consistency properties. The operator is integrated with Flink’s checkpointing mechanism and aims to write each record exactly once to an external system. However, you should be aware that failure scenarios exist in which a write-ahead log sink emits records more than once. Hence, a GenericWriteAheadSink does not provide bulletproof exactly-once guarantees but only at-least-once guarantees. We will discuss these scenarios in more detail later in this section.
GenericWriteAheadSink works by appending all received records to a write-ahead log that is segmented by checkpoints. Every time the sink operator receives a checkpoint barrier, it starts a new section and all the following records are appended to the new section. The WAL is stored and checkpointed as operator state. Since the log will be recovered, no records will be lost in the case of a failure.
When GenericWriteAheadSink receives a notification about a completed checkpoint, it emits all records that are stored in the WAL in the segment corresponding to the successful checkpoint. Depending on the concrete implementation of the sink operator, the records can be written to any kind of storage or message system. When all records have been successfully emitted, the corresponding checkpoint must be internally committed.
A checkpoint is committed in two steps. First, the sink persistently stores the information that the checkpoint was committed, and second, it removes the records from the WAL. It is not possible to store the commit information in Flink’s application state because it is not persistent and would be reset in case of a failure. Instead, GenericWriteAheadSink relies on a pluggable component called CheckpointCommitter to store and look up information about committed checkpoints in an external persistent storage. For example, the Cassandra sink connector by default uses a CheckpointCommitter that writes to Cassandra.
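The write-ahead mechanism described so far can be condensed into a toy model. WalSinkModel is a hypothetical class, not GenericWriteAheadSink; the mutable set committed plays the role of the CheckpointCommitter’s external storage, and the map of segments stands for the WAL kept in operator state.

```scala
import scala.collection.mutable

// Toy model (not Flink's GenericWriteAheadSink) of a write-ahead-log sink.
class WalSinkModel[T](committed: mutable.Set[Long], emit: T => Unit) {
  private var current = mutable.ArrayBuffer[T]()
  private val segments = mutable.Map[Long, Seq[T]]() // the WAL, kept in operator state

  def invoke(record: T): Unit = current += record

  // checkpoint barrier: close the current segment under the checkpoint's ID
  def onBarrier(checkpointId: Long): Unit = {
    segments(checkpointId) = current.toList
    current = mutable.ArrayBuffer[T]()
  }

  // completion notification: emit each segment, commit it, then drop it from the WAL
  def onCheckpointComplete(checkpointId: Long): Unit =
    for (id <- segments.keys.toList.sorted if id <= checkpointId) {
      if (!committed(id)) {
        segments(id).foreach(emit)
        committed += id
      }
      segments -= id
    }
}
```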
Thanks to the built-in logic of GenericWriteAheadSink, it is not difficult to implement a sink that leverages a WAL. Operators that extend GenericWriteAheadSink need to provide three constructor parameters:
A CheckpointCommitter as discussed before
A TypeSerializer to serialize the input records
A job ID that is passed to the CheckpointCommitter to identify commit information across application restarts
Moreover, the write-ahead operator needs to implement a single method:
boolean sendValues(Iterable<IN> values, long chkpntId, long timestamp)
GenericWriteAheadSink calls the sendValues() method to write the records of a completed checkpoint to the external storage system. The method receives an Iterable over all records of a checkpoint, the ID of the checkpoint, and the timestamp of when the checkpoint was taken. The method must return true if all writes succeeded and false if a write failed.
Example 8-14 shows the implementation of a write-ahead sink that writes to the standard output. It uses FileCheckpointCommitter, which we do not discuss here. You can look up its implementation in the repository that contains the examples of the book.
Note that GenericWriteAheadSink does not implement the SinkFunction interface. So, sinks that extend GenericWriteAheadSink cannot be added using DataStream.addSink() but are attached using the DataStream.transform() method.
import java.util.UUID

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink

import scala.collection.JavaConverters._

val readings: DataStream[SensorReading] = ???

// write the sensor readings to the standard out via a write-ahead log
readings.transform("WriteAheadSink", new StdOutWriteAheadSink)

// -----

class StdOutWriteAheadSink extends GenericWriteAheadSink[SensorReading](
    // CheckpointCommitter that commits checkpoints to the local filesystem
    new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
    // Serializer for records
    createTypeInformation[SensorReading].createSerializer(new ExecutionConfig),
    // Random JobID used by the CheckpointCommitter
    UUID.randomUUID.toString) {

  // java.lang.Iterable because the overridden Java method expects it
  override def sendValues(
      readings: java.lang.Iterable[SensorReading],
      checkpointId: Long,
      timestamp: Long): Boolean = {
    for (r <- readings.asScala) {
      // write record to standard out
      println(r)
    }
    true
  }
}
The examples repository contains an application that fails and recovers in regular intervals to demonstrate the behavior of StdOutWriteAheadSink and a regular DataStream.print() sink in case of failures.
As mentioned earlier, GenericWriteAheadSink cannot provide bulletproof exactly-once guarantees. There are two failure cases that can result in records being emitted more than once:
The program fails while a task is running the sendValues() method. If the external sink system cannot atomically write multiple records (either all or none), some records might have been written and others not. Since the checkpoint was not committed yet, the sink will write all records again during recovery.
All records are correctly written and the sendValues() method returns true; however, the program fails before CheckpointCommitter is called or CheckpointCommitter fails to commit the checkpoint. During recovery, all records of not-yet-committed checkpoints will be written again.
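The second failure case can be replayed in a few lines. In this toy model (WalDuplicateDemo is hypothetical, not Flink code), the records of checkpoint 1 are written, the job fails before the commit is recorded, and the recovered sink writes the same WAL segment again because nothing marks it as committed.

```scala
import scala.collection.mutable

// Toy replay of a failure between sendValues() and the commit.
object WalDuplicateDemo {
  def run(): Seq[String] = {
    val external = mutable.ArrayBuffer[String]() // the sink system
    val committed = mutable.Set[Long]()          // what the CheckpointCommitter persisted
    val wal = Map(1L -> Seq("a", "b"))           // segment restored from the checkpoint

    def sendAndCommit(failBeforeCommit: Boolean): Unit =
      for ((id, records) <- wal if !committed(id)) {
        external ++= records                   // sendValues() succeeded
        if (!failBeforeCommit) committed += id // commit step
      }

    sendAndCommit(failBeforeCommit = true)  // first attempt fails before the commit
    sendAndCommit(failBeforeCommit = false) // attempt after recovery
    external.toList
  }
}
```

The sink system ends up with "a", "b", "a", "b": each record of the checkpoint is written twice.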
Note that these failure scenarios do not affect the exactly-once guarantees of the Cassandra sink connector because it performs UPSERT writes. The Cassandra sink connector benefits from the WAL because it guards against nondeterministic keys and prevents inconsistent writes to Cassandra.
Flink provides the TwoPhaseCommitSinkFunction interface to ease the implementation of sink functions that provide end-to-end exactly-once guarantees. However, whether a 2PC sink function provides such guarantees or not depends on the implementation details. We start the discussion of this interface with a question: “Isn’t the 2PC protocol too expensive?”
In general, 2PCs are an expensive approach to ensuring consistency in a distributed system. However, in the context of Flink, the protocol is only run once for every checkpoint. Moreover, the protocol of TwoPhaseCommitSinkFunction piggybacks on Flink’s regular checkpointing mechanism and thus adds little overhead. The TwoPhaseCommitSinkFunction works quite similarly to the WAL sink, but it does not collect records in Flink’s application state; rather, it writes them in an open transaction to an external sink system.
The TwoPhaseCommitSinkFunction implements the following protocol. Before a sink task emits its first record, it starts a transaction on the external sink system. All subsequently received records are written in the context of the transaction. The voting phase of the 2PC protocol starts when the JobManager initiates a checkpoint and injects barriers in the sources of the application. When an operator receives the barrier, it checkpoints its state and sends an acknowledgment message to the JobManager once it is done. When a sink task receives the barrier, it persists its state, prepares the current transaction for committing, and acknowledges the checkpoint at the JobManager. The acknowledgment messages to the JobManager are analogous to the commit vote of the textbook 2PC protocol. The sink task must not yet commit the transaction, because it is not guaranteed that all tasks of the job will complete their checkpoints. The sink task also starts a new transaction for all records that arrive before the next checkpoint barrier.
When the JobManager receives successful checkpoint acknowledgments from all task instances, it sends the checkpoint completion notification to all interested tasks. This notification corresponds to the 2PC protocol's commit command. When a sink task receives the notification, it commits all open transactions of previous checkpoints.8 Once a sink task acknowledges its checkpoint, it must be able to commit the corresponding transaction, even in the case of a failure. If the transaction cannot be committed, the sink loses data. An iteration of the 2PC protocol succeeds when all sink tasks have committed their transactions.
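The following sketch models the sink side of this protocol without Flink. TwoPhaseSinkModel and its method names are hypothetical; they only echo the names of Flink's snapshotState() and notifyCheckpointComplete() hooks, not their signatures. Each buffered list stands in for an open transaction in the external system. The model shows two things:

- Records become visible only on a completion notification.
- A notification also commits older pending transactions whose own notification was lost.

```scala
import scala.collection.mutable

/** Hypothetical model of a 2PC sink task; not Flink's implementation. */
final class TwoPhaseSinkModel {
  // records written to the currently open (uncommitted) transaction
  private var current = mutable.ArrayBuffer.empty[String]
  // pre-committed transactions, keyed by the checkpoint whose barrier closed them
  private val pending = mutable.SortedMap.empty[Long, List[String]]
  // data that is visible to readers of the external system
  val committed = mutable.ArrayBuffer.empty[String]

  /** Writes a record into the open transaction. */
  def invoke(record: String): Unit = current += record

  /** Barrier received: pre-commit the open transaction (the "vote")
    * and open a new one for the records of the next checkpoint. */
  def snapshotState(checkpointId: Long): Unit = {
    pending(checkpointId) = current.toList
    current = mutable.ArrayBuffer.empty[String]
  }

  /** Completion notification (the "commit" command): commit every pending
    * transaction up to and including this checkpoint. */
  def notifyCheckpointComplete(checkpointId: Long): Unit = {
    val ready = pending.keys.filter(_ <= checkpointId).toList
    ready.foreach { id =>
      committed ++= pending(id)
      pending -= id
    }
  }

  def pendingCheckpoints: List[Long] = pending.keys.toList
}
```

In a real sink, only a handle to the external transaction is kept in Flink state, not the records themselves. Keeping the records would turn the design back into a WAL sink.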
Let’s summarize the requirements for the external sink system:
The external sink system must provide transactional support or the sink must be able to emulate transactions on the external system. Hence, the sink should be able to write to the sink system, but the written data must not be made visible to the outside before it is committed.
A transaction must be open and accept writes for the duration of a checkpoint interval.
A transaction must wait to be committed until a checkpoint completion notification is received. In the case of a recovery cycle, this may take some time. If the sink system closes a transaction (e.g., with a timeout), the uncommitted data will be lost.
The sink must be able to recover a transaction after a process fails. Some sink systems provide a transaction ID that can be used to commit or abort an open transaction.
Committing a transaction must be an idempotent operation—the sink or external system should be able to notice that a transaction was already committed or a repeated commit must have no effect.
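To illustrate these requirements, here is a toy in-memory "external system". ToyTransactionalStore is hypothetical, not a real client library. In it:

- Uncommitted writes are invisible.
- Transactions are addressed by an ID that outlives the writer.
- Commit and abort are idempotent.

It deliberately does not model transaction timeouts. That is the property a real system must be configured to tolerate.

```scala
import scala.collection.mutable

/** Hypothetical transactional store used only to illustrate the requirements. */
final class ToyTransactionalStore {
  // open transactions by ID; their contents are not visible to readers
  private val open = mutable.Map.empty[String, mutable.ArrayBuffer[String]]
  // committed data, visible to readers
  private val visible = mutable.ArrayBuffer.empty[String]
  private var counter = 0

  def begin(): String = {
    counter += 1
    val id = s"txn-$counter"
    open(id) = mutable.ArrayBuffer.empty[String]
    id
  }

  def write(txnId: String, record: String): Unit = open(txnId) += record

  /** Idempotent: an unknown or already committed ID has no effect. */
  def commit(txnId: String): Unit = open.remove(txnId).foreach(buf => visible ++= buf)

  /** Idempotent: aborting twice is harmless. */
  def abort(txnId: String): Unit = { open.remove(txnId); () }

  def read(): List[String] = visible.toList
}
```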
The protocol and the requirements of the sink system might be easier to understand by looking at a concrete example. Example 8-15 shows a TwoPhaseCommitSinkFunction that writes with exactly-once guarantees to a filesystem. Essentially, this is a simplified version of the BucketingFileSink discussed earlier.
import java.io.BufferedWriter
import java.nio.file.{Files, Paths}
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction

class TransactionalFileSink(val targetPath: String, val tempPath: String)
    extends TwoPhaseCommitSinkFunction[(String, Double), String, Void](
      createTypeInformation[String].createSerializer(new ExecutionConfig),
      createTypeInformation[Void].createSerializer(new ExecutionConfig)) {

  var transactionWriter: BufferedWriter = _

  /** Creates a temporary file for a transaction into which the records are
    * written.
    */
  override def beginTransaction(): String = {
    // path of transaction file is built from current time and task index
    val timeNow = LocalDateTime.now(ZoneId.of("UTC"))
      .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
    val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
    val transactionFile = s"$timeNow-$taskIdx"

    // create transaction file and writer
    val tFilePath = Paths.get(s"$tempPath/$transactionFile")
    Files.createFile(tFilePath)
    this.transactionWriter = Files.newBufferedWriter(tFilePath)
    println(s"Creating Transaction File: $tFilePath")
    // name of transaction file is returned to later identify the transaction
    transactionFile
  }

  /** Write record into the current transaction file. */
  override def invoke(
      transaction: String,
      value: (String, Double),
      context: Context[_]): Unit = {
    transactionWriter.write(value.toString)
    // one record per line
    transactionWriter.write('\n')
  }

  /** Flush and close the current transaction file. */
  override def preCommit(transaction: String): Unit = {
    transactionWriter.flush()
    transactionWriter.close()
  }

  /** Commit a transaction by moving the precommitted transaction file
    * to the target directory.
    */
  override def commit(transaction: String): Unit = {
    val tFilePath = Paths.get(s"$tempPath/$transaction")
    // check if the file exists to ensure that the commit is idempotent
    if (Files.exists(tFilePath)) {
      val cFilePath = Paths.get(s"$targetPath/$transaction")
      Files.move(tFilePath, cFilePath)
    }
  }

  /** Aborts a transaction by deleting the transaction file. */
  override def abort(transaction: String): Unit = {
    val tFilePath = Paths.get(s"$tempPath/$transaction")
    if (Files.exists(tFilePath)) {
      Files.delete(tFilePath)
    }
  }
}
TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT] has three type parameters:
IN specifies the type of the input records. In Example 8-15, this is a Tuple2 with a String and a Double field.
TXN defines a transaction identifier that can be used to identify and recover a transaction after a failure. In Example 8-15, this is a string holding the name of the transaction file.
CONTEXT defines an optional custom context. TransactionalFileSink in Example 8-15 does not need the context and hence sets the type to Void.
The constructor of TwoPhaseCommitSinkFunction requires two TypeSerializers, one for the TXN type and the other for the CONTEXT type.
Finally, TwoPhaseCommitSinkFunction defines five functions that need to be implemented:
beginTransaction(): TXN starts a new transaction and returns the transaction identifier. TransactionalFileSink in Example 8-15 creates a new transaction file and returns its name as the identifier.
invoke(txn: TXN, value: IN, context: Context[_]): Unit writes a value to the current transaction. The sink in Example 8-15 appends the value as a String to the transaction file.
preCommit(txn: TXN): Unit precommits a transaction. A precommitted transaction must not receive further writes. Our implementation in Example 8-15 flushes and closes the transaction file.
commit(txn: TXN): Unit commits a transaction. This operation must be idempotent: records must not be written twice to the output system if this method is called twice. In Example 8-15, we check if the transaction file still exists and move it to the target directory if that is the case.
abort(txn: TXN): Unit aborts a transaction. This method may also be called twice for a transaction. Our TransactionalFileSink in Example 8-15 checks if the transaction file still exists and deletes it if that is the case.
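The TXN identifier in Example 8-15 is just a file name, and that is what makes recovery possible: a fresh sink instance can commit a transaction it never wrote to. The following Flink-free harness mirrors that file handling under simplifying assumptions:

- FileTxnHarness and its method names are hypothetical.
- The transaction name is passed in rather than derived from the time and task index.
- writeAll() collapses invoke() and preCommit() into a single call.

The harness shows that commit() and abort() stay idempotent.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

/** Hypothetical, Flink-free stand-in for TransactionalFileSink's file handling. */
final class FileTxnHarness(targetDir: Path, tempDir: Path) {
  /** Creates the temporary transaction file; its name is the transaction ID. */
  def begin(name: String): String = {
    Files.createFile(tempDir.resolve(name))
    name
  }

  /** Writes all records (one per line) and closes the file, like invoke() + preCommit(). */
  def writeAll(txn: String, lines: Seq[String]): Unit = {
    val bytes = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
    Files.write(tempDir.resolve(txn), bytes)
  }

  /** Idempotent: a second call finds no temporary file and does nothing. */
  def commit(txn: String): Unit = {
    val tmp = tempDir.resolve(txn)
    if (Files.exists(tmp)) Files.move(tmp, targetDir.resolve(txn))
  }

  /** Idempotent: deleting a missing file is a no-op. */
  def abort(txn: String): Unit = {
    Files.deleteIfExists(tempDir.resolve(txn))
    ()
  }
}
```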
As you can see, the implementation of the interface is not too involved. However, the complexity and consistency guarantees of an implementation depend on, among other things, the features and capabilities of the sink system. For instance, Flink's Kafka producer implements the TwoPhaseCommitSinkFunction interface. As mentioned before, the connector might lose data if a transaction is rolled back due to a timeout.9 Hence, it does not offer definitive exactly-once guarantees even though it implements the TwoPhaseCommitSinkFunction interface.
Besides ingesting or emitting data streams, enriching a data stream by looking up information in a remote database is another common use case that requires interacting with an external storage system. An example is the well-known Yahoo! stream processing benchmark, which is based on a stream of advertisement clicks that need to be enriched with details about their corresponding campaign that are stored in a key-value store.
The straightforward approach for such use cases is to implement a MapFunction that queries the datastore for every processed record, waits for the query to return a result, enriches the record, and emits the result. While this approach is easy to implement, it suffers from a major issue: each request to the external datastore adds significant latency (a request/response involves two network messages) and the MapFunction spends most of its time waiting for query results.
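A back-of-the-envelope calculation shows why waiting dominates. By Little's law, a single task can process at most (requests in flight) / (round-trip latency) records per second. The sketch below uses an illustrative 2 ms round trip and ignores all processing time. LookupThroughput is a hypothetical helper, not a Flink API.

```scala
/** Hypothetical helper: Little's law bound for lookup-bound record processing. */
object LookupThroughput {
  /** Upper bound on records per second for one task, when each record needs a
    * lookup with the given round-trip latency and `inFlight` lookups may be
    * outstanding at once (1 for a blocking MapFunction). */
  def maxRecordsPerSecond(roundTripMillis: Double, inFlight: Int): Double =
    inFlight * 1000.0 / roundTripMillis
}
```

With a blocking MapFunction (one request in flight), a 2 ms round trip caps a task at 500 records per second. With 100 concurrent requests, the same round trip allows up to 50,000.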
Apache Flink provides the AsyncFunction to mitigate the latency of remote I/O calls. AsyncFunction concurrently sends multiple queries and processes their results asynchronously. It can be configured to preserve the order of records (requests might return in a different order than the order in which they were sent out) or to emit results in the order in which the queries complete to further reduce latency. The function is also properly integrated with Flink's checkpointing mechanism: input records that are currently waiting for a response are checkpointed and queries are repeated in the case of a recovery. Moreover, AsyncFunction works properly with event-time processing because it ensures watermarks are not overtaken by records even if out-of-order results are enabled.
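The difference between the two output modes can be sketched as a small model of when each result may be emitted. This is a simplified, hypothetical model of the behavior described above, not Flink's implementation:

- Ordered mode emits a result only after it and all earlier results are complete.
- Unordered mode lets results overtake each other, but never across a watermark.

Times are in arbitrary units, and the operator's capacity limit is ignored.

```scala
import scala.collection.mutable

/** Hypothetical model of emission times in ordered vs. unordered async mode. */
object AsyncEmission {
  sealed trait Elem
  final case class Rec(id: String, completesAt: Long) extends Elem
  case object Watermark extends Elem

  /** Ordered mode: a result leaves once it and all earlier results are complete. */
  def ordered(elems: Seq[Elem]): Map[String, Long] = {
    var frontier = 0L // latest completion time seen so far
    val out = mutable.LinkedHashMap.empty[String, Long]
    elems.foreach {
      case Rec(id, t) =>
        frontier = math.max(frontier, t)
        out(id) = frontier
      case Watermark => () // nothing can overtake anything in ordered mode anyway
    }
    out.toMap
  }

  /** Unordered mode: a result leaves as soon as it completes, but not before
    * every record that precedes the last watermark has been emitted. */
  def unordered(elems: Seq[Elem]): Map[String, Long] = {
    var watermarkTime = 0L // time at which the last watermark could be emitted
    var segmentMax = 0L    // latest emission time of records seen so far
    val out = mutable.LinkedHashMap.empty[String, Long]
    elems.foreach {
      case Rec(id, t) =>
        val emitAt = math.max(t, watermarkTime)
        segmentMax = math.max(segmentMax, emitAt)
        out(id) = emitAt
      case Watermark =>
        watermarkTime = math.max(watermarkTime, segmentMax)
    }
    out.toMap
  }
}
```

Consider slow record a (completes at 50), fast records b and c (10 and 20), then a watermark, then record d (5):

- Ordered mode emits b and c at time 50, behind a.
- Unordered mode emits b and c at 10 and 20.
- In both modes, d waits until 50 because it follows the watermark.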
In order to take advantage of AsyncFunction, the external system should provide a client that supports asynchronous calls, which is the case for many systems. If a system only provides a synchronous client, you can spawn threads to send requests and handle them. The interface of AsyncFunction is shown in the following:
trait AsyncFunction[IN, OUT] extends Function {
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
The type parameters of the function define its input and output types. The asyncInvoke() method is called for each input record with two parameters. The first parameter is the input record and the second parameter is a callback object to return the result of the function or an exception. In Example 8-16, we show how to apply AsyncFunction on a DataStream.
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._

val readings: DataStream[SensorReading] = ???

val sensorLocations: DataStream[(String, String)] = AsyncDataStream
  .orderedWait(
    readings,
    new DerbyAsyncFunction,
    5, TimeUnit.SECONDS,    // timeout requests after 5 seconds
    100)                    // at most 100 concurrent requests
The asynchronous operator that applies an AsyncFunction is configured with the AsyncDataStream object,10 which provides two static methods: orderedWait() and unorderedWait(). Both methods are overloaded for different combinations of parameters. orderedWait() applies an asynchronous operator that emits results in the order of the input records, while the operator of unorderedWait() only ensures watermarks and checkpoint barriers remain aligned. Additional parameters specify when to time out the asynchronous call for a record and how many concurrent requests to start. Example 8-17 shows DerbyAsyncFunction, which queries an embedded Derby database via its JDBC interface.
import java.sql.DriverManager
import java.util.Properties
import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class DerbyAsyncFunction
    extends AsyncFunction[SensorReading, (String, String)] {

  // caching execution context used to handle the query threads
  private lazy val cachingPoolExecCtx =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
  // direct execution context to forward result future to callback object
  private lazy val directExecCtx =
    ExecutionContext.fromExecutor(
      org.apache.flink.runtime.concurrent.Executors.directExecutor())

  /**
    * Executes JDBC query in a thread and handles the resulting Future
    * with an asynchronous callback.
    */
  override def asyncInvoke(
      reading: SensorReading,
      resultFuture: ResultFuture[(String, String)]): Unit = {

    val sensor = reading.id

    // get room from Derby table as Future
    val room: Future[String] = Future {
      // Creating a new connection and statement for each record.
      // Note: This is NOT best practice!
      // Connections and prepared statements should be cached.
      val conn = DriverManager
        .getConnection("jdbc:derby:memory:flinkExample", new Properties())
      val query = conn.createStatement()

      // submit query and wait for result; this is a synchronous call
      val result = query.executeQuery(
        s"SELECT room FROM SensorLocations WHERE sensor = '$sensor'")

      // get room if there is one
      val room = if (result.next()) {
        result.getString(1)
      } else {
        "UNKNOWN ROOM"
      }

      // close resultset, statement, and connection
      result.close()
      query.close()
      conn.close()

      // return room
      room
    }(cachingPoolExecCtx)

    // apply result handling callback on the room future
    room.onComplete {
      case Success(r) => resultFuture.complete(Seq((sensor, r)))
      case Failure(e) => resultFuture.completeExceptionally(e)
    }(directExecCtx)
  }
}
The asyncInvoke() method of DerbyAsyncFunction in Example 8-17 wraps the blocking JDBC query in a Future, which is executed via CachedThreadPool. To keep the example concise, we create a new JDBC connection for each record, which is, of course, quite inefficient and should be avoided. Future[String] holds the result of the JDBC query.
Finally, we apply an onComplete() callback on Future and pass the result (or a possible exception) to the ResultFuture handler. In contrast to the JDBC query Future, the onComplete() callback is processed by DirectExecutor because passing the result to ResultFuture is a lightweight operation that does not require a dedicated thread. Note that all operations are done in a nonblocking fashion.
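The same two-executor pattern can be tried outside Flink. In the sketch below, a scala.concurrent.Promise stands in for Flink's ResultFuture. TwoExecutorsLookup and its blockingQuery parameter are hypothetical placeholders for the JDBC call. The blocking work runs on a cached thread pool, while the callback that completes the promise runs on a direct executor. lookup() itself returns immediately.

```scala
import java.util.concurrent.{Executor, Executors}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

/** Hypothetical, Flink-free version of the pattern used by DerbyAsyncFunction. */
object TwoExecutorsLookup {
  // blocking work (e.g., a JDBC query) runs on a cached thread pool
  private val blockingPool = Executors.newCachedThreadPool()
  private val blockingCtx = ExecutionContext.fromExecutor(blockingPool)

  // a direct executor runs the callback on whichever thread triggers it
  private val directCtx = ExecutionContext.fromExecutor(new Executor {
    def execute(r: Runnable): Unit = r.run()
  })

  /** Returns immediately; `result` plays the role of Flink's ResultFuture. */
  def lookup(
      sensor: String,
      blockingQuery: String => String,
      result: Promise[Seq[(String, String)]]): Unit = {
    val room = Future(blockingQuery(sensor))(blockingCtx)
    room.onComplete {
      case Success(r) => result.success(Seq((sensor, r)))
      case Failure(e) => result.failure(e)
    }(directCtx)
  }

  def shutdown(): Unit = blockingPool.shutdown()
}
```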
It is important to point out that an AsyncFunction instance is sequentially called for each of its input records; a function instance is not called in a multithreaded fashion. Therefore, the asyncInvoke() method should quickly return by starting an asynchronous request and handling the result with a callback that forwards the result to ResultFuture. Common antipatterns that must be avoided include:
Sending a request that blocks the asyncInvoke() method
Sending an asynchronous request but waiting inside the asyncInvoke() method for the request to complete
In this chapter you learned how Flink DataStream applications can read data from and write data to external systems and the requirements for an application to achieve different end-to-end consistency guarantees. We presented Flink’s most commonly used built-in source and sink connectors, which also serve as representatives for different types of storage systems, such as message queues, filesystems, and key-value stores.
Subsequently, we showed you how to implement custom source and sink connectors, including WAL and 2PC sink connectors, providing detailed examples. Finally, you learned about Flink's AsyncFunction, which can significantly improve the performance of interacting with external systems by performing and handling requests asynchronously.
1 Exactly-once state consistency is a requirement for end-to-end exactly-once consistency but is not the same.
2 We discuss the consistency guarantees of a WAL sink in more detail in “GenericWriteAheadSink”.
3 See Chapter 6 for details about the timestamp assigner interfaces.
4 InputFormat is Flink's interface to define data sources in the DataSet API.
5 In contrast to SQL INSERT statements, CQL INSERT statements behave like upsert queries: they overwrite existing rows with the same primary key.
6 Rich functions were discussed in Chapter 5.
7 Usually the RichSinkFunction interface is used because sink functions typically need to set up a connection to an external system in the RichFunction.open() method. See Chapter 5 for details on the RichFunction interface.
8 A task might need to commit multiple transactions if an acknowledgment message is lost.
9 See details in “Apache Kafka Sink Connector”.
10 The Java API provides an AsyncDataStream class with the respective static methods.