Flink四大基石之窗口(Window)使用详解

目录

一、引言

二、为什么需要 Window

三、Window 的控制属性

窗口的长度(大小)

窗口的间隔

四、Flink 窗口应用代码结构

是否分组

Keyed Window --键控窗

Non-Keyed Window

核心操作流程

五、Window 的生命周期

分配阶段

触发计算

六、Window 的分类

滚动窗口- TumblingWindow概念

滑动窗口– SlidingWindow概念

会话窗口 [了解]

七、Windows Function 窗口函数

分类剖析

增量聚合函数(以 AggregateFunction 为例)

全量聚合函数

八、案例实战

案例一

滚动窗口演示

滑动窗口演示

热词统计案例

kafka发送消息的模板代码

九、总结

本文深入探讨 Flink 中高级 API 里窗口(Window)的相关知识,涵盖为什么需要窗口、其控制属性、应用代码结构、生命周期、分类,以及窗口函数的各类细节,并辅以实例进行讲解,旨在助力开发者透彻理解并熟练运用 Flink 的窗口机制处理流数据。

一、引言

在大数据实时处理领域,Apache Flink 凭借其卓越性能与丰富功能占据重要地位。而窗口(Window)作为 Flink 从流处理(Streaming)到批处理(Batch)的关键桥梁,理解与掌握其使用对高效数据处理意义非凡,接下来将全方位剖析其奥秘。

二、为什么需要 Window

在流处理场景中,数据如潺潺溪流般持续涌入、无休无止。但诸多业务场景要求我们对特定时段数据做聚合操作,像统计 “过去的 1 分钟内有多少用户点击了我们的网页”。若不划定范围,面对无尽数据洪流,根本无法开展有针对性计算。窗口恰似神奇 “箩筐”,按规则收集一定时长或一定数据量数据,将无限流拆分成有限 “桶”,便于精准计算,满足如 “每隔 10min,计算最近 24h 的热搜词” 这类实时需求。

三、Window 的控制属性

窗口的长度(大小)

明确要计算最近多久的数据,以时间维度举例,若关注 24 小时内热搜词数据量,那 24 小时即窗口长度;计数维度下,设定统计前 N 条数据,N 就是计数窗口的长度规格。

窗口的间隔

决定隔多久进行一次计算操作。像 “每隔 10min,计算最近 24h 的热搜词” 里,每隔 10 分钟便是间隔设定,它把控着计算频次节奏。

四、Flink 窗口应用代码结构

是否分组

首先要判定是否依 Key 对 DataStream 分组,经 keyBy 操作后,数据流成多组,下游算子多实例可并行跑,提效显著;若用 windowAll 则不分组,所有数据送下游单个实例(并行度为 1),后续窗口操作逻辑与分组情形(Keyed Window)类似,仅执行主体有别。

Keyed Window --键控窗

// Keyed Window

stream

.keyBy(...) <- 按照一个Key进行分组

.window(...) <- 将数据流中的元素分配到相应的窗口中

[.trigger(...)] <- 指定触发器Trigger(可选)

[.evictor(...)] <- 指定清除器Evictor(可选)

.reduce/aggregate/process/apply() <- 窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window

stream

.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中

[.trigger(...)] <- 指定触发器Trigger(可选)

[.evictor(...)] <- 指定清除器Evictor(可选)

.reduce/aggregate/process() <- 窗口处理函数Window Function

核心操作流程

借助窗口分配器(WindowAssigner)依时间(Event Time 或 Processing Time)把数据流元素 “分拣” 进对应窗口;待满足触发条件(常是窗口结束时间到等情况),用窗口处理函数(如 reduce、aggregate、process 等常用函数)处理窗口内数据,此外,trigger、evictor 是面向高级自定义需求的触发、销毁附加项,默认配置也能应对常见场景。

五、Window 的生命周期

分配阶段

窗口分配器依据设定规则(像按时间间隔、计数规则等),为流入数据 “找家”,安置到合适窗口 “桶” 内,确定数据归属,构建基础计算单元。

触发计算

当预设触发条件达成,如时间窗口到结束点,对应窗口函数 “登场”,对窗口内数据按既定逻辑聚合处理,不同窗口函数(reduce、aggregate、process)处理细节、能力有差异,像 process 更底层、功能更强大,自带 open/close 生命周期方法且能获取 RuntimeContext。

上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。注意,本图只是一个大致示意图,不同的Window Function的处理方式略有不同。

从数据类型上来看,一个DataStream经过keyBy转换成KeyedStream,再经过window转换成WindowedStream,我们要在之上进行reduce、aggregate或process等Window Function,对数据进行必要的聚合操作。

