结构化流式编程指南

概述

结构化流是一个可伸缩的、 fault-tolerance 的流处理引擎,构建在Spark SQL引擎上。你可以用在静态数据上表示批处理计算的方式来表示流计算。Spark SQL引擎将负责增量地、连续地运行它,并在流数据不断到达时更新最终结果。你可以使用 ScalaJavaPythonR中的Dataset/DataFrame API来表示stream aggregationevent-time windowstream-to-batch join 等。计算是在同一个优化的Spark SQL引擎上执行的。最后,系统通过 checkpointwrite-ahead 日志来确保 end-to-end exactly-once fault-tolerance 保证。简而言之,结构化流提供了快速、可伸缩、fault-toleranceend-to-end exactly-once 流处理,而用户无需对流进行推断。

在内部,默认情况下,使用 micro-batch 引擎处理结构化流查询,micro-batch 引擎将数据流处理为一系列小批作业,从而实现低至100毫秒的 end-to-end 延迟和 exactly-once fault-tolerance 保证。但是,从Spark 2.3开始,我们引入了一种新的低延迟处理模式,称为 连续处理(Continuous Processing),它可以在 at-least-once 保证的情况下实现低至1毫秒的 end-to-end 延迟。不需要更改查询中的 Dataset /DataFrame 操作,就可以根据应用程序需求选择模式。

在本指南中,我们将介绍编程模型和 API。我们将主要使用默认的 micro-batch 模型来解释这些概念,然后讨论连续处理模型。首先,让我们从一个结构化流查询的简单示例开始—一个流字数统计。

简单例子

假设你希望维护从监听 TCP Socket 的数据服务器接收到的文本数据的运行字数计数。让我们看看如何使用结构化流来表达这一点。你可以在Scala/Java/Python/R中看到完整的代码。如果你下载了Spark,你可以直接运行这个例子。在任何情况下,让我们一步一步地遍历这个示例,并了解它是如何工作的。首先,我们必须导入必要的类并创建一个本地SparkSession,这是与Spark相关的所有功能的起点。

  • Scala
  1. import org.apache.spark.sql.functions._
  2. import org.apache.spark.sql.SparkSession
  3. val spark = SparkSession
  4. .builder
  5. .appName("StructuredNetworkWordCount")
  6. .getOrCreate()
  7. import spark.implicits._
  • Java
  1. import org.apache.spark.api.java.function.FlatMapFunction;
  2. import org.apache.spark.sql.*;
  3. import org.apache.spark.sql.streaming.StreamingQuery;
  4. import java.util.Arrays;
  5. import java.util.Iterator;
  6. SparkSession spark = SparkSession
  7. .builder()
  8. .appName("JavaStructuredNetworkWordCount")
  9. .getOrCreate();

接下来,让我们创建一个 stream DataFrame,它表示从监听 localhost:9999 的服务器接收到的文本数据,并将 DataFrame 转换为 word count 计算。

  • Scala
  1. // Create DataFrame representing the stream of input lines from connection to localhost:9999
  2. val lines = spark.readStream
  3. .format("socket")
  4. .option("host", "localhost")
  5. .option("port", 9999)
  6. .load()
  7. // Split the lines into words
  8. val words = lines.as[String].flatMap(_.split(" "))
  9. // Generate running word count
  10. val wordCounts = words.groupBy("value").count()
  • Java
  1. // Create DataFrame representing the stream of input lines from connection to localhost:9999
  2. Dataset<Row> lines = spark
  3. .readStream()
  4. .format("socket")
  5. .option("host", "localhost")
  6. .option("port", 9999)
  7. .load();
  8. // Split the lines into words
  9. Dataset<String> words = lines
  10. .as(Encoders.STRING())
  11. .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
  12. // Generate running word count
  13. Dataset<Row> wordCounts = words.groupBy("value").count();

lines DataFrame 表示一个包含流文本数据的无界表。该表包含一列名为“value”的字符串,流文本数据中的每一行都成为该表中的一行。请注意,由于我们只是在设置转换,还没有启动转换,所以目前还没有接收任何数据。接下来,我们使用以下命令将 DataFrame 转换为一个字符串 Dataset .as[String],这样我们就可以应用 flatMap 操作将每一行分割成多个单词。生成的单词 Dataset 包含所有单词。最后,我们定义了wordCounts DataFrame,方法是根据 Dataset 中的惟一值进行分组并计数。请注意,这是一个流DataFrame,它表示流的运行字数。

我们现在已经设置了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将它设置为在每次更新计数结果集(由 outputMode("complete") 指定)时将它们打印到控制台。然后使用 start() 启动流计算。

  • Scala
  1. // Start running the query that prints the running counts to the console
  2. val query: StreamingQuery = wordCounts.writeStream
  3. .outputMode("complete")
  4. .format("console")
  5. .start()
  6. query.awaitTermination()
  • Java
  1. // Start running the query that printes the running counts to the console
  2. StreamingQuery query = wordCounts.writeStream()
  3. .outPutMode("complete")
  4. .format("console")
  5. .start();
  6. query.awaitTermination()

执行此代码后,流计算将在后台启动。query 对象是活动流查询的句柄,我们决定使用 awaitTermination() 来等待查询的终止,以防止在查询活动期间进程退出。

要实际执行此示例代码,你可以在自己的 Spark应用程序中编译代码,也可以在下载了Spark后运行示例。在这里我们用下载的Spark 运行示例进行展示。你首先需要使用Netcat(在大多数类unix系统中可以找到的一个小实用程序)作为数据服务器运行

  1. $ nc -lk 9999

然后,在另一个终端中,你可以使用以下命令启动示例

  • Scala
  1. $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
  • Java
  1. $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

然后,在运行 netcat 服务器的终端中键入的任何行都将被计数并在屏幕上每秒打印一次。它看起来就像下面这样。

  1. # TERMINAL 1:
  2. # Running Netcat
  3. $ nc -lk 9999
  4. apache spark
  5. apache hadoop
  • 下面以 Scala 为例(执行结果都一样,仅仅启动脚本命令行稍微不同而已), 每秒打印一次,并且统计结果累计
  1. # TERMINAL 2: RUNNING StructuredNetworkWordCount
  2. $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
  3. -------------------------------------------
  4. Batch: 0
  5. -------------------------------------------
  6. +------+-----+
  7. | value|count|
  8. +------+-----+
  9. |apache| 1|
  10. | spark| 1|
  11. +------+-----+
  12. -------------------------------------------
  13. Batch: 1
  14. -------------------------------------------
  15. +------+-----+
  16. | value|count|
  17. +------+-----+
  18. |apache| 2|
  19. | spark| 1|
  20. |hadoop| 1|
  21. +------+-----+
  22. ...

编程模型

结构化流的关键思想是将实时数据流视为一个不断追加的表。这导致了一个新的流处理模型,它非常类似于批处理模型。你可以将流计算表示为标准的类似批处理的查询,就像在静态表上一样,而Spark将它作为 unbound 输入表上的增量查询来运行。让我们更详细地了解这个模型。

基本概念

将输入数据流视为“Input Table”。到达流上的每个数据项就像一个新行被追加到输入表中。

Stream as a Table

对输入的查询将生成“Result Table”。在每个触发间隔(例如,每1秒),都会向输入表追加新行,从而最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部 sink

Model

“OutPut”被定义为写入外部存储器的内容。输出可以在不同的模式下定义:

  • Complete Mode - 整个更新后的 Result Table 将被写入外部存储。由 storage connector 决定如何处理整个表的写入。
  • Append Mode - 仅将新行添加到 Result Table ,因为最后一个 trigger 触发器将被写入外部存储。这仅适用于不希望更改结果表中现有行的查询。
  • Update Mode - 仅将 Result Table 中自最后一个 trigger 触发器以来更新的行写入外部存储(Spark 2.1.1开始可用)。注意,这与 Complete Mode 不同,因为该模式仅输出自上次触发器以来更改的行。如果查询不包含聚合,则其效果等同于追加模式。

注意,每种模式都适用于特定类型的查询。稍后将对此进行详细讨论。

为了演示这个模型的使用,让我们在上面的快速示例上下文中理解这个模型。第一个 lines DataFrame是输入表,最后一行 wordCounts DataFrame是结果表。请注意,用于生成 wordCounts 的流行DataFrame上的查询与静态DataFrame上的查询完全相同。但是,当这个查询启动时,Spark将不断地检查来自 Socket 连接的新数据。如果有新数据,Spark将运行一个“incremental(增量)”查询,该查询将以前的运行计数与新数据组合起来,以计算更新的计数,如下所示。

Model

请注意,结构化流并不会物化整个表。它从流数据源读取最新的可用数据,以增量方式处理它以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。

