Windows是处理无线数据流的核心,它将流分割成有限大小的桶(buckets),并在其上执行各种计算。
窗口化的Flink程序的结构通常如下,有分组流(keyed streams)和无分组流(non-keyed streams)两种。两者的不同之处在于,分组流中调用了keyBy(...)
方法,无分组流中使用windowAll(...)
替代分组流中的window(...)
方法。
Window生命周期#
当属于一个窗口的第一个元素到达时,这个窗口被创建,当时间(event or processing time)经过了它的结束时间戳与用户指定允许延时之后,窗口将被完全移除。同时,Flink确保只对基于时间的窗口执行删除操作,而对于其他类型不做此处理(例:global windows)。举个例子,基于事件时间的窗口策略每5分钟创建一个不重叠窗口,允许1分钟的延时,那么,当时间戳属于12:00-12:05这个区间的第一个元素到达时,Flink将为其创建一个新的窗口,一直到watermark到达12:06这个时间戳时,Flink删除该窗口。
Flink中,每个窗口都有一个触发器(Trigger)和函数(ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction)与之关联。其中,函数中包含了作用于窗口中的元素的计算逻辑,触发器用于说明什么条件下执行窗口的函数计算。触发策略通常类似于“当窗口中的元素个数超过4个时”,或者“当watermark到达窗口结束时间戳时”。触发器还可以决定在窗口生命周期内的任何时间清除窗口中的内容。这种情况下的清除操作只会涉及到窗口中的元素,而不会清除窗口的元数据(window metadata)。也就是说,新的元素任然可以被添加到这个窗口中。
除此之外,你还可以指定一个回收器(Evictor),它能够在触发器被触发后以及函数作用之前或之后从窗口中删除元素。
分组窗口和无分组窗口#
在定义窗口之前,首先需要明确的是我们的数据流是否需要分组。使用keyBy(...)
会将无线流分隔成逻辑上分组的流,反之,则不会分组流数据。
在分组流中,传入事件的任何属性都可以作为分组流的键。由于每个分组流都可以独立于其他流被处理,所以分组流中允许多个任务并行地进行窗口计算。所有引用了同一个键的元素将会被发送到相同的并行任务。
对于无分组的数据流,数据源不会被分隔成多个逻辑流,所有的窗口计算逻辑将会在一个任务中执行。
窗口分配器(Window Assigners)#
确定了窗口是否分组之后,接下来我们需要定义分配器,窗口分配器定义如何将元素分配给窗口。
WindowAssigner负责将传入的元素分配给一个或多个窗口。Flink基于一些常见的应用场景,为我们提供了几个预定义的WindowAssigner,分别是滚动窗口(tumbling windows)、滑动窗口(sliding windows)、会话窗口(session windows)以及全局窗口(global windows)。我们也可以通过继承WindowAssigner类来自定义窗口分配器逻辑。Flink内置的WindowAssigner中,除了global windows,其余几个都是基于时间(processing time or event time)来为窗口分配元素。
基于时间的窗口包含一个start timestamp(大于等于)和一个end timestamp(小于),两者的时间差用于表示窗口大小。同时,我们可以通过Flink提供的TimeWindow来查询开始、结束时间戳,还可以通过maxTimestamp()
方法获取给定窗口允许的最大时间戳。
Tumbling Windows#
滚动窗口分配器会将每个元素分配给一个指定窗口大小的窗口。滚动窗口具有固定的窗口大小,并且窗口之间不会重叠。比如下图展示的是一个设定为5分钟窗口大小的滚动窗口,每五分钟会创建一个新的窗口。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
如上段代码中最后一个例子展示的那样,tumbling window assigners包含一个可选的offset
参数,我们可以用它来改变窗口的对齐方式。比如,一个没有偏移量的按小时滚动窗口,它创建的时间窗口通常是1:00:00.000 - 1:59:59.999
,2:00:00.000 - 2:59:59.999
,当我们给定一个15分钟的偏移量时,时间窗口将会变成1:15:00.000 - 2:14:59.999
,2:15:00.000 - 3:14:59.999
。在实际应用中,一个比较常见的使用场景是通过offset
将窗口调整到UTC-0以外的时区,比如通过Time.hours(-8)
调整时区到东八区。
Sliding Windows#
滑动窗口分配器同样是将元素分配给固定大小的时间窗口,窗口大小的配置方式与滚动窗口一样,不同之处在于,滑动窗口还有一个额外的slide
参数用于控制窗口滑动的频率。当slide
小于window size
时,滑动窗口便会重叠。这种情况下同一个元素将会被分配给多个窗口。
比如下图这样,设置了一个10分钟大小的滑动窗口,它的滑动参数(slide
)为5分钟。这样的话,每5分钟将会创建一个新的窗口,并且这个窗口中包含了一部分来自上一个窗口的元素。
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
同样,我们可以通过offset
参数来为窗口设置偏移量。
Session Windows#
会话窗口通过活动会话来对元素进行分组。不同于滚动窗口和滑动窗口,会话窗口不会重叠,也没有固定的开始、结束时间。当一个会话窗口在指定的时间区间内没有接收到新的数据时,这个窗口将会被关闭。会话窗口分配器可以直接配置一个静态常量会话间隔,也可以通过函数来动态指定会话间隔时间。
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
如上,固定大小的会话间隔可以通过Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
来指定,动态会话间隔通过实现SessionWindowTimeGapExtractor
接口来指定。
注意:由于会话窗口没有固定的开始结束时间,它的计算方法与滚动窗口、滑动窗口有所不同。在一个会话窗口算子内部会为每一个接收到的元素创建一个新的窗口,如果这些元素之间的时间间隔小于定义的会话窗口间隔,则将阿门合并到一个窗口。为了能够进行窗口合并,我们需要为会话窗口定义一个Tigger
函数和Window Function
函数(例如ReduceFunction, AggregateFunction, or ProcessWindowFunction. FoldFunction不能用于合并)。
Global Windows#
全局窗口分配器会将具有相同key值的所有元素分配在同一个窗口。这种窗口模式下需要我们设置一个自定义的Trigger
,否则将不会执行任何计算,这是因为全局窗口中没有一个可以处理聚合元素的自然末端。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
Window Function#
定义好窗口分配器之后,我们需要指定作用于每个窗口上的计算。这可以通过指定Window Function来实现,一旦系统确定了某个窗口已经准备好进行处理,该函数将会处理窗口中的每个元素。
Window Function通常有这几种:ReduceFunction,AggregateFunction,FoldFunction以及ProcessWindowFunction。其中,前两个函数可以高效执行,因为Flink可以在每个元素到达窗口时增量的聚合这些元素。ProcessWindowFunction持有一个窗口中包含的所有元素的Iterable对象,以及元素所属窗口的附加meta信息。
ProcessWindowFunction
无法高效执行是因为在调用函数之前Flink必须在内部缓存窗口中的所有元素。我们可以将ProcessWindowFunction
和ReduceFunction
,AggregateFunction
, 或者FoldFunction
函数结合来缓解这个问题,从而可以获取窗口元素的聚合数据以及ProcessWindowFunction接收的窗口meta数据。
ReduceFunction#
ReduceFunction用于指明如何组合输入流中的两个元素来生成一个相同类型的输出元素。Flink使用ReduceFunction增量地聚合窗口中的元素。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
AggregateFunction#
AggregateFunction可以称之为广义上的ReduceFunction,它包含三种元素类型:输入类型(IN),累加器类型(ACC)以及输出类型(OUT)。AggregateFunction接口中有一个用于创建初始累加器、合并两个累加器的值到一个累加器以及从累加器中提取输出结果的方法。
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
FoldFunction#
FoldFunction用于指定窗口中的输入元素如何与给定类型的输出元素相结合。对于输入到窗口中的每个元素,递增调用FoldFunction将其与当前输出值合并。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
注意:fold()不能用于会话窗口或其他可合并的窗口
ProcessWindowFunction#
从ProcessWindowFunction中可以获取一个包含窗口中所有元素的迭代对象,以及一个用来访问时间和状态信息的Context对象,这使得它比其他窗口函数更加灵活。当然,这也带来了更大的性能开销和资源消耗。
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
其中的key参数是通过keyBy()
中指定的KeySelector
来获取的键值。对于元组(tuple)索引的key或是字符串字段引用的key,这里的KEY参数类型都是元组类型,我们需要手动将其转换为正确大小的元组,以便于从中提取key值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
ProcessWindowFunction with Incremental Aggregation#
正如前文中提到的,我们可以将ReduceFunction、AggregateFunction或者FoldFunction与ProcessWindowFunction结合起来使用,这样不但可以增量地执行窗口计算,还可以获取ProcessWindowFunction为我们提供的一些额外的窗口meta信息。
Incremental Window Aggregation with ReduceFunction
下面这个例子说明了如何将二者结合起来,以返回窗口中的最小事件和窗口的开始时间
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
Incremental Window Aggregation with AggregateFunction
示例:计算元素平均值,同时输出key值与均值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
Incremental Window Aggregation with FoldFunction
示例:返回窗口中的事件数量,同时返回key值和窗口结束时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
Triggers#
Trigger用于决定窗口什么时候被window function处理。Flink中每个WindowAssigner都有一个默认的Trigger。我们也可以通过trigger(...)
函数来自定义触发规则。
Trigger接口包含以下5个方法:
-
The
onElement()
method is called for each element that is added to a window. -
The
onEventTime()
method is called when a registered event-time timer fires. -
The
onProcessingTime()
method is called when a registered processing-time timer fires. -
The
onMerge()
method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge,_e.g._when using session windows. -
Finally the
clear()
method performs any action needed upon removal of the corresponding window.
Evictors#
Flink窗口模式允许我们指定一个WindowAssigner和Trigger之外的可选的Evictor。Evictor可以在触发器启动之后、窗口函数作用之前或之后移出窗口中的元素。
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
Flink为我们提供了三个预定义的evictors:
-
CountEvictor
: 保留窗口中用户指定数量的元素,从窗口缓冲区开始部分删除其他元素。 -
DeltaEvictor
: 获取一个DeltaFunction函数和阈值,计算窗口缓冲区中其余元素与最后一个元素的Delta值,然后将Delta值大于等于阈值的元素移除。 -
TimeEvictor
: 持有一个毫秒级的interval
参数,对于一个给定窗口,找到元素中的最大时间戳max_ts,然后删除那些时间戳小于max_ts - interval值的元素。
所有预定义的Evictor均会在窗口函数作用之前执行。
Allowed Lateness#
当使用事件时间窗口时,可能会出现元素延迟到达的情况。例如,Flink用于跟踪单事件时间进程的watermark已经越过了元素所属窗口的结束时间。
默认情况下,当watermark越过了窗口结束时间时,延迟到达的元素将会被丢弃。但是,Flink允许我们指定一个窗口的最大延迟时间,允许元素在被删除前(watermark到达结束时间时)可以延迟多长时间,它的默认值为0。根据所用触发器的不同,延迟到达但未废弃的元素可能会导致窗口的再次触发,使用EventTimeTrigger
会有这种情况。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
Side Output#
Flink的side output可以让我们获得一个废弃元素的数据流。如下,通过设置窗口的sideOutputLateData(OutputTag)
可以获取旁路输出流。
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
阅读原文:一文搞懂Flink Window机制