六、Window 的分类

Window可以分成两类:

CountWindow按指定数据条数生成窗口,与时间脱钩。

滚动计数窗口:每隔 N 条数据,聚焦统计前 N 条,如每来 10 条统计前 10 条信息。

滑动计数窗口:每隔 N 条数据,统计前 M 条(N≠M),像每过 20 条统计前 15 条情况。

TimeWindow(重点):基于时间划定窗口。

滚动时间窗口:每隔 N 时间,统计前 N 时间范围数据,如每隔 5 分钟统计前 5 分钟车辆通过量,窗口长度与滑动距离均为 5 分钟。

滑动时间窗口:每隔 N 时间,统计前 M 时间范围数据(M≠N),像每隔 30 秒统计前 1 分钟车辆数据,窗口长度 1 分钟、滑动距离 30 秒。

会话窗口:设会话超时时间(如 10 分钟),期间无数据来则结算上一窗口数据,按毫秒精细界定范围,与 Key 值关联紧密,Key 值无新输入达设定时长就统计,不受全局新数据流入干扰。

滚动窗口- TumblingWindow概念

流是连续的,无界的(有明确的开始,无明确的结束)

假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量

对于这个问题,肯定是无法回答的,为何?

因为,统计是一种对固定数据进行计算的动作。

因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束)

那么,我们换个问题:统计1分钟内通过的汽车数量

那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。

描述完整就是:每隔1分钟,统计这1分钟内通过汽车的数量。窗口长度是1分钟,时间间隔是1分钟,所以这样的窗口就是滚动窗口。

那么,这个行为或者说这个统计的数据边界,就称之为窗口。

同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口

反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口

同时,这样的窗口被称之为滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口。

滑动窗口– SlidingWindow概念

同样是需求,改为:

每隔1分钟,统计前面2分钟内通过的车辆数

对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次,窗口长度和时间间隔不相等,并且是大于关系,就是滑动窗口

或者:每通过100辆车,统计前面通过的50辆车的品牌占比

对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次

对于这样的窗口,我们称之为滑动窗口。

那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据)

隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车)

那么可以看出,滑动窗口,就是滑动距离不等于窗口长度的一种窗口

比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等

比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等

那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。

对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口

那么,我们可以看出:

滚动窗口:窗口长度= 滑动距离

滑动窗口:窗口长度!= 滑动距离

总结:其中可以发现,对于滑动窗口:

滑动距离> 窗口长度,会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据)这样的东西,没人用。

滑动距离< 窗口长度,会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据)

滑动距离= 窗口长度,不漏也不会重复,也就是滚动窗口

窗口的长度(大小) > 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】

窗口的长度(大小) = 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】

窗口的长度(大小) < 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】

会话窗口 [了解]

Session 会话,一次会话。就是谈话。

设置一个会话超时时间间隔即可, 如10分钟,那么表示:

如果10分钟没有数据到来, 就计算上一个窗口的数据

代码中,并行度设置为1,测试比较 方便。

窗口的范围:

窗口的判断是按照毫秒为单位

如果窗口长度是5秒

窗口的开始: start

窗口的结束: start + 窗口长度 -1 毫秒

比如窗口长度是5秒, 从0开始

那么窗口结束是: 0 + 5000 -1 = 4999

七、Windows Function 窗口函数

分类剖析

全量函数:耐心缓存窗口所有元素,直至触发条件成熟,才对全量数据 “开刀” 计算,此特性可满足数据排序等复杂需求。

增量函数:保存中间数据 “蓝本”,新元素流入就与之融合更新,持续迭代中间成果,高效且灵活。

增量聚合函数(以 AggregateFunction 为例)

每有新数据 “入局”,立马按规则计算,其接口含输入类型(IN)、累加器类型(ACC)、输出类型(OUT)参数,有对应 add、createAccumulator、merge、extractOutput 等方法,构建严谨聚合流程。

实现方法(常见的增量聚合函数如下): reduce(reduceFunction) aggregate(aggregateFunction) sum() min() max()

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 maxBy、minBy、sum这3个底层都是由reduce实现的 aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有