这个模型与许多其他流处理引擎有很大的不同。许多流系统要求用户自己维护运行的聚合,因此必须考虑 fault-tolerance 和数据一致性( at-least-once ,或 at-most-once ,或 exactly-once )。在这个模型中,当有新数据时,Spark负责更新结果表,从而使用户不必进行推理。作为一个示例,让我们看看这个模型如何处理基于 event-time 的处理和延迟到达的数据。

处理 Event-timeLate Data

event-time 是嵌入到数据本身中的时间。对于许多应用程序,你可能希望对这个 event-time 进行操作。例如,如果你希望获得物联网设备每分钟生成的事件数,那么你可能希望使用数据生成时的时间(即数据中的 event-time ),而不是Spark接收它们的时间。这个 event-time 很自然地在这个模型中表示出来——来自设备的每个事件是表中的一行,而 event-time 是行中的列值。这使得基于窗口的聚合(例如每分钟事件数)成为 event-time 列上的一种特殊的分组和聚合类型——每个时间窗口都是一个组,每一行都可以属于多个窗口/组。因此,可以在静态 Dataset (例如,从收集的设备事件日志)和数据流上一致地定义这种基于 event-time 窗口的聚合查询,这使得用户使用地更加轻松。

此外,此模型自然会根据 event-time 处理比预期晚到达的数据。因为Spark正在更新结果表,所以它可以完全控制在出现延迟数据时更新旧的聚合,以及清理旧的聚合以限制中间状态数据的大小。从Spark 2.1开始,我们支持 Watermark ,允许用户指定延迟数据的阈值,允许引擎相应地清理旧状态。稍后将在窗口操作部分更详细地解释这些操作。

fault-tolerance 语义

提供 end-to-end Exactly-once 语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化的流源、Sink 和执行引擎来可靠地跟踪处理的确切进度,以便它能够通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于Kafka偏移量或Kinesis序列号)来跟踪流中的读取位置。该引擎使用 checkpointwrite-ahead 日志来记录每个触发器中正在处理的数据的偏移范围。Stream sink 被设计成处理后处理的幂等性。通过使用可重放的源和幂等 Sink,结构化流可以确保在任何失败情况下 end-to-endExactly-once 语义。

使用 DatasetDataFrame 的API

自Spark 2.0以来,DataFrames和 Dataset 可以表示静态的有界数据,也可以表示流式的无界数据。与静态 Dataset /DataFrames类似,你可以使用公共入口点SparkSession (Scala/Java/Python/R docs)从流数据源创建流 Dataset / Dataset ,并对它们应用与静态 Dataset / Dataset 相同的操作。如果你不熟悉 Dataset / Dataset ,强烈建议你使用 DataFrame/Dataset编程指南熟悉它们。

创建流式 DataFrame 和流式 Dataset

流数据流可以通过 SparkSession.readStream() 返回的 DataStreamReader 接口(Scala/Java)/Python文档)创建。在R中,使用 read.stream() 方法。与创建静态DataFrame的read接口类似,你可以指定源数据格式、模式、选项等的详细信息。

输入源

有一些 built-in 的资源。

  • File source - 以数据流的形式读取目录中写入的文件。支持的文件格式有 textcsv, json, orc, parquet。请参阅 DataStreamReader 接口的文档,以获得 up-to-date 的列表和每种文件格式支持的选项。请注意,文件必须自动地放在给定的目录中,在大多数文件系统中,可以通过文件移动操作来实现.
  • Kafka source - 从Kafka读取数据。它与Kafka broker 版本0.10.0或更高版本兼容。有关更多细节,请参阅Kafka集成指南
  • Socket source(用于测试)- 从 Socket 连接读取 UTF8文本数据。监听位于 Driver 的 Socket 服务。注意,这应该只用于测试,因为它不提供 end-to-endfault-tolerance 保证。
  • Rate source(用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个 timestampvalue。其中timestamp 是包含消息分派时间的 Timestamp 类型,而 value 是包含消息计数的 Long 类型,从第一行的0开始。此源代码用于测试和 benchmark(基准测试)。

有些源不是 fault-tolerance 的,因为它们不保证在发生故障后可以使用 checkpoint 偏移量重新播放数据。请参阅前面有关 fault-tolerance 语义的部分。以下是Spark中所有源的详细信息。

Source Options Fault-tolerant Notes
File source path: 到输入目录的路径,对所有文件格式都通用。
maxFilesPerTrigger: 每个 trigger 中要考虑的新文件的最大数量 (默认: no max)
latestFirst: 是否先处理最新的新文件,在有大量积压文件时非常有用 (默认: false)
fileNameOnly: 是否仅根据文件名而不是完整路径检查新文件 (默认: false). 将这个值设置为 true,以下文件将被视为相同的文件,因为它们的文件名为“dataset.txt”,都是一样的:
“file:///dataset.txt”
“s3://a/dataset.txt”
“s3n://a/b/dataset.txt”
“s3a://a/b/c/dataset.txt”


有关文件格式特定的选项,请参阅 DataStreamReader (Scala/Java/Python/R). E.g. “parquet” 格式选项 请参见 DataStreamReader.parquet().

I此外,还有一些会话配置会影响某些文件格式。请参见 SQL Programming Guide 更多详细信息. E.g., “parquet”, 请参见 Parquet 配置 部分.
Yes 支持glob路径,但不支持多个逗号分隔的 paths/globs。
Socket Source host: 连接的主机名,必须指定
port: 连接端口,必须指定
No  
Rate Source rowsPerSecond (e.g. 100, 默认: 1): 每秒应该生成多少行。

rampUpTime (e.g. 5s, 默认: 0s): 在生成数据速度上升之前需要多长时间 rowsPerSecond. 使用比秒更细的粒度将被截断为整数秒。

numPartitions (e.g. 10, 默认: Spark 默认并行度): 生成数据行的分区数

Source 将尽所能的达到 rowsPerSecond, 但是查询可能受到资源限制,并且 numPartitions 可以调整,以帮助达到所需的速度。
Yes  
Kafka Source See the Kafka Integration Guide. Yes

这里有一些例子。

  • Scala
  1. val spark: SparkSession = ...
  2. // Read text from socket
  3. val socketDF:Dataset[Row] = spark
  4. .readStream
  5. .format("socket")
  6. .option("host", "localhost")
  7. .option("port", 9999)
  8. .load()
  9. socketDF.isStreaming // Returns True for DataFrames that have streaming sources
  10. socketDF.printSchema
  11. // Read all the csv files written atomically in a directory
  12. val userSchema: StructType = new StructType().add("name", "string").add("age", "integer")
  13. val csvDF: Dataset<Row> = spark
  14. .readStream
  15. .option("sep", ";")
  16. .schema(userSchema) // Specify schema of the csv files
  17. .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
  • Java
  1. SparkSession spark = ...
  2. // Read text from socket
  3. Dataset<Row> socketDF = spark
  4. .readStream()
  5. .format("socket")
  6. .option("host", "localhost")
  7. .option("port", 9999)
  8. .load();
  9. socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
  10. socketDF.printSchema();
  11. // Read all the csv files written atomically in a directory
  12. StructType userSchema = new StructType().add("name", "string").add("age", "integer");
  13. Dataset<Row> csvDF = spark
  14. .readStream()
  15. .option("sep", ";")
  16. .schema(userSchema) // Specify schema of the csv files
  17. .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")

这些示例生成无类型的流式数据流,这意味着在编译时不检查DataFrame的模式,只在提交查询时检查。有些操作,如map、flatMap等,需要在编译时知道类型。为此,可以使用与静态DataFrame相同的方法将这些无类型的流数据流转换为有类型的流 Dataset 。有关更多细节,请参阅SQL编程指南。另外,关于支持的stream source 的更多细节将在本文后面讨论。

流式 DataFrame/Dataset 的模式推断和分区

默认情况下,来自基于文件的源的结构化流要求你指定模式,而不是依赖Spark自动推断它。这个限制确保流查询使用一致的模式,即使在失败的情况下也是如此。对于特殊用例,可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。

当存在名为 /key=value/ 的子目录时,分区发现确实会发生,并且列表将自动递归到这些目录中。如果这些列出现在用户提供的模式中,则 Spark 将根据正在读取的文件的路径填充它们。组成分区方案的目录必须在查询启动时出现,并且必须保持静态。例如, 当 /data/year=2015/ 是可以添加 /data/year=2016/,但是更改分区列是无效的(即通过创建目录 /data/date=2016-04-17/)。

对流式 DataFrame/Dataset 的操作

你可以在 DataFrameDataset 上应用所有类型的操作——从无类型的,类似sql的操作(例如 select, where, groupBy),到类型化的类似 RDD 的操作(例如 map, filter, flatMap)。有关更多细节,请参阅SQL编程指南。让我们来看几个可以使用的示例操作。

基本操作—— Selection,Projection,Aggregation

数据流支持DataFrame/Dataset上的大多数常见操作。本节后面将讨论少数不受支持的操作。

  • Scala
  1. case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
  2. val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
  3. val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
  4. // Select the devices which have signal more than 10
  5. df.select("device").where("signal > 10") // using untyped APIs
  6. ds.filter(_.signal > 10).map(_.device) // using typed APIs
  7. // Running count of the number of updates for each device type
  8. df.groupBy("deviceType").count() // using untyped API
  9. // Running average signal for each device type
  10. import org.apache.spark.sql.expressions.scalalang.typed
  11. ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
  • Java
  1. import org.apache.spark.api.java.function.*;
  2. import org.apache.spark.sql.*;
  3. import org.apache.spark.sql.expressions.javalang.typed;
  4. import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
  5. public class DeviceData {
  6. private String device;
  7. private String deviceType;
  8. private Double signal;
  9. private java.sql.Date time;
  10. ...
  11. // Getter and setter methods for each field
  12. }
  13. Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
  14. Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data
  15. // Select the devices which have signal more than 10
  16. df.select("device").where("signal > 10"); // using untyped APIs
  17. ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  18. .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
  19. // Running count of the number of updates for each device type
  20. df.groupBy("deviceType").count(); // using untyped API
  21. // Running average signal for each device type
  22. ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  23. .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));

