admin管理员组文章数量:1794759
Flink
文章目录
- 前言
- 一、概述
- 1 Flink是什么
- 2 架构分层
- 3 数据处理流水线
- 4 运行组件
- TaskManager
- JobManager
- ResourceManager
- Dispatcher
- 5 其他流式计算框架
- 二、入门与使用
- 1 Flink基本安装
- 1.1 Linux
- 1.2 Java
- 1.3 Scala(待补充)
- 1.4 集群模式
- 2 常用API
- 2.1 DataStream 流处理
- DataSource
- Transformation
- Sink
- 示例一:自定义数据源(SourceFunction)
- 示例二:自定义分区
- 示例三:Socket通信示例
- 示例四:RabbitMQ作为数据源
- 示例五:自定义Sink
- 2.2 DataSet 批处理
- 2.3 Table API / SQL(待补充)
- 2.4 关于序列化
- 三、进阶使用
- 1 Flink中对于变量的高级用法
- Broadcast
- Accumulator
- 分布式缓存
- 总结
- 2 状态管理与恢复(待补充)
- 3 窗口(待补充)
- 窗口类型
- 窗口函数
- 参考文章
- 4 时间(待补充)
- 固定乱序长度策略
- 单调递增时间戳策略
- 不生成水印策略
- 关于水印延迟/窗口允许延迟
- 5 并行度
- 四、原理解析(待补充)
- 总结
目前本人是Java开发工程师,所以里面大部分的学习笔记都是以Java代码为主,Scala后面我再学所以后续再进行补充。
《Flink入门与实战》 - 徐葳 |
/ |
Apache Flink,内部是用Java及Scala编写的分布式流数据计算引擎,可以支持以批处理或流处理的方式处理数据,在2014年这个项目被Apache孵化器所接受后,Flink迅速成为ASF(ApacheSoftware Foundation)的顶级项目之一,在2019年1月,阿里巴巴集团收购了Flink创始公司(DataArtisans),打造了阿里云商业化的实时计算Flink产品。
它有如下几个特点
什么是有界数据/无界数据?
- 有界数据:数据是有限的,一条SELECT查询下的数据不会是源源不断的
- 无界数据:数据源源不断,不知道为什么时候结束,例如监控下的告警
Deploy 部署方式 | 本地/集群/云服务部署。 |
Core 分布式流处理模型 | 计算核心实现,为API层提供基础服务。 |
API 调用接口 | 提供面向无界数据的流处理API及有界数据的批处理API,其中流处理对应DataStream API,批处理对应DataSet API。 |
Library 应用层 | 提供应用计算框架,面向流处理支持CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作),面向批处理支持FlinkML(机器学习库)、Gelly(图处理)、Table 操作。 |
一个Flink任务 = DataSource + Transformation + DataSink
DataSource :数据源
Transformation :数据处理
DataSink:计算结果输出
而Flink在网络传输中通过缓存块承载数据,可以通过设置缓存块的超时时间,变相的决定了数据在网络中的处理方式。
4 运行组件Flink-运行时架构中的四大组件-SmallScorpion的CSDN博客 |
Flink运行时主要有四个大组件
下面来聊聊关于这四个组件的作用
TaskManager 工作进程,通常在一个Flink节点内会有多个TaskManager运行,而在每个TaskManager中又包含了多个插槽(slots),插槽的数量代表了TaskManager能够执行的任务数量。
进程启动后,TaskManager会向ResourceManager(资源管理器)注册自己的插槽,JobManager通过从ResourceManager请求到的插槽信,来分配任务执行。
JobManager控制一个应用程序执行的主进程,一个应用程序只会对应一个JobManager。
一个应用程序包括:
- 作业图 - JobGraph
- 逻辑数据流图 - logical dataflow graph
- 含有打包完的所需资源的Jar包
大致的流程是这样
ResourceManager 负责管理TaskManager的slot(插槽),插槽指处理资源单元,当JobManager申请插槽资源时,ResourceManager会把目前已经注册上来的空闲的插槽信分配给JobManager。
如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
DispatcherFlink介绍、特点及和与其他大数据框架对比_zhangxm_qz的CSDN博客 |
前置描述:xxxxxxxxxxxxx
1.1 LinuxIndex of /dist/flink/flink-1.14.3 (apache) |
首先去apache官网下载部署的软件包,下载完成之后进行解压
## 解压 tar -zxvf flink-1.14.3-bin-scala_2.12.tgz ## 进入bin目录 启动 ./start-cluster.sh ## Flink提供的WebUI的端口是8081 此时可以去看看是否启动完成 netstat -anp |grep 8081接着通过页面访问8081端口来个初体验
关于Linux下的Flink Shell终端的使用
flink~使用shell终端_cai_and_luo的博客-CSDN博客 |
Flink入门之Flink程序开发步骤(java语言)_胖虎儿的博客-CSDN博客 |
导入依赖
<!-- mvnrepository/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.14.3</version> </dependency>入门Demo
import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.FlatMapFunction; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class DemoApplication { public static void main(String[] args) throws Exception { /** * 大致的流程就分为 * 1.环境准备 * 设置运行模式 * 2.加载数据源 * 3.数据转换 * 4.数据输出 * 5.执行程序 */ // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.加载数据源 DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java"); // 3.数据转换 DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } }); // DataStream 下边为DataStream子类 SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }); // 4.数据输出 source.print(); // 5.执行程序 env.execute(); } }关于在设置运行模式的代码上,有三种选择
/** * Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, * network shuffle behavior, and time semantics. Some operations will also change their record * emission behaviour based on the configured execution mode. * * @see <a * href="cwiki.apache/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API"> * cwiki.apache/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a> */ @PublicEvolving public enum RuntimeExecutionMode { /** * The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before * execution starts, checkpoints will be enabled, and both processing and event time will be * fully supported. */ /** 流处理模式 */ STREAMING, /** * The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based * on the scheduling region they belong, shuffles between regions will be blocking, watermarks * are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance * during execution. */ /** 批处理模式 */ BATCH, /** * Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are * bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is * unbounded. */ /** 自动模式 */ AUTOMATIC } 1.3 Scala(待补充)与Java一样都在IDEA编译器上做,此时引入依赖
<!-- mvnrepository/artifact/org.apache.flink/flink-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.3</version> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.14.3</version> </dependency>// …
待定 …
// …
1.4 集群模式Flink集群部署详细步骤 - 简书 (jianshu) |
Flink集群部署 - 云+社区 - 腾讯云 (tencent) |
第一次学时,光看上面的Demo例子比较难以理解,所以通过书下面的API内容对照上面的Demo来进行理解,先来了解Flink四种层次的API详情
底层 API | 偏底层,易用性比较差,提供时间/状态的细粒度控制 | Stateful Stream Processing |
核心 API | 对有界/无界数据提供处理方法 | DataStream(流处理) / DataSet(批处理) |
Table API | / | 声明式DSL |
SQL | / | 高级语言 |
主要分为三个流程
看看他们的API
readTextFile(文件路径) | 逐行读取文本文件的数据 |
socketTextStream(地址信) | 从socket中读取数据 |
fromCollection(集合数据) | 从集合内获取数据 |
其他第三方输入数据…或者自定义数据源 | 通过Flink提供的内置连接器去链接其它数据源 |
如果是自定义数据源,有两种实现方式
什么是并行度?
一个Flink程序由多个任务(Source、Transformation和Sink)组成。一个任务由多个并行实例(线程)来执行,一个任务的并行实例(线程)数目被称为该任务的并行度。
Transformation接下来是Transformation数据处理,Flink针对DataStream提供了大量的已经实现的算子。
Map | 输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作 |
FlatMap | 输入一个元素,可以返回零个、一个或者多个元素 |
Filter | 过滤函数,对传入的数据进行判断,符合条件的数据会被留下 |
KeyBy | 根据指定的Key进行分组,Key相同的数据会进入同一个分区,典型用法如下:1、DataStream.keyBy(“someKey”) 指定对象中的someKey段作为分组Key。2、DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组Key。 |
Reduce | 对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值 |
Aggregations | sum()、min()、max()等 |
Union | 合并多个流,新的流会包含所有流中的数据,但是Union有一个限制,就是所有合并的流类型必须是一致的 |
Connect | 和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法 |
coMap和coFlatMap | 在ConnectedStream中需要使用这种函数,类似于Map和flatMap |
Split | 根据规则把一个数据流切分为多个流 |
Select | 和Split配合使用,选择切分后的流 |
关于Flink针对DataStream提供的一些数据分区规则
DataStream.shuffle() | 随机分区 |
DataStream.rebalance() | 对数据集进行再平衡、重分区和消除数据倾斜 |
DataStream.rescale() | 重新调节 |
DataStream.broadcast() | 把元素广播给所有的分区,数据会被重复处理 |
DataStream.partitionCustom(partitioner,0) 或者 DataStream.partitionCustom(partitioner,“smeKey”) | 自定义分区 |
数据处理后的输出
writeAsText() | 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取 |
print() / printToErr() | 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中 |
自定义输出 | addSink可以实现把数据输出到第三方存储介质中。系统提供了一批内置的Connector,它们会提供对应的Sink支持 |
自定义Sink的两种方式
实际上,RichSinkFunction抽象类也是继承了SinkFunction这个接口,所以实际上差别不大
示例一:自定义数据源(SourceFunction)第一步,继承SourceFunction接口,实现自定义数据源类
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; /** * 自定义数据源 * @author 李家民 */ public class DemoTransactionSource implements SourceFunction<String> { @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { // 发射元素 ctx.collect(String.valueOf(new Random().nextInt(50) )); Thread.sleep(1000); } } @Override public void cancel() { } }第二步,在Flink代码中引入这个数据源
import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.FlatMapFunction; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author 李家民 */ @Component public class FlinkInitialize { @PostConstruct public void starter() throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 为流式作业启用检查点 以毫秒为单位 流式数据流的分布式状态将被定期快照 env.enableCheckpointing(5000); // 2.设置自定义数据源 DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源"); // 3.数据处理 SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = stringDataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }); // 4.数据输出 stringSingleOutputStreamOperator.print(); // 5.执行程序 env.execute(); } }此时执行代码,就可以把引入的数据进行打印
SourceFunction定义了run和cancel两个方法和SourceContext内部接口。
- run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。
- cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。
- SourceContext:source函数用于发出元素和可能的watermark的接口,返回source生成的元素的类型。
数据源沿用上述案例的代码,自定义分区是通过实现Partitioner接口去做处理
首先看看自定义分区的实现类
/** * 自定义分区 * @author 李家民 */ public class DemoPartitioner implements Partitioner<String> { @Override public int partition(String key, int numPartitions) { System.out.println("目前分区总数=" + numPartitions + " 当前值=" + key + " 通过最左边的值看分区号"); if (new Integer(key) > 20) { return 1; } else { return 2; } } }然后在Flink的代码中体现
import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class FlinkInitialize { @PostConstruct public void starter() throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.设置自定义数据源 DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源"); // 3.数据处理 DataStream<String> dataStream = stringDataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }).partitionCustom(new DemoPartitioner(), new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { return value; } }); // 4.数据输出 dataStream.print(); // 5.执行程序 env.execute(); } }输出后的结果如下
示例三:Socket通信示例第一步:搭建数据来源,这里使用Linux作为数据来源,在Linux上打命令把端口开启
nc -l 16668第二步:编写flink代码
import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class FlinkInitialize { @PostConstruct public void starter() throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // 2.设置自定义数据源 String address = "47.106.207.254"; int port = 16668; DataStream<String> dataStreamSource = env.socketTextStream(address, port).map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }); dataStreamSource.print(); // 5.执行程序 env.execute(); } }效果如下
你学废了吗
示例四:RabbitMQ作为数据源第一步:搭建RabbitMQ子系统
.....代码省略,不会RabbitMQ的看下面这篇文章RabbitMQ - SpringBoot集成版 - 开发+运维__-CSDN博客 |
第二步:编写flink代码,首先引入RabbitMQ/Flink的依赖
<!-- mvnrepository/artifact/org.apache.flink/flink-connector-rabbitmq --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.12</artifactId> <version>1.14.3</version> </dependency>编写java代码
import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.apimon.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmqmon.RMQConnectionConfig; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class FlinkInitialize { @PostConstruct public void starter() throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.设置数据源 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("47.106.207.254") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("/") .build(); // 3.将RabbitMQ数据源加入 DataStreamSource<String> dataStreamSource = env.addSource( new RMQSource<String>( connectionConfig, "Demo01_queue", true, new SimpleStringSchema())); // 4.数据转换并输出 dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }); dataStreamSource.print(); // 5.执行程序 env.execute(); } }在Flink代码中,有两步对于RabbitMQ的加入很关键
示例五:自定义Sink很简单,把上面的代码稍微改一下就好了
package com.ljm.flink; import org.apache.flink.apimon.RuntimeExecutionMode; import org.apache.flink.apimon.functions.MapFunction; import org.apache.flink.apimon.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmqmon.RMQConnectionConfig; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author 李家民 */ @Component public class FlinkInitialize { @PostConstruct public void starter() throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.设置数据源 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("47.106.207.254") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("/") .build(); // 3.将RabbitMQ数据源加入 DataStreamSource<String> dataStreamSource = env.addSource( new RMQSource<String>( connectionConfig, "Demo01_queue", true, new SimpleStringSchema())); // 4.数据转换并输出 dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }); // 自定义输出 dataStreamSource.addSink(new SinkDemo()); // 5.执行程序 env.execute(); } }继承RichSinkFunction抽象类
import org.apache.flink.apimon.eventtime.Watermark; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; /** * 自定义Flink输出 * @author 李家民 */ public class SinkDemo extends RichSinkFunction<String> { /** * 将给定值写入接收器。为每条记录调用此函数 * @param value 获取到的值 * @param context 可用于获取有关输入记录的附加数据的上下文 * @throws Exception */ @Override public void invoke(String value, Context context) throws Exception { System.out.println(value + " " + context.timestamp()); } @Override public void writeWatermark(Watermark watermark) throws Exception { super.writeWatermark(watermark); } /** * 此方法在数据处理结束时调用 * @throws Exception */ @Override public void finish() throws Exception { System.out.println("此方法在数据处理结束时调用"); } }接收到数据以后,就可以进行后续的一系列操作了
2.2 DataSet 批处理组件跟上面的DataStream差不多,都是分为这么三个,
一般是用来读取HDFS(分布式文件存储)中的文件数据,不作解释了。
2.3 Table API / SQL(待补充)Flink针对标准的流处理和批处理提供的两种关系型API:Table API 和 SQL。
<!-- mvnrepository/artifact/org.apache.flink/flink-table --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.14.3</version> <type>pom</type> <scope>provided</scope> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-table-common --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.14.3</version> <scope>provided</scope> </dependency> <!-- mvnrepository/artifact/org.apache.flink/flink-table-api-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>1.14.3</version> </dependency>// …
待定 …
// …
2.4 关于序列化Flink自带针对一些标准类型的序列化器,如果涉及到这些自带的序列化器也无法处理的数据,则需要自定义序列化器。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 使用Avro序列化 env.getConfig().enableForceAvro(); // 使用Kryo序列化 env.getConfig().enableForceKryo(); // 自定义序列化器 env.getConfig().addDefaultKryoSerializer(xxxxx,xxxxx);在自定义序列化器参数中,需要填写序列化的类对象类,并且这个类切记需要继承序列化接口Serializer。
三、进阶使用 1 Flink中对于变量的高级用法前置描述:xxxxxxxxxxxxx
Broadcast这里的Broadcast指的是广播变量,而不是分区规则。
DataStream Broadcast(分区规则)
Flink Broadcast(广播变量)
广播变量指再每台机器上保持的一个只读的共享缓存变量,在任务进程需要的时候传递这个共享缓存变量,而不是一个变量副本,可以节省内存,但是修改广播变量的同时会影响到所有持有这个变量的节点。
public void starter() throws Exception { // 准备环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 被广播的数据 DataSource<String> dataSource = env.fromElements("5", "6", "7", "8"); // 常规数据 DataSet<String> dataSet = env.fromElements("哈哈哈哈1", "哈哈哈哈2", "哈哈哈哈3", "哈哈哈哈4"); // 数据处理 // 使用 RichMapFunction, 在open() 方法中拿到广播变量 // 由于我是在单个节点上去拿变量的 所以其实放在map方法里面也可以 但是分布式环境下还是得从open方法里获取比较好吧 dataSet.map(new RichMapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); List<String> broadcastVariable = getRuntimeContext().getBroadcastVariable("被广播的共享变量名"); System.out.println("print=" + broadcastVariable); } }).withBroadcastSet(dataSource, "被广播的共享变量名").print(); }累加器,统计Task在运行中的情况,例如在函数中处理了多少条数据,累加器的常用实现有
你学废了吗
分布式缓存 我的理解就是,一个节点将文件系统注册进集群内,当程序运行后,Flink会自动把这个文件信复制到其他TaskManager节点的本地文件系统。
注册
env.registerCachedFile();访问
File file = getRuntimeContext().getDistributedCache().getFile("文件名");大概就是这么回事。
总结-
Broadcast
只读变量缓存在各节点上,减少内存开销,但是禁止修改该变量。
-
Accumulator
不同任务中同一变量的累加统计操作,只有任务执行完成后才能得到这个结果。
-
Cache
分布式缓存系统,结合文件系统实现数据共享。
【Flink】Flink 状态管理 - 简书 (jianshu) |
临时补上一个下面会用到的一个时间依赖
<dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.10.5</version> </dependency> Flink的窗口实际上是将无限的流划分为窗口切割成一段一段有限的集合(流是无限的,通常很难对其进行元素计数),它也是从Stream到Batch的一个过程。
而对于窗口的切割的依据,可以由 时间 或 数据量 作为依据驱动,根据需要也可以的进行自定义。
基本窗口可以分为两种
下面具体说说窗口的类型
-
Tumbling Window - 滚动窗口,表示窗口内的数据没有重叠。
根据时间段进行窗口切割,所以数据故不可能发生重叠。
-
Sliding Window - 滑动窗口,表示窗口内的数据有重叠。
跟滚动窗口的区别在于,这个滑动是基于窗口的起点偏移量去制定下一个窗口的大小,故数据会发生重叠。
-
Session Window - 会话窗口,通过session活动来对元素进行分组,与上述相比,不会有重叠和固定的开始时间和结束时间的情况。
-
global Window - 全局窗口,将相同 key 的所有元素聚在一起,但是这种窗口没有起点也没有终点,因此必须自定义触发器。
下面上一个简单的示例代码来对上面打个样
// 时间长度为20秒的滚动窗口 dataStream.keyBy(value -> { return value; }).window(TumblingEventTimeWindows.of(Time.seconds(20))); // 每 10 秒打开 1 分钟的滚动窗口 dataStream.keyBy(value -> { return value; }).window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS))); // 每小时 产生15分钟 的偏移量 的滑动窗口 dataStream.keyBy(value -> { return value; }).window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))); // 全局窗口 将相同 key 的所有元素聚在一起 dataStream.keyBy(value -> { return value; }).window(GlobalWindows.create()); // 这个操作将并行度变为1 所有数据放在一个窗口进行操作 不进行分组 所以这个方法的前缀也不需要进行keyBy dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); 窗口函数通常我们的窗口代码中会三个步骤
// Keyed Window stream .keyBy(...) <- 按照一个Key进行分组 .window(...) <- 将数据流中的元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function // Non-Keyed Window stream .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function下面聊聊有关于数据聚合的窗口函数,可以分成两个大类
-
增量聚合函数 - incremental aggregation functions
每来一个数据就计算
ReduceFuction / AggregationFunction
-
全窗口函数 - full window functions
把数据囤积起来,等到最后再一次性遍历计算
ProcessWindowFunction / WindowFunction
不学了,以后哪天用到了再补充这里。
参考文章旧版本 - Flink 的Window 操作 - 简书 (jianshu) |
Stream数据中的时间有三种
如果是1.2以前的Flink版本
Flink学习笔记:Time的故事 - 大数据研习社 - 博客园 (cnblogs) |
新版本的建议使用WatermarkStrategy,通过assignTimestampsAndWatermarks方法进行设置
- 固定乱序长度策略(forBoundedOutOfOrderness)
- 单调递增时间戳策略(forMonotonousTimestamps)
- 不生成水印策略(noWatermarks)
这三种策略都是通过实现WatermarkGenerator接口,下面来看看
public class DemoTimeWatermarks implements WatermarkGenerator { /** * 为每个事件调用,允许水印生成器检查并记住事件时间戳,或根据事件本身发出水印 * @param event 接收的事件数据 * @param eventTimestamp 事件时间戳 * @param output 可用output.emitWatermark方法生成一个Watermark */ @Override public void onEvent(Object event, long eventTimestamp, WatermarkOutput output) { System.out.println( "event=" + event + " eventTimestamp=" + eventTimestamp + " WatermarkOutput=" + output ); } /** * 周期性触发,可能会发出新的水印,也可能不会 * 调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval() * @param output 可用output.emitWatermark方法生成一个Watermark */ @Override public void onPeriodicEmit(WatermarkOutput output) { System.out.println("被定期执行的方法onPeriodicEmit"); } } 固定乱序长度策略111
public void starter() throws Exception { // 1.创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 水印生成的间隔时间 5毫秒 // 将间隔设置为0将禁用周期性水印发射 env.getConfig().setAutoWatermarkInterval(5L); // 并行度 env.setParallelism(5); // 2.数据来源 DataStreamSource<String> datasource = env.fromElements( "1", "2", "3", "345345", "$5745457" ); // 3.数据处理 - 时间策略指定 SingleOutputStreamOperator<String> streamOperator = datasource.assignTimestampsAndWatermarks( // 设定事件时间戳无序的界限 这里是5毫秒 WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(5)) .withTimestampAssigner( // 为元素分配时间戳 从事件数据中抽取 new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String event, long recordTimestamp) { System.out.println("event = " + event); System.out.println("recordTimestamp = " + recordTimestamp); return recordTimestamp; } }) ); // 4.sink输出 streamOperator.print(); // 5.任务执行 env.execute(); }111
单调递增时间戳策略111
不生成水印策略111
关于水印延迟/窗口允许延迟区分理解Flink水印延迟与窗口允许延迟的概念-51CTO.COM |
什么是并行度?
一个任务(Source、Transformation、Sink)的并行实例(线程)数目被称为该任务的并行度。
首先从书中了解到,每个TaskManager为集群提供Solt(插槽),Solt的数量通常与每个TaskManager节点的可用CPU内核数成比例,一般情况下Slot的数量就是每个节点的CPU的核数。
插槽内代表着应用程序所运行的执行图
一个任务的并行度设置可以从4个层面指定
-
Operator Level - 算子层面
DataStream<String> dataSet = env.fromElements("哈哈哈哈1", "哈哈哈哈2", "哈哈哈哈3", "哈哈哈哈4"); // setParallelism(4) 算子层面 dataSet.map(new RichMapFunction<String, String>() { @Override public String map(String value) throws Exception { return value; } }).setParallelism(4).print(); -
Execution Environment Level - 执行环境层面
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 执行环境层面 env.setParallelism(3); -
Client Level - 客户端层面
## 客户端提交job时设定,通过-p参数指定并行度 ./bin/flink run -p 4 XXXXX.jar -
System Level - 系统层面
通过修改配置文件conf/flink-conf.yaml中的parallelism.default属性
并行度也并非越大越好,上述也提到,需要考虑到CPU内核数。
四、原理解析(待补充)1
1
总结提示:这里对文章进行总结: 例如:以上就是今天要讲的内容。
本文标签: Flink
版权声明:本文标题:Flink 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1686828528a107680.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论