AggregateFunction 【了解】

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.bigdata.windows;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.AggregateFunction;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _04_AggDemo {

public static final Tuple3[] ENGLISH = new Tuple3[] {

Tuple3.of("class1", "张三", 100L),

Tuple3.of("class1", "李四", 40L),

Tuple3.of("class1", "王五", 60L),

Tuple3.of("class2", "赵六", 20L),

Tuple3.of("class2", "小七", 30L),

Tuple3.of("class2", "小八", 50L)

};

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

env.setParallelism(1);

//2. source-加载数据

DataStreamSource> dataStreamSource = env.fromElements(ENGLISH);

KeyedStream, String> keyedStream = dataStreamSource.keyBy(new KeySelector, String>() {

@Override

public String getKey(Tuple3 tuple3) throws Exception {

return tuple3.f0;

}

});

//3. transformation-数据处理转换

// 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

keyedStream.countWindow(3).aggregate(new AggregateFunction, Tuple3, Tuple2>() {

// 初始化一个中间变量

Tuple3 tuple3 = Tuple3.of(null,0L,0);

@Override

public Tuple3 createAccumulator() {

return tuple3;

}

@Override

public Tuple3 add(Tuple3 value, Tuple3 accumulator) {

long tempScore = value.f2 + accumulator.f1;

int length = accumulator.f2 + 1;

return Tuple3.of(value.f0, tempScore,length);

}

@Override

public Tuple2 getResult( Tuple3 accumulator) {

return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);

}

@Override

public Tuple3 merge(Tuple3 a, Tuple3 b) {

return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);

}

}).print();

//4. sink-数据输出

//5. execute-执行

env.execute();

}

}

全量聚合函数

坚守等窗口数据集齐 “发令枪响” 才运算原则,确保计算基于完整数据集,保障结果准确性、完整性,契合多场景聚合诉求。

实现方法 apply(windowFunction) process(processWindowFunction)

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。 ProcessWindowFunction一次性迭代整个窗口里的所有元素,比较重要的一个对象是Context,可以获取到事件和状态信息,这样我们就可以实现更加灵活的控制,该算子会浪费很多性能,主要原因是不增量计算,要缓存整个窗口然后再去处理,所以要设计好内存。

package com.bigdata.day04;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.AggregateFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import org.apache.flink.util.Collector;

public class Demo03 {

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2. source-加载数据

Tuple3[] ENGLISH = new Tuple3[] {

Tuple3.of("class1", "张三", 100L),

Tuple3.of("class1", "李四", 40L),

Tuple3.of("class1", "王五", 60L),

Tuple3.of("class2", "赵六", 20L),

Tuple3.of("class2", "小七", 30L),

Tuple3.of("class2", "小八", 50L)

};

// 先求每个班级的总分数,再求每个班级的总人数

DataStreamSource> streamSource = env.fromElements(ENGLISH);

KeyedStream, String> keyedStream = streamSource.keyBy(v -> v.f0);

// 每个分区中的数据都达到了3条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算

// //Tuple3 输入类型

// //Tuple2 累加器ACC类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数

// //Double 输出类型

// 第一个泛型是输入数据的类型,第二个泛型是返回值类型 第三个是key 的类型, 第四个是窗口对象

keyedStream.countWindow(3).apply(new WindowFunction, Double, String, GlobalWindow>() {

@Override

public void apply(String s, GlobalWindow window, Iterable> input, Collector out) throws Exception {

// 计算总成绩,计算总人数

int sumScore = 0,sumPerson=0;

for (Tuple3 tuple3 : input) {

sumScore += tuple3.f2;

sumPerson += 1;

}

out.collect((double)sumScore/sumPerson);

}

}).print();

//5. execute-执行

env.execute();

}

}

八、案例实战

案例一

需求为 “每 5 秒钟统计一次,最近 5 秒钟内,各个路口通过红绿灯汽车的数量”,借 Flink 代码实现,底层算法作用下,数据按节奏聚合统计,时间设 1 分钟更易观察效果,能清晰看到各时段车辆数统计产出。

nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4

没有添加窗口的写法:

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo07 {

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2. source-加载数据

DataStreamSource streamSource = env.socketTextStream("localhost", 9999);

// 9,2 --> (9,2)

//3. transformation-数据处理转换

streamSource.map(new MapFunction>() {

@Override

public Tuple2 map(String line) throws Exception {

String[] arr = line.split(",");

int monitor_id = Integer.valueOf(arr[0]);

int num = Integer.valueOf(arr[1]);

return Tuple2.of(monitor_id,num);

}

}).keyBy(tuple->tuple.f0).sum(1).print();

//4. sink-数据输出

//5. execute-执行

env.execute();

}

}

此处的sum求和,中count ,其实是CartInfo中的一个字段而已。

演示:

滚动窗口演示

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo08 {

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2. source-加载数据

DataStreamSource streamSource = env.socketTextStream("localhost", 9999);

// 9,2 --> (9,2)

//3. transformation-数据处理转换

streamSource.map(new MapFunction>() {

@Override

public Tuple2 map(String line) throws Exception {

String[] arr = line.split(",");

int monitor_id = Integer.valueOf(arr[0]);

int num = Integer.valueOf(arr[1]);

return Tuple2.of(monitor_id,num);

}

}).keyBy(tuple->tuple.f0)

.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))