你还可以将一个流式DataFrame/Dataset注册为一个临时视图,然后在其上应用SQL命令。

  • Scala
  1. df.createOrReplaceTempView("updates")
  2. spark.sql("select count(*) from updates") // returns another streaming DF
  • Java
  1. df.createOrReplaceTempView("updates");
  2. spark.sql("select count(*) from updates"); // returns another streaming DF

注意,你可以使用 df.isStreaming 来识别一个DataFrame/Dataset是否有流式数据。

  • Scala
  1. df.isStreaming
  • Java
  1. df.isStreaming()

Event-time 上的 Window 操作

滑动 event-time 窗口上的聚合与结构化流非常简单,非常类似于分组聚合。在分组聚合中,为用户指定的分组列中的每个惟一值维护聚合值(例如计数)。对于基于窗口的聚合,将为每个窗口维护聚合值,其中包含一行的 event-time 。让我们用一个例子来理解它。

想象一下,我们的 快速示例 被修改了,流现在包含了行以及该行生成的时间。我们想要计算10分钟内窗口内的单词数,而不是运行单词计数,每5分钟更新一次。也就是说,在10分钟窗口内收到的单词数为12:00 - 12:10、12:05 - 12:15、12:10 - 12:20等。请注意,12:00 - 12:10表示在12:00之后但在12:10之前到达的数据。现在,考虑12:07收到的一个单词。此字应增加与两个窗口12:00 - 12:10和12:05 - 12:15对应的计数。因此计数将被分组键(即单词)和窗口(可以从 event-time 计算)索引。

结果表将类似如下所示。

Window Operations

由于这种窗口操作类似于分组,因此在代码中可以使用groupBy()和window()操作来表示窗口聚合。在下面的Scala/Java/Python示例中,你可以看到完整的代码。

  • Scala
  1. import spark.implicits._
  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  3. // Group the data by window and word and compute the count of each group
  4. val windowedCounts = words.groupBy(
  5. window($"timestamp", "10 minutes", "5 minutes"),
  6. $"word"
  7. ).count()
  • Java
  1. Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  2. // Group the data by window and word and compute the count of each group
  3. Dataset<Row> windowedCounts = words.groupBy(
  4. functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  5. words.col("word")
  6. ).count();

处理 Late DataWatermark

现在,考虑如果其中一个事件延迟, 应用程序会发生什么。例如,在12:04生成的单词(即 event-time )可以在12:11被应用程序接收。应用程序应该使用时间12:04而不是12:11来更新窗口12:00 - 12:10的旧计数。这在我们的基于窗口的分组中很自然地发生——结构化流可以在很长一段时间内保持部分聚合的中间状态,以便延期到达数据可以正确地更新旧窗口的聚合值,如下所示。

Handling Late Data

但是,为了连续几天运行这个查询,系统必须限制它在内存中积累的中间状态的数量。这意味着系统需要知道何时可以将旧的聚合从内存状态中删除,因为应用程序将不再接收该聚合的最新数据。为了实现这一点,在Spark 2.1中,我们引入了 Watermark ,它允许引擎自动跟踪数据中的当前 event-time ,并尝试相应地清除旧状态。你可以通过指定 event-time 列和数据在 event-time 方面的预期延迟的阈值来定义查询的 Watermark 。 对于在T时结束的特定窗口,引擎将维护状态并允许延期数据更新状态,直到(max event time seen by the engine - late threshold > T)。换句话说,阈值内的延迟数据将被聚合,但是超过阈值的数据将开始被删除(有关确切的保证,请参阅后面的部分)。让我们用一个例子来理解这一点。我们可以使用如下所示的 withWatermark() 在前面的示例中轻松定义 Watermark

  • Scala
  1. import spark.implicits._
  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  3. // Group the data by window and word and compute the count of each group
  4. val windowedCounts = words
  5. .withWatermark("timestamp", "10 minutes")
  6. .groupBy(
  7. window($"timestamp", "10 minutes", "5 minutes"),
  8. $"word")
  9. .count()
  • Java
  1. Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  2. // Group the data by window and word and compute the count of each group
  3. Dataset<Row> windowedCounts = words
  4. .withWatermark("timestamp", "10 minutes")
  5. .groupBy(
  6. window(col("timestamp"), "10 minutes", "5 minutes"),
  7. col("word"))
  8. .count();

在本例中,我们将查询的 Watermark 定义为“时间戳”列的值,并将“10分钟”定义为允许数据延迟的阈值。如果此查询以更新输出模式运行(稍后将在输出模式一节中讨论),则引擎将不断更新结果表中的窗口计数,直到该窗口比 Watermark 的时间更早,这比“时间戳”列中的当前 event-time 晚10分钟。这是一个例子。

Watermarking in Update Mode

如图所示,引擎跟踪的最大 event-time 为蓝色虚线,每个触发器开始处的 Watermark 设置为(max event time - '10 mins')红线。例如,当引擎观察数据(12:14,dog)时,它将下一个触发器的 Watermark 设置为12:04。这个 Watermark 让引擎保持中间状态额外10分钟,允许延迟到达数据被计数。例如,数据(12:09,cat)出现故障和延迟,并且落在窗口 12:00 - 12:1012:05 - 12:15。由于它仍然在触发器中的 Watermark 12:04 时间后面,因此引擎仍然将中间计数作为状态维护,并正确更新相关窗口的计数。但是,当 Watermark 被更新到12:11时,窗口(12:00 - 12:10)的中间状态被清除,所有后续数据(例如(12:04, donkey))被认为“too late”,因此被忽略。注意,在每个 trigger 之后,更新计数(即紫色行)被写入 sink 作为 trigger 输出,这是由更新模式决定的。

某些 Sink (例如文件)可能不支持更新模式所需的细粒度 (fine-grain) 更新。为了与它们一起工作,我们还支持 Append Mode,其中只将最终计数写入 sink。如下图所示。

注意,在非流 Dataset 上使用带 Watermark 是不允许的。由于 Watermark 不应该以任何方式影响任何批量查询,我们将直接忽略它。

Watermarking in Append Mode

与前面的更新模式类似,引擎维护每个窗口的中间计数。但是,部分计数不会更新到结果表,也不会写入sink。引擎等待“10分钟”以计算最后的日期,然后删除窗口< Watermark 的中间状态,并将最后的计数追加到 Result Table/sink。例如,窗口 12:00 - 12:10 的最终计数仅在 Watermark 更新到12:11之后才追加到结果表。

用于 Watermark 的条件为清除聚集状态

需要注意的是,为了清除聚合查询中的状态,必须满足以下条件(从Spark 2.1.1开始,以后可能会更改)。

  • Output 模式必须是 AppendUpdateComplete mode 需要保存所有聚合数据,因此不能使用 Watermark 来删除中间状态。有关每个输出模式的语义的详细说明,请参阅输出模式一节。
  • 聚合必须具有 event-time 列,或 event-time 列上的 window
  • 必须在与聚合中使用的时间戳列相同的列上调用 withWatermark 。例如 df.withWatermark("time", "1 min").groupBy("time2").count()Append 输出模式下无效,因为 Watermark 是在与聚合列不同的列上定义的。
  • 要使用 watermark 细节,必须在聚合之前调用 withWatermark。例如, df.groupBy("time").count().withWatermark("time", "1 min") 是无效的 Append 输出模式。

Watermark聚合语义保证

  • Watermark 延迟(与 Watermark 一起设置)为“2小时”,保证引擎不会丢弃任何延迟小于2小时的数据。换句话说,任何在 event-time 上少于2小时之前处理的最新数据都将被聚合。

  • 然而,这种保证只在一个方向上是严格的。延迟超过2小时的数据不保证被删除;它可能聚合,也可能不聚合。数据越是延迟,引擎处理它的可能性就越小。

Join 操作

结构化流支持将 Stream Dataset DataFrame 与静态 Dataset/DataFrame 以及另一个流 Dataset / DataFrame 连接起来。Stream Join 的结果是增量生成的,类似于前一节中流聚合的结果。在本节中,我们将探讨在上述情况下支持哪种类型的连接(即 innerouter 等)。请注意,在所有受支持的连接类型中,与流 Dataset /DataFrame 的连接的结果与和流中包含相同数据的静态 Dataset /DataFrame 的连接的结果完全相同。

Stream-static 连接

自从在Spark 2.0中引入以来,结构化流就支持流和静态Dataset /DataFrame 之间的连接(inner join 和某种类型的 outer join )。下面是一个简单的例子。

  • Scala
  1. val staticDf = spark.read. ...
  2. val streamingDf = spark.readStream. ...
  3. streamingDf.join(staticDf, "type") // inner equi-join with a static DF
  4. streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
  • Java
  1. Dataset<Row> staticDf = spark.read(). ...;
  2. Dataset<Row> streamingDf = spark.readStream(). ...;
  3. streamingDf.join(staticDf, "type"); // inner equi-join with a static DF
  4. streamingDf.join(staticDf, "type", "right_join"); // right outer join with a static DF

注意,Stream-static 连接不是有状态的,因此不需要状态管理。但是,目前还不支持一些类型的stream-static outer join。这些都列在这个 连接部分的末尾

Stream-stream 连接

在Spark 2.3中,我们添加了对 stream-stream 连接的支持,也就是说,你可以连接两个流 Dataset/ DataFrame。在两个数据流之间生成连接结果的挑战在于,在任何时候,对于连接的两边, Dataset 的视图都是不完整的,这使得在输入之间查找匹配项变得更加困难。从一个输入流接收的任何行可以与从另一个输入流接收的任何将来的、尚未接收的行匹配。因此,对于这两个输入流,我们将过去的输入缓冲为流状态,这样我们就可以将每个未来的输入与过去的输入匹配起来,从而生成连接的结果。此外,与流聚合类似,我们自动处理延迟的、无序(out-of-order)的数据,并可以使用 Watermark 限制状态。让我们讨论一下受支持的不同类型的 stream-stream 连接以及如何使用它们。

Watermark Inner Join

支持任何类型列上的 inner join 以及任何类型的连接条件。但是,随着流的运行,流状态的大小将无限地增长,因为必须保存所有过去的输入,因为任何新的输入都可以与过去的任何输入匹配。为了避免无界状态,你必须定义附加的连接条件,使不确定的旧输入无法与未来的输入匹配,因此可以从状态中清除。换句话说,你必须在联接中执行以下附加步骤。

  • 在两个输入上定义 Watermark 延迟,以便引擎知道延迟的程度(类似于流聚合)
  • 在两个输入之间定义一个 event-time 约束,这样引擎就可以找出一个输入的旧行什么时候不需要(即不满足时间约束)与另一个输入匹配。这个约束可以用以下两种方式之一进行定义:
    • 时间范围连接条件(例如 JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)
    • event-time 窗口连接 (例如…JOIN ON leftTimeWindow = rightTimeWindow)。

让我们用一个例子来理解这一点。

假设我们想要加入一个广告印象流(当一个广告出现时)和另一个用户点击广告流,以关联什么时候印象导致了可货币化的点击。要允许在这个stream-stream 连接中进行状态清理,你必须如下所示指定 Watermark 延迟和时间约束。

  1. Watermark 延迟: 例如,在 event-time 内,广告显示和相应的点击可能分别延迟2小时和3小时。
  2. event-time 范围条件: 例如,在相应的印象之后0秒到1小时的时间范围内可以发生一次单击。

代码看起来是这样的。

  • Scala
  1. import org.apache.spark.sql.functions.expr
  2. val impressions = spark.readStream. ...
  3. val clicks = spark.readStream. ...
  4. // Apply watermarks on event-time columns
  5. val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
  6. val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
  7. // Join with event-time constraints
  8. impressionsWithWatermark.join(
  9. clicksWithWatermark,
  10. expr("""
  11. clickAdId = impressionAdId AND
  12. clickTime >= impressionTime AND
  13. clickTime <= impressionTime + interval 1 hour
  14. """)
  15. )
  • Java
  1. import static org.apache.spark.sql.functions.expr
  2. Dataset<Row> impressions = spark.readStream(). ...
  3. Dataset<Row> clicks = spark.readStream(). ...
  4. // Apply watermarks on event-time columns
  5. Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
  6. Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
  7. // Join with event-time constraints
  8. impressionsWithWatermark.join(
  9. clicksWithWatermark,
  10. expr(
  11. "clickAdId = impressionAdId AND " +
  12. "clickTime >= impressionTime AND " +
  13. "clickTime <= impressionTime + interval 1 hour ")
  14. );

Watermarkstream-stream inner join 语义保证

这与Watermark聚合语义保证类似。 Watermark 延迟“2小时”保证了引擎不会丢失任何延迟小于2小时的数据。但是延迟超过2小时的数据可能会被处理,也可能不会被处理。

Watermark Outer Join

虽然 Watermark + event-time 约束对于 inner join 是可选的,但是对于左右 outer join 必须指定它们。这是因为,为了在外部连接中生成NULL结果,引擎必须知道输入行在将来什么时候与任何东西都不匹配。因此,必须指定 Watermark + event-time 约束才能生成正确的结果。因此,带有 outer join 的查询与前面的广告货币化示例非常相似,不同之处是有一个额外的参数将其指定为外连接。

  • Scala
  1. impressionsWithWatermark.join(
  2. clicksWithWatermark,
  3. expr("""
  4. clickAdId = impressionAdId AND
  5. clickTime >= impressionTime AND
  6. clickTime <= impressionTime + interval 1 hour
  7. """),
  8. joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
  9. )
  • Java
  1. impressionsWithWatermark.join(
  2. clicksWithWatermark,
  3. expr(
  4. "clickAdId = impressionAdId AND " +
  5. "clickTime >= impressionTime AND " +
  6. "clickTime <= impressionTime + interval 1 hour "),
  7. "leftOuter" // can be "inner", "leftOuter", "rightOuter"
  8. );

带 Watermark Outer Join Stream-Stream 语义保证

关于 Watermark 延迟和是否删除数据,Output joinInner join 具有相同的语义保证。

警告

关于如何生成外部结果,有几个重要的特征需要注意。

  • 外部 NULL 结果将产生一个延迟,该延迟取决于指定的 Watermark 延迟和时间范围条件。 这是因为引擎必须等待很长时间,以确保没有匹配上,将来也不会有更多的匹配。

  • 在目前的 micro-match 引擎实现中, Watermark 是在一个micro-match 结束时进行的,下一个微批使用更新后的 Watermark 来清理状态并输出外部结果。因为我们只在需要处理新数据时才触发 micro-batch ,所以如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果连接的两个输入流中的任何一个在一段时间内都没有接收到数据,则外部(左或右两种情况)输出可能会延迟。

支持指标 Join 流查询

Left Input Right Input Join Type  
Static Static All types 支持,因为它不是一个数据 stream,即使它可以出现在一个 stream 查询操作
Stream Static Inner 支持,无状态
Left Outer 支持,无状态
Right Outer 不支持
Full Outer 不支持
Static Stream Inner 支持,无状态
Left Outer 不支持
Right Outer 支持,无状态
Full Outer 不支持
Stream Stream Inner 支持, 可选指定两边的 watermark + 时间限制的状态清除
Left Outer 有条件支持,必须指定watermark 在右边+ time 限制正确的结果,可选指定watermark 在左边的所有状态清除
Right Outer 有条件支持, 必须指定watermark 在左边+ time 限制正确的结果,可选指定watermark 在右边的所有状态清除
Full Outer 不支持

关于支持的连接的更多细节:

  • 连接可以级联,也就是说,可以执行 df1.join(df2, ...).join(df3, ...).join(df4, ....).

  • 从Spark 2.4开始,你只能在查询处于 Append 输出模式时使用连接。还不支持其他输出模式。

  • 从Spark 2.4开始,你不能在联接之前使用其他 non-map-like 的操作。以下是一些不能使用的例子。

    • 不能在连接之前使用流聚合。

    • 在连接之前,不能在 Update 模式中使用 mapGroupsWithStateflatMapGroupsWithState

Stream 重复数据删除(Deduplication)