.sum(1).print();

//4. sink-数据输出

//5. execute-执行

env.execute();

}

}

以上代码的时间最好修改为1分钟,假如时间间隔是1分钟,那么48分03秒时输入的信号灯数据,49分整点会统计出来结果,原因是底层有一个算法。

滑动窗口的话,不太容易看到效果,因为有些数据被算到了多个窗口中,需要我们拿笔自己计算一下,对比一下:

滑动窗口演示

同样统计各路口汽车数量,但需求改为 “每 5 秒钟统计一次,最近 10 秒钟内”,因数据会在多窗口重复计算,需手动比对梳理,深入体会滑动窗口数据处理逻辑与特点。

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo09 {

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2. source-加载数据

DataStreamSource streamSource = env.socketTextStream("localhost", 9999);

// 9,2 --> (9,2)

//3. transformation-数据处理转换

streamSource.map(new MapFunction>() {

@Override

public Tuple2 map(String line) throws Exception {

String[] arr = line.split(",");

int monitor_id = Integer.valueOf(arr[0]);

int num = Integer.valueOf(arr[1]);

return Tuple2.of(monitor_id,num);

}

}).keyBy(tuple->tuple.f0)

.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))

.sum(1).print();

//4. sink-数据输出

//5. execute-执行

env.execute();

}

}

热词统计案例

借助 Kafka 随机发送 50000 个热词(200 毫秒间隔),分别基于滚动、滑动窗口统计,编写 Flink 代码时着重体会 apply 方法,兼顾二者效果差异,同时知晓工作中 process 函数因更强大底层能力常成首选。

apply和process都是处理全量计算,但工作中正常用process。

process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

public class Demo10 {

public static void main(String[] args) throws Exception {

//1. env-准备环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2. source-加载数据

Properties properties = new Properties();

properties.setProperty("bootstrap.servers","bigdata01:9092");

properties.setProperty("group.id","g2");

//2. source-加载数据

FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("flink-01",new SimpleStringSchema(),properties);

DataStreamSource kafkaSource = env.addSource(kafkaConsumer);

//3. transformation-数据处理转换

kafkaSource.map(new MapFunction>() {

@Override

public Tuple2 map(String value) throws Exception {

return Tuple2.of(value,1);

}

}).keyBy(tuple->tuple.f0)

//.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))

// 第一个泛型是输入数据的类型,第二个泛型是返回值类型 第三个是key 的类型, 第四个是窗口对象

.apply(new WindowFunction, String, String, TimeWindow>() {

@Override

public void apply(String key, // 代表分组key值 五旬老太守国门

TimeWindow window, // 代表窗口对象

Iterable> input, // 分组过之后的数据 [1,1,1,1,1]

Collector out // 用于输出的对象

) throws Exception {

SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

long start = window.getStart();

long end = window.getEnd();

int sum = 0;

for (Tuple2 tuple2 : input) {

sum += tuple2.f1;

}

out.collect(key+",窗口开始:"+dateFormat.format(new Date(start))+",结束时间:"+dateFormat.format(new Date(end))+","+sum);

//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);

}

}).print();

//4. sink-数据输出

//5. execute-执行

env.execute();

}

}

kafka发送消息的模板代码

package com.bigdata.day03.time;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import java.util.Random;

public class CustomProducer {

public static void main(String[] args) {

// Properties 它是map的一种

Properties properties = new Properties();

// 设置连接kafka集群的ip和端口

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringSerializer");

// 创建了一个消息生产者对象

KafkaProducer kafkaProducer = new KafkaProducer(properties);

// 调用这个里面的send方法

String[] hotWords= new String[]{"郭有才","歌手2024","五旬老太守国门","师夷长技以制夷"};

Random random = new Random();

for (int i = 0; i < 50000; i++) {

String word = hotWords[random.nextInt(4)];

ProducerRecord producerRecord = new ProducerRecord("flink-01",word);

kafkaProducer.send(producerRecord);

}

kafkaProducer.close();

}

}

九、总结

Flink 窗口机制犹如精密仪器,从控制属性、分类设计到函数运用,各环节紧密相扣。深入理解其原理、熟练实操代码,能为实时流数据处理注入强大动力,解锁更多高效、智能数据聚合分析场景,助力开发者在大数据浪潮中稳立潮头、驾驭数据。后续可深入探索自定义窗口逻辑、优化性能调优等进阶方向,深挖 Flink 窗口潜力。