你可以使用事件中的唯一标识符来删除流中的重复数据记录。这与使用唯一标识符列的静态重复数据删除完全相同。查询将存储来自以前记录的必要数据量,以便可以过滤重复的记录。与聚合类似,可以使用也可以不使用 Watermark 来进行重复数据的删除 。

  • 使用 Watermark : 如果重复记录到达的时间有上限,那么你可以在 event-time 列上定义 Watermark ,并使用 guidevent-time 列来删除重复记录。该查询将使用 Watermark 从过去的记录中删除旧的状态数据,这些数据不再期望得到任何重复。这限制了查询必须维护的状态的数量。
  • Watermark : 由于对重复记录何时到达没有限制,查询回将所有过去记录的数据存储在状态信息中。
  • Scala
  1. val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
  2. // Without watermark using guid column
  3. streamingDf.dropDuplicates("guid")
  4. // With watermark using guid and eventTime columns
  5. streamingDf
  6. .withWatermark("eventTime", "10 seconds")
  7. .dropDuplicates("guid", "eventTime")
  • Java
  1. Dataset<Row> streamingDf = spark.readStream(). ...; // columns: guid, eventTime, ...
  2. // Without watermark using guid column
  3. streamingDf.dropDuplicates("guid");
  4. // With watermark using guid and eventTime columns
  5. streamingDf
  6. .withWatermark("eventTime", "10 seconds")
  7. .dropDuplicates("guid", "eventTime");

处理多个 Watermark 的策略

一个流查询可以有多个统一或连接在一起的输入流。每个输入流都有一个不同的延期数据阈值,需要对有状态操作进行容忍。在每个输入流上使用 withWatermarks("eventTime", delay) 的阈值来指定这些阈值。例如,考虑一个具有 inputStream1inputStream1 之间的 stream-stream 连接的查询。

  1. inputStream1.withWatermark("eventTime1", "1 hour")
  2. .join(
  3. inputStream2.withWatermark("eventTime2", "2 hours"),
  4. joinCondition)

在执行查询时,结构化流分别跟踪每个输入流中看到的最大 event-time ,根据相应的延迟计算 Watermark ,并选择单个全局 Watermark 用于有状态操作。默认情况下,选择最小值作为全局 Watermark ,因为它可以确保当一个流落后于其他流(例如,其中一个流由于上游失败而停止接收数据)时,不会因为太晚而意外丢失数据。换句话说,全局 Watermark 将以最慢流的速度安全地移动,查询输出也将相应地延迟。

然而,在某些情况下,你可能希望获得更快的结果,即使这意味着从最慢的流中删除数据。自从 Spark 2.4 以来,你可以通过设置SQL配置 spark.sql.streaming.multipleWatermarkPolicymax(默认值: min) 来设置多重 Watermark 策略,选择最大值作为全局 Watermark 。这让全局 Watermark 移动的速度最快的流。然而,作为一个副作用,来自较慢的流的数据将被积极地丢弃。因此,请明智地使用此配置。

任意的 Stateful Operation

许多场景需要比聚合更高级的有状态操作。例如,在许多场景中,必须从事件的数据流跟踪会话。要进行这种会话,必须将任意类型的数据保存为状态,并在每个触发器中使用数据流事件对状态执行任意操作。从Spark 2.2开始,这可以使用 mapGroupsWithState 操作和功能更强大的 flatMapGroupsWithState 操作来完成。这两个操作都允许你在分组 Dataset 上应用用户定义的代码来更新用户定义的状态。要了解更多的具体细节,请看API文档(Scala/Java)和示例(Scala/Java)。

不支持的 Operation

有一些DataFrame/Dataset操作是流数据不支持的。其中一些如下。

  • Dataset 中还不支持多个流聚合(即流 DF 上的聚合链)。
  • 在 Dataset 流中, 不支持 Limit 和 获取开始N行(类似 topN)操作
  • 在 Dataset 流中, 不支持 Distinct 操作
  • Dataset 流仅在聚合之后并在 Compleate Output Mode 下,才支持 Sort 操作。
  • Dataset 流, 不支持少数类型的 outer join。有关更多详细信息,请参阅支持指标 Join 流查询

此外,有些 Dataset 方法在Dataset 流上不支持的。它们是将立即运行查询并返回结果的操作,这在Dataset 流上是没有意义的。相反,这些功能可以通过显式地启动一个流查询来实现(参见下一节)。

  • count() - Dataset 流不能返回单个 count。相反,使用 ds.groupBy().count(),它返回一个包含正在运行的计数的流 Dataset
  • foreach() - 使用 ds.writeStream.foreach(...) (参见下一节) 代替。
  • show() - 使用控制台 sink 代替(参见下一节)。

如果你尝试任何这些操作,你将看到一个 AnalysisException,如“ Dataset / DataFrame 流中, 操作 XYZ 是不支持的”。虽然其中一些可能会在未来的Spark版本中得到支持,但其他一些则很难在流数据上有效地实现。例如,不支持对输入流进行排序,因为它需要跟踪流中收到的所有数据。因此,这基本上很难有效执行。

开始 Stream Query

一旦定义了 DataFrame/Dataset 的最终结果,剩下的就是开始流计算了。为此,必须使用通过 Dataset.writeStream() 返回的 DataStreamWriter (Scala/Java/Python文档)。你必须在这个接口中指定以下内容的一个或多个。

  • 输出 sink 的详细信息:数据格式、位置等。

  • 输出模式 : 指定写入输出 sink 的内容。

  • query 名称: 可选, 指定 query 的唯一名称以进行标识。

  • 触发间隔 : 可选, 指定触发间隔。如果没有指定,系统将在前面的处理完成后立即检查新数据的可用性。如果由于前一个处理未完成而错过了触发时间,则系统将立即触发处理。

  • checkpoint 位置: 对于一些可以保证 end-to-end fault-tolerance 的输出 sink,指定系统将写入所有 checkpoint 信息的位置。这应该是 HDFS-compatiblefault-tolerance 文件系统中的一个目录。下一节将更详细地讨论 checkpoint 的语义。

Output 模式

有几种类型的输出模式。

  • Append mode(默认) - 这是默认模式, (因最后一个触发器(trigger)会输出到 sink) 只有新行才会添加到 Result Table。这只适用于那些添加到结果表的行永远不会改变的查询。因此,此模式保证每个行只输出一次(假设 fault-tolerance sink)。例如,只有 selectwheremapflatMapfilterjoin等查询支持 Append 模式。
  • Complete mode - 每个触发器触发后, 整个 Result Table 会输出到 sinkComplete mode 支持聚合查询
  • Update 模式 - (Spark 2.1.1 版本后可用) 只有在最后一个触发器之后, 更新的结果表中的行才会输出到 Sink。更多信息将在未来的版本中添加。

不同类型的流 Query 支持不同的输出模式。这是兼容性选项。

Query Type   Supported Output Modes Notes
Queries with aggregation Aggregation on event-time with watermark Append, Update, Complete Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details.

Update mode uses watermark to drop old aggregation state.

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregations Complete, Update Since no watermark is defined (only defined in other category), old aggregation state is not dropped.

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithState Update  
Queries with flatMapGroupsWithState Append operation mode Append Aggregations are allowed after flatMapGroupsWithState.
Update operation mode Update Aggregations not allowed after flatMapGroupsWithState.
Queries with joins Append Update and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queries Append, Update Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.

Output Sink

有几种类型的 build-in 输出 sink

  • File sink: 将输出保存到一个目录中
  1. writeStream
  2. .format("parquet") // can be "orc", "json", "csv", etc.
  3. .option("path", "path/to/destination/dir")
  4. .start()
  • Kafka sink : kafka 中存储一个或多个 topic 的输出。
  1. writeStream
  2. .format("kafka")
  3. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  4. .option("topic", "updates")
  5. .start()
  • Foreach sink - 对输出中的记录执行任意计算。有关更多细节,请参阅本节后面的内容。
  1. writeStream
  2. .foreach(...)
  3. .start()
  • Console sink (for debugging) -每次触发时将输出打印到控制台/ stdout。 都支持“Append ”和“Complete ”输出模式。 这应该用于低数据量的调试目的,因为在每次触发后,整个输出被收集并存储在驱动程序的内存中。
  1. writeStream
  2. .format("console")
  3. .start()
  • Memory sink (for debugging) - 输出作为内存表存储在内存中。 都支持“Append ”和“Complete ”输出模式。 由于整个输出被收集并存储在驱动程序的内存中,所以应用于低数据量的调试目的。 因此,请谨慎使用
  1. writeStream
  2. .format("memory")
  3. .queryName("tableName")
  4. .start()

某些 sinkfault-tolerance,因为它们不保证输出的持久性,仅用于调试目的。 请参阅上一节关于 fault-tolerance semantic` 的部分。 以下是Spark中所有 Sink 的详细信息。

Sink Supported Output Modes Options Fault-tolerant Notes
File Sink Append path: path to the output directory, must be specified.

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for “parquet” format options see DataFrameWriter.parquet()
Yes (exactly-once) Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka Sink Append, Update, Complete See the Kafka Integration Guide Yes (at-least-once) More details in the Kafka Integration Guide
Foreach Sink Append, Update, Complete None Yes (at-least-once) More details in the next section
ForeachBatch Sink Append, Update, Complete None Depends on the implementation More details in the next section
Console Sink Append, Update, Complete numRows: Number of rows to print every trigger (default: 20)
truncate: Whether to truncate the output if too long (default: true)
No  
Memory Sink Append, Complete None No. But in Complete Mode, restarted query will recreate the full table. Table name is the query name.

 

请注意,您必须调用 start() 来实际启动查询的执行。 这将返回一个 StreamingQuery 对象,它是连续运行执行的 handle。 您可以使用此对象来管理查询,我们将在下一小节中讨论。 现在,让我们通过几个例子了解所有这些。 - Scala scala // ========== DF with no aggregations ========== val noAggDF = deviceDataDf.select("device").where("signal > 10") // Print new data to console noAggDF .writeStream .format("console") .start() // Write new data to Parquet files noAggDF .writeStream .format("parquet") .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir") .start() // ========== DF with aggregation ========== val aggDF = df.groupBy("device").count() // Print updated aggregations to console aggDF .writeStream .outputMode("complete") .format("console") .start() // Have all the aggregates in an in-memory table aggDF .writeStream .queryName("aggregates") // this query name will be the table name .outputMode("complete") .format("memory") .start() spark.sql("select * from aggregates").show() // interactively query in-memory table - Java java // ========== DF with no aggregations ========== Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10"); // Print new data to console noAggDF .writeStream() .format("console") .start(); // Write new data to Parquet files noAggDF .writeStream() .format("parquet") .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir") .start(); // ========== DF with aggregation ========== Dataset<Row> aggDF = df.groupBy("device").count(); // Print updated aggregations to console aggDF .writeStream() .outputMode("complete") .format("console") .start(); // Have all the aggregates in an in-memory table aggDF .writeStream() .queryName("aggregates") // this query name will be the table name .outputMode("complete") .format("memory") .start(); spark.sql("select * from aggregates").show(); // interactively query in-memory table ##### 使用 ForeachForeachBatch foreachforeachBatch 操作允许你对流查询的输出应用任意操作和写入逻辑。它们的使用场景略有不同: - foreach - 允许在每一行上执行自定义写逻辑, - foreachBatch - 允许在每个微批的输出上执行任意操作和自定义逻辑。让我们更详细地了解它们的用法。 ForeachBatch foreachBatch(...) 允许你指定一个函数,该函数在流查询的每个 micro-batch 的输出数据上执行。从Spark 2.4开始,Scala、Java和Python都支持这个特性。它有一个 DataFrameDataset 的两个参数:-micro-batch输出数据 -micro-batch唯一 ID - Scala ```scala streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => // Transform and write batchDF }.start() ``` - Java ```java streamingDatasetOfString.writeStream().foreachBatch( new VoidFunction2<Dataset<String>, Long> { public void call(Dataset<String> dataset, Long batchId) { // Transform and write batchDF } } ).start(); streamingDatasetOfString.writeStream().foreachBatch ( new VoidFunction2<Dataset<String>, Long> { public void call (Dataset<String> dataset, Long batchId) { // Transform and write batchDF } } ).start(); ``` 使用foreachBatch,你可以执行以下操作。 - **重用现有的批处理数据源** - 对于许多存储系统,可能还没有可用的流Sink,但可能已经存在用于批处理查询的数据写入器。使用foreachBatch,你可以在每个micro-batch的输出上使用批数据写入器。 - **写入多个位置** - 如果你想将流查询的输出写入多个位置,那么你可以简单地多次写入输出DataFrame/Dataset。但是,每次写操作都会导致重新计算输出数据(包括可能的重新读取输入数据)。为了避免重复计算,你应该缓存输出的DataFrame/Dataset,将其写入多个位置,然后释放它。这是一个outline(轮廓,大纲)。 ```scala streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(...).save(...) // location 1 batchDF.write.format(...).save(...) // location 2 batchDF.unpersist() } ``` - **应用额外的 DataFrame操作** - 在流DataFrame中不支持许多DataFrameDataset操作,因为在这些情况下Spark不支持生成增量计划。使用foreachBatch,你可以在每个micro-batch输出上应用这些操作。但是,你必须自己考虑执行该操作的end-to-end语义。 **注意:** - 默认情况下,foreachBatch只提供at-least-once写保证。但是,你可以使用提供给函数的batchId来重复输出,并获得exactly-once的保证。 -foreachBatch不能使用连续处理模式,因为它基本上依赖于流查询的micro-batch执行。如果以连续模式写入数据,则使用foreach。 **Foreach** 如果foreachBatch不能满足要求(例如,对应的批数据写入器不存在,或者是连续处理模式),那么你可以使用foreach来表达你的自定义写入器逻辑。具体来说,你可以通过将数据写入逻辑划分为三种方法来表达:open、process和close。从Spark 2.4开始,foreach就可以在Scala、Java和Python中使用。 - 在Scala中,你必须扩展类ForeachWriter(docs)```scala streamingDatasetOfString.writeStream.foreach( new ForeachWriter[String] { def open(partitionId: Long, version: Long): Boolean = { // Open connection } def process(record: String): Unit = { // Write string to connection } def close(errorOrNull: Throwable): Unit = { // Close the connection } } ).start() ``` - 在 Java 中,你必须扩展类ForeachWriter(docs)```java streamingDatasetOfString.writeStream().foreach( new ForeachWriter[String] { @Override public boolean open(long partitionId, long version) { // Open connection } @Override public void process(String record) { // Write string to connection } @Override public void close(Throwable errorOrNull) { // Close the connection } } ).start(); ``` **执行语义** 当流查询开始时,Spark以如下方式调用函数或对象的方法: - 此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。 - 此对象必须是可序列化的,因为每个任务将获得所提供对象的一个新的序列化-反序列化副本。因此,强烈建议对写入数据进行任何初始化(例如。在调用open()方法之后完成(打开连接或启动事务),这意味着任务已经准备好生成数据。 - 这些方法的生命周期如下: - 对于带有partition_id的每个分区: - 对于带有epoch_id的流数据的每batch/epoch: - 方法open(partitionId, epochId)被调用。 - 如果open(…)返回 true,那么对于partitionbatch/epoch中的每一行,都将调用方法process(row)。 - 方法close(error)是在处理行时发生错误(如果有的话)时调用的。 -close()方法(如果存在)在open()方法存在并成功返回时调用(与返回值无关),除非JVMPython进程中途崩溃。 - **Note**: Spark不保证相同的输出(partitionId, epochId),所以不能实现重复数据删除 (partitionId, epochId) 。例如,source由于某些原因提供了不同数量的分区,Spark optimization改变了分区数量,等等。详见[SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650)。如果你需要重复数据删除,试试foreachBatch。 ### Trigger 流查询的trigger设置定义了流数据处理的时间, 该查询是作为具有固定批处理间隔的micro-batch查询执行,还是作为连续处理查询执行。下面是支持的不同类型的trigger`。
Trigger Type Description
unspecified (default) If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.
Fixed interval micro-batches The query will be executed with micro-batches mode, where micro-batches will be kicked off at the user-specified intervals.
  • If the previous micro-batch completes within the interval, then the engine will wait until the interval is over before kicking off the next micro-batch.
  • If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes (i.e., it will not wait for the next interval boundary).
  • If no new data is available, then no micro-batch will be kicked off.
One-time micro-batch The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios you want to periodically spin up a cluster, process everything that is available since the last period, and then shutdown the cluster. In some case, this may lead to significant cost savings.
Continuous with fixed checkpoint interval
(experimental)
The query will be executed in the new low-latency, continuous processing mode. Read more about this in the Continuous Processing section below.

下面是一些代码示例。

  • Scala
  1. import org.apache.spark.sql.streaming.Trigger
  2. // Default trigger (runs micro-batch as soon as it can)
  3. df.writeStream
  4. .format("console")
  5. .start()
  6. // ProcessingTime trigger with two-seconds micro-batch interval
  7. df.writeStream
  8. .format("console")
  9. .trigger(Trigger.ProcessingTime("2 seconds"))
  10. .start()
  11. // One-time trigger
  12. df.writeStream
  13. .format("console")
  14. .trigger(Trigger.Once())
  15. .start()
  16. // Continuous trigger with one-second checkpointing interval
  17. df.writeStream
  18. .format("console")
  19. .trigger(Trigger.Continuous("1 second"))
  20. .start()
  • Java
  1. import org.apache.spark.sql.streaming.Trigger
  2. // Default trigger (runs micro-batch as soon as it can)
  3. df.writeStream
  4. .format("console")
  5. .start();
  6. // ProcessingTime trigger with two-seconds micro-batch interval
  7. df.writeStream
  8. .format("console")
  9. .trigger(Trigger.ProcessingTime("2 seconds"))
  10. .start();
  11. // One-time trigger
  12. df.writeStream
  13. .format("console")
  14. .trigger(Trigger.Once())
  15. .start();
  16. // Continuous trigger with one-second checkpointing interval
  17. df.writeStream
  18. .format("console")
  19. .trigger(Trigger.Continuous("1 second"))
  20. .start();

Stream 管理查询

启动查询时创建的 StreamingQuery 对象可用于监视和管理查询。

  • Scala
  1. val query = df.writeStream.format("console").start() // get the query object
  2. query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
  3. query.runId // get the unique id of this run of the query, which will be generated at every start/restart
  4. query.name // get the name of the auto-generated or user-specified name
  5. query.explain() // print detailed explanations of the query
  6. query.stop() // stop the query
  7. query.awaitTermination() // block until query is terminated, with stop() or with error
  8. query.exception // the exception if the query has been terminated with error
  9. query.recentProgress // an array of the most recent progress updates for this query
  10. query.lastProgress // the most recent progress update of this streaming query
  • Java
  1. StreamingQuery query = df.writeStream().format("console").start(); // get the query object
  2. query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
  3. query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
  4. query.name(); // get the name of the auto-generated or user-specified name
  5. query.explain(); // print detailed explanations of the query
  6. query.stop(); // stop the query
  7. query.awaitTermination(); // block until query is terminated, with stop() or with error
  8. query.exception(); // the exception if the query has been terminated with error
  9. query.recentProgress(); // an array of the most recent progress updates for this query
  10. query.lastProgress(); // the most recent progress update of this streaming query

你可以在单个SparkSession中启动任意数量的查询。它们将同时运行,共享集群资源。你可以使用sparkSession.streams() 来获得可用于管理当前活动查询的`StreamingQueryManager (Scala/Java/Python文档)。

  • Scala
  1. val spark: SparkSession = ...
  2. spark.streams.active // get the list of currently active streaming queries
  3. spark.streams.get(id) // get a query object by its unique id
  4. spark.streams.awaitAnyTermination() // block until any one of them terminates
  • Java
  1. SparkSession spark = ...
  2. spark.streams().active(); // get the list of currently active streaming queries
  3. spark.streams().get(id); // get a query object by its unique id
  4. spark.streams().awaitAnyTermination(); // block until any one of them terminates

Stream 监控查询

有多种方法可以监视 active 的流查询。你可以使用Spark的 Dropwizard 指标支持将指标推送到外部系统,或者以编程方式访问它们。

读指标交互

你能使用streamingQuery.lastProgress()streamingQuery.status() 直接地获取到 active 查询的当前状态和指标。 streamingQuery.lastProgress()ScalaJava 中返回一个 StreamingQueryProgress 对象,在 Python 中返回一个具有相同字段的字典。它包含关于流的最后一个 trigger 中所取得进展的所有信息-处理了什么数据、处理速率、延迟等等。还有 streamingQuery.recentProgress返回最近几个进展的数组。

此外,streamingQuery.status() 在Scala和Java中返回一个 StreamingQueryStatus 对象,在 Python 中返回一个具有相同字段的字典。它提供了关于查询正在立即执行的操作的信息-触发器是否活动、数据是否正在处理等等。

这里有几个例子。

  • Scala
  1. val query: StreamingQuery = ...
  2. println(query.lastProgress)
  3. /* Will print something like the following.
  4. {
  5. "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  6. "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  7. "name" : "MyQuery",
  8. "timestamp" : "2016-12-14T18:45:24.873Z",
  9. "numInputRows" : 10,
  10. "inputRowsPerSecond" : 120.0,
  11. "processedRowsPerSecond" : 200.0,
  12. "durationMs" : {
  13. "triggerExecution" : 3,
  14. "getOffset" : 2
  15. },
  16. "eventTime" : {
  17. "watermark" : "2016-12-14T18:45:24.873Z"
  18. },
  19. "stateOperators" : [ ],
  20. "sources" : [ {
  21. "description" : "KafkaSource[Subscribe[topic-0]]",
  22. "startOffset" : {
  23. "topic-0" : {
  24. "2" : 0,
  25. "4" : 1,
  26. "1" : 1,
  27. "3" : 1,
  28. "0" : 1
  29. }
  30. },
  31. "endOffset" : {
  32. "topic-0" : {
  33. "2" : 0,
  34. "4" : 115,
  35. "1" : 134,
  36. "3" : 21,
  37. "0" : 534
  38. }
  39. },
  40. "numInputRows" : 10,
  41. "inputRowsPerSecond" : 120.0,
  42. "processedRowsPerSecond" : 200.0
  43. } ],
  44. "sink" : {
  45. "description" : "MemorySink"
  46. }
  47. }
  48. */
  49. println(query.status)
  50. /* Will print something like the following.
  51. {
  52. "message" : "Waiting for data to arrive",
  53. "isDataAvailable" : false,
  54. "isTriggerActive" : false
  55. }
  56. */
  • Java
  1. StreamingQuery query = ...
  2. System.out.println(query.lastProgress());
  3. /* Will print something like the following.
  4. {
  5. "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  6. "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  7. "name" : "MyQuery",
  8. "timestamp" : "2016-12-14T18:45:24.873Z",
  9. "numInputRows" : 10,
  10. "inputRowsPerSecond" : 120.0,
  11. "processedRowsPerSecond" : 200.0,
  12. "durationMs" : {
  13. "triggerExecution" : 3,
  14. "getOffset" : 2
  15. },
  16. "eventTime" : {
  17. "watermark" : "2016-12-14T18:45:24.873Z"
  18. },
  19. "stateOperators" : [ ],
  20. "sources" : [ {
  21. "description" : "KafkaSource[Subscribe[topic-0]]",
  22. "startOffset" : {
  23. "topic-0" : {
  24. "2" : 0,
  25. "4" : 1,
  26. "1" : 1,
  27. "3" : 1,
  28. "0" : 1
  29. }
  30. },
  31. "endOffset" : {
  32. "topic-0" : {
  33. "2" : 0,
  34. "4" : 115,
  35. "1" : 134,
  36. "3" : 21,
  37. "0" : 534
  38. }
  39. },
  40. "numInputRows" : 10,
  41. "inputRowsPerSecond" : 120.0,
  42. "processedRowsPerSecond" : 200.0
  43. } ],
  44. "sink" : {
  45. "description" : "MemorySink"
  46. }
  47. }
  48. */
  49. System.out.println(query.status());
  50. /* Will print something like the following.
  51. {
  52. "message" : "Waiting for data to arrive",
  53. "isDataAvailable" : false,
  54. "isTriggerActive" : false
  55. }
  56. */

使用 Async API 编程输出指标报告

你还可以通过附加 StreamingQueryListener (Scala/Java文档)来异步监控与 SparkSession 相关的所有查询。一旦你将定制的 StreamingQueryListener 对象与 sparkSession.streams.attachListener() 连接起来,你将在查询开始和停止时以及在 active 查询中取得进展时获得回调。举个例子,

  • Scala
  1. val spark: SparkSession = ...
  2. spark.streams.addListener(new StreamingQueryListener() {
  3. override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
  4. println("Query started: " + queryStarted.id)
  5. }
  6. override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
  7. println("Query terminated: " + queryTerminated.id)
  8. }
  9. override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
  10. println("Query made progress: " + queryProgress.progress)
  11. }
  12. })
  • Java
  1. SparkSession spark = ...
  2. spark.streams().addListener(new StreamingQueryListener() {
  3. @Override
  4. public void onQueryStarted(QueryStartedEvent queryStarted) {
  5. System.out.println("Query started: " + queryStarted.id());
  6. }
  7. @Override
  8. public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
  9. System.out.println("Query terminated: " + queryTerminated.id());
  10. }
  11. @Override
  12. public void onQueryProgress(QueryProgressEvent queryProgress) {
  13. System.out.println("Query made progress: " + queryProgress.progress());
  14. }
  15. });

使用 Dropwizard 输出指标报告

Spark支持使用Dropwizard库报告指标。为了能够报告结构化流查询的指标,你必须在 SparkSession中显式地启用 spark.sql.streaming.metricsEnabled

  • Scala
  1. spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
  2. // or
  3. spark.sql("SET spark.sql.streaming.metricsEnabled=true")
  • Java
  1. spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
  2. // or
  3. spark.sql("SET spark.sql.streaming.metricsEnabled=true");

在启用此配置之后,所有在 SparkSession 中启动的查询都将通过 Dropwizard 向任何已配置的接收端报告指标(例如 GangliaGraphiteJMX等)。

使用 Checkpoint 从失败中 Recovery

在出现故障或有意关闭的情况下,你可以恢复前一个查询的进度和状态,并继续它停止的地方。这是使用 checkpointwrite-ahead 日志完成的。你可以使用 checkpoint 位置配置一个查询,该查询将保存所有的进度信息(即每个 triggle 中处理的偏移范围)和运行聚合(例如quick word count 示例中)到 checkpoint 位置。这个 checkpoint 位置必须是HDFS兼容的文件系统中的一个路径,并且可以在启动查询时在DataStreamWriter 中设置为一个选项。

  • Scala
  1. aggDF
  2. .writeStream
  3. .outputMode("complete")
  4. .option("checkpointLocation", "path/to/HDFS/dir")
  5. .format("memory")
  6. .start()
  • Java
  1. aggDF
  2. .writeStream()
  3. .outputMode("complete")
  4. .option("checkpointLocation", "path/to/HDFS/dir")
  5. .format("memory")
  6. .start();

Stream 查询中更改后的 Recovery 语义

在从同一 checkpoint 位置重新启动之间,允许流查询中的哪些更改是有限制的。以下是几种不允许的更改,或者这些更改的效果没有得到很好的定义。All:

  • 允许的术语意味着你可以执行指定的更改,但是其效果的语义是否定义良好取决于查询和更改。
  • 术语 不允许 意味着你不应该进行指定的更改,因为重新启动的查询可能会失败,出现不可预知的错误。sdf 表示一个使用 sparkSession.readStream 生成的流数据流。

类型的变化

  • 输入源的数量或类型(即不同的源)的改变:这是不允许的。
  • 输入源参数的更改: 是否允许这样做以及更改的语义是否定义良好取决于源和查询。这里有几个例子。

    • 允许添加/删除/修改速率限制: spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • 通常不允许更改订阅的 topic/file,因为结果是不可预测的: spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "newTopic")
  • output sink类型的更改:允许在几个特定的接收器组合之间进行更改。这需要在个案的基础上加以核实。这里有几个例子。

    • 允许 File sink 更改为 Kafka sink : Kafka 仅会接收到新数据

    • Kafka sink 更改为 File sink 是不允许的

    • Kafka sink 可改为 foreach ,反之亦然。

  • 输出 sink 参数的更改:是否允许这样做,以及更改的语义是否定义良好取决于接收器和查询。这里有几个例子:
    • 不允许 file sink 更改输出目录: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")
      • 允许更改输出 topic: sdf.writeStream.format("kafka").option("topic", "someTopic")sdf.writeStream.format("kafka").option("topic", "anotherTopic")
    • 允许对 user-defineforeach sink (即ForeachWriter代码)进行更改,但是更改的语义取决于代码。
  • 改变 projection/ filter / map-like 操作:某些情况下是允许的。例如:

    • 允许添加/删除 filter:sdf.selectExpr("a")sdf.where(...).selectExpr("a").filter(...)
    • 允许更改具有相同输出模式的 projection: sdf.selectExpr("stringColumn AS json").writeStreamsdf.selectExpr("anotherStringColumn AS json").writeStream
    • 有条件地允许更改具有不同输出模式的 projection: 仅仅当 output sink 允许 Schema ab更改时, sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream 才会被允许。
  • stateful 操作中的更改:流查询中的一些操作需要维护状态数据,以便持续更新结果。结构化流自动将状态数据 checkpointfault-tolerance 存储(例如,HDFS、AWS S3、Azure Blob存储),并在重启后恢复。但是,这假设状态数据的模式在重启时保持不变。这意味着在重新启动之间不允许对流查询的有状态操作进行任何更改(即添加、删除或模式修改)。以下是有状态操作的列表,为了确保状态恢复,在重启之间不应该更改这些操作的模式:

    • Streaming aggregation:例如,sdf.groupBy("a").agg(...).不允许对分组键或聚合的数量或类型进行任何更改。
    • 流式重复数据删除:例如,sdf.dropDuplicates(“a”)不允许对分组键或聚合的数量或类型进行任何更改。
    • stream-stream join: 例如: sdf1.join(sdf2, ...) (即。两个输入都是用sparkSession.readStream生成的)。不允许更改模式或 equi-joining 列。不允许更改连接类型(outer 或 inner)。join condition 中的其他更改定义不清。
  • 任意有状态操作*:例如,sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)。 不允许对用户定义状态的模式和超时类型进行任何更改。允许在 user-definedstate-mapping 函数中进行任何更改,但是更改的语义效果取决于用户定义的逻辑。如果你真的希望支持状态模式更改,那么可以使用支持模式迁移的 encode/ decode方案显式地将复杂的状态数据结构 encode/decode 为字节。例如,如果你将你的状态保存为avro编码的字节,那么你可以自由地更改查询之间的Avro-state-schema,因为二进制状态将始终成功地恢复。

连续处理

[实验]

连续处理是 Spark 2.3中引入的一种新的实验性流执行模式,它支持低(~1 ms)end-to-end 延迟,并保证 at-least-once fault-tolerance 。与默认的 micro-batch 引擎相比,默认的 micro-batch 引擎只能实现一次精确的保证,但最多只能实现约100ms的延迟。对于某些类型的查询(下面将讨论),你可以在不修改应用程序逻辑的情况下选择执行它们的模式(即不更改 DataFrame/Dataset操作)。

要在连续处理模式下运行受支持的查询,只需指定一个连续触发器,并将所需的 checkpoint 间隔作为参数。例如,

  • Scala
  1. import org.apache.spark.sql.streaming.Trigger
  2. spark
  3. .readStream
  4. .format("kafka")
  5. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  6. .option("subscribe", "topic1")
  7. .load()
  8. .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9. .writeStream
  10. .format("kafka")
  11. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  12. .option("topic", "topic1")
  13. .trigger(Trigger.Continuous("1 second")) // only change in query
  14. .start()
  • Java
  1. import org.apache.spark.sql.streaming.Trigger;
  2. spark
  3. .readStream
  4. .format("kafka")
  5. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  6. .option("subscribe", "topic1")
  7. .load()
  8. .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9. .writeStream
  10. .format("kafka")
  11. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  12. .option("topic", "topic1")
  13. .trigger(Trigger.Continuous("1 second")) // only change in query
  14. .start();

checkpoint 间隔为1秒,这意味着连续处理引擎将每秒钟记录查询的进度。生成的 checkpoint 采用与 micro-batch 引擎兼容的格式,因此可以使用任何触发器重新启动任何查询。例如,使用 micro-batch 模式启动的受支持查询可以在连续模式中重新启动,反之亦然。注意,无论何时切换到连续模式,都至少会得到一次 fault-tolerance 保证。

支持查询

从Spark 2.4开始,在连续处理模式中只支持以下类型的查询。

  • Operations: 在连续模式下只支持 map-like 的 Dataset/DataFrame 操作,即只支持projection(selectmapflatMapmapPartition 等 )和 selection (wherefilter 等)。

    • 除了聚合函数(因为聚合还不受支持), current_timestamp()current_date()(使用时间的确定性计算很有挑战性)之外 所有的 SQL 函数都支持。
  • Source :

    • Kafka source:所有选项都支持。
    • Rate source : 适合测试。只有 numPartitionsrowsPerSecond 在连续模式下受支持
  • Sink:

    • Kafka sink:支持所有选项。

    • Memory sink:很适合调试。

    • Console sink:很适合调试。支持所有选项。请注意,控制台将在连续触发器中指定的每个 checkpoint 间隔打印。

    有关 input sourceoutput sink 的详细信息,请参阅输入源输出接收部分。虽然 Console sink 有利于测试、 end-to-end low-latency 处理可以是最好的观察与Kafka 的 sourcesink,这允许 engine 过程中,并在输入 topic中输入数据可用后的几毫秒内使结果在输出 topic中可用.

警告

  • 持续处理引擎启动多个长时间运行的任务,这些任务不断地从源读取数据、处理数据并不断地向 sink写入数据。查询所需的任务数量取决于查询可以并行地从源读取多少个 partition。因此,在开始连续处理查询之前,你必须确保集群中有足够多的 core 来并行处理所有任务。例如,如果你从一个有10个 partiton的Kafka topic 中读取数据,那么集群必须至少有10 core,以便查询取得进展。
  • 停止一个连续的处理流可能会产生虚假的任务终止警告。这些都可以安全忽略。
  • 目前没有失败任务的自动重试。任何失败都将导致停止查询,并且需要从 checkpoint 手动重新启动查询。

额外信息

进一步的阅读

会谈