愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:玩转新一代流式计算框架Flink课程介绍
小滴课堂-全新录制的后端+大数据视频,从0到1讲解流式计算框架Flink1.13 ,掌握Flink核心概念和应用场景,包括大数据扫盲贴,离线/实时/批量/流式计算划分;
从JDK8的Stream流处理延伸到,为啥用Flink,再讲全链路知识点解析,Source—>Transformation->Sink多种案例实战
从零讲解Flink整体实战:JobManager、TaskManager、TaskSolt、OperatorChain等核心概念和应用场景
全套视频采用多个案例进行实操讲解对应的知识点应用,从实际需求出发更容易掌握。
核心基础
玩转多种Source实战:SocketStream、Kafka、RichParallelSourceFunction等多种数据来源
中间转换实战:Map、FlatMap、Rich相关算子应用、Filter、Max等多种Operator转换操作
结果输出实战:Sink输出Redis存储、Mysql、Kafka、自定义Sink组件等多种Sink处理
进阶高级:
KeyBy分组聚合、滑动时间窗、滚动时间窗实战多个业务需求
多种聚合函数实战Reduce/aggregate/ProcessWindowFunction实战
多种时间语义介绍、迟到乱序数据处理+Watermark应用和侧输出流实战
State状态存储和Checkpoint检查点配置、模拟MaxBy算子功能实战
Flink CEP复杂事件处理检测,实战恶意登录高级功能
打包部署和综合项目实战
阿里云 ECS Linux服务器部署Flink Standalon+项目打包插件和配置实战
Docker-Compose集群部署Flink+SessionCluster+PerJob+Application模式解析
WebUI并行度案例测试Parallelism+TaskSolt核心概念和应用
项目实战:
开发一个监控告警案例(类似nginx访问日志)
滑动时间窗统计各个接口的访问量、各个状态码次数、CEP告警检测
讲课前我一直再思考一个问题
为什么要学习Flink流式计算框架
多数互联网公司里面用的技术栈,打造实时监控、数仓、推荐、画像系统技术栈
流式计算框架中最火爆的技术,在多数互联网公司中,Flink占有率很高,流式计算框架领域领头羊
中大型互联网公司等基本都离不开Flink
可以作为公司内部培训技术分享必备知识
谁在用Flink(大厂基本都在用,所以重要程度知道吧,想进一二线大厂的同学)
学后水平
【进阶高级】
【打包部署和综合项目实战】
阿里云 ECS Linux服务器部署Flink Standalon+项目打包插件和配置实战
Docker-Compose集群部署Flink+SessionCluster+PerJob+Application模式解析
WebUI并行度案例测试Parallelism+TaskSolt核心概念和应用
综合案例实战
贯穿全套视频掌握Flink常见面试题+大数据学习路线规划
大数据工程师和后端工程师的恩恩怨怨
你会的我也会,我会的你不一定会,你会的我也不一定会
技术栈交集大
适合人群
课程技术技术栈和环境说明
学习形式
简介:新一代流式计算框架Flink课程大纲速览
课程效果演示和安装包
课程学前基础
Java基础即可,其他可以后补
PS:不会上面的基础也没关系, 这些专题我们都有课程,【联系我们客服】购买即可,且是刚录制的新版课程
目录大纲浏览
学习寄语
保持谦虚好学、术业有专攻、在校的学生,有些知识掌握也比工作好几年掌握的人厉害
简介:必看-新一代流式计算框架Flink课程相关准备
学习过程中可能遇到的问题
记得加我微信+进技术交流群(不管你哪里看到的视频,都可以加我,经常有大厂内推、技术分享等)
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:零基础扫盲贴-急速认知大数据+机器学习应用领域
前言
什么是大数据?
什么是人工智能
例子(讲很多概念都是虚的,直接上例子大家更容易懂,url过期的话不影响,知道就行)
简介:零基础扫盲贴-大数据的计算模式的概念
大数据的【计算模式】主要分为两种,适用于不同的大数据应用场景
批量计算(batch computing)
流式计算(stream computing)
区分( 离线计算和实时计算 +流式计算和批量计算)
简介:零基础扫盲贴-大数据里面常用的技术栈和用途
大数据技术栈
初级阶段
中级阶段
高级阶段
再看下例子(精细化运营)
简介:小滴课堂-大课训练营里面大数据体系的应用
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:讲解lombok的介绍和安装
什么lombok
你是否发现每个JavaBean都会写getter,setter,equals,hashCode和toString的模板代码,特别的多于没技术
使用方式
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<!--https://mvnrepository.com/artifact/org.projectlombok/lombok/1.18.16-->
<!--scope=provided,说明它只在编译阶段生效,不需要打入包中, Lombok在编译期将带Lombok注解的Java文件正确编译为完整的Class文件-->
添加IDE工具对Lombok的支持
简介:Flink前奏-完成一个小的java需求让你明白用途
需求
电商订单数据处理,根据下⾯的list1和list2 各10个订单
Lombok插件配置
//总价 35
List<VideoOrder> videoOrders1 = Arrays.asList(
new VideoOrder("20190242812", "springboot教程", 3),
new VideoOrder("20194350812", "微服务SpringCloud", 5),
new VideoOrder("20190814232", "Redis教程", 9),
new VideoOrder("20190523812", "⽹⻚开发教程", 9),
new VideoOrder("201932324", "百万并发实战Netty", 9));
//总价 54
List<VideoOrder> videoOrders2 = Arrays.asList(
new VideoOrder("2019024285312", "springboot教程", 3),
new VideoOrder("2019081453232", "Redis教程", 9),
new VideoOrder("20190522338312", "⽹⻚开发教程", 9),
new VideoOrder("2019435230812", "Jmeter压⼒测试", 5),
new VideoOrder("2019323542411", "Git+Jenkins持续集成", 7),
new VideoOrder("2019323542424", "Idea全套教程", 21));
//订单实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
}
//两个订单平均价格
double videoOrderAvg1 =
videoOrders1.stream().collect(Collectors.averagingInt(VideoOrder::getMoney))
.doubleValue();
System.out.println("订单列表1平均价格="+videoOrderAvg1);
double videoOrderAvg2 =
videoOrders2.stream().collect(Collectors.averagingInt(VideoOrder::getMoney))
.doubleValue();
System.out.println("订单列表2平均价格="+videoOrderAvg2);
//订单总价
int totalMoney1 =
videoOrders1.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).i
ntValue();
int totalMoney2 =
videoOrders2.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).i
ntValue();
System.out.println("订单列表1总价="+totalMoney1);
System.out.println("订单列表2总价="+totalMoney2);
上面的就是JDK8的Stream流式处理,Flink也是实现类似的功能
简介:JDK8 里面的Stream和流式处理框架Flink对比
上集完成了一个需求,工作里面应用也是类似的流程链路
JDK8 Stream也是流处理,flink也是流处理, 那区别点来啦
数据来源和输出有多样化怎么处理;
海量数据需要进行实时处理
统计时间段内数据,但数据达到是无序的
其他太多。。。。
为了实现一个天猫双十一实时交易大盘各个品类数据展示功能
小滴课堂-老王说他乐意这样写java代码,你奈我何??
二当家小D采用了Flink进行开发这个功能
大型互联网公司都有的数据监控后台例子
简介:新一代流式处理框架Flink介绍和重要概念讲解
什么是Flink
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算
官网:https://flink.apache.org/zh/flink-architecture.html
有谁在用呢(基本大厂都在用)
概念
数据流
什么是有界流
什么是无界流
Apache Flink 擅长处理无界和有界数据集,有出色的性能
代码使用例子
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:带你飞速体验第一个Flink案例-java版本环境搭建
创建项目
添加依赖(后续有变动再调整)
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!--flink客户端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--日志输出-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--json依赖包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
### 配置appender名称
log4j.rootLogger = debugFile, errorFile
### debug级别以上的日志到:src/logs/debug.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
#Threshold属性指定输出等级
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
### error级别以上的日志 src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
简介:Tuple数据类型+Map+FlatMap操作介绍
什么是Tuple类型
元组类型, 多个语言都有的特性, flink的java版 tuple最多支持25个
用途
Tuple3<Integer, String, Double> t = Tuple3.of(1,"xdclass.net",32.1);
System.out.println(t.f0);
System.out.println(t.f1);
System.out.println(t.f2);
什么是java里面的Map操作
什么是java里面的FlatMap操作
List<String> list1 = new ArrayList<>();
list1.add("springboot,springcloud");
list1.add("redis6,docker");
list1.add("kafka,rabbitmq");
//一对一转换
List<String> list2 = list1.stream().map(obj -> {
obj = "小滴课堂" + obj;
return obj;
}).collect(Collectors.toList());
System.out.println(list2);
//一对多转换
List<String> list3 = list1.stream().flatMap(
obj -> {
return Arrays.stream(obj.split(","));
}
).collect(Collectors.toList());
System.out.println(list3);
简介:带你飞速体验第一个Flink案例-代码编写实战
需求
代码
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//相同类型元素的数据流
DataStream<String> stringDataStream = env.fromElements("java,springboot","java,springcloud");
// FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型
DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(",");
for (String word : arr) {
out.collect(word);
}
}
});
flatMapDataStream.print("结果");
//DataStream需要调用execute,可以取个名称
env.execute("data stream job");
}
简介:带你飞速体验第一个Flink案例-代码编写实战
需求
代码
xxxxxxxxxx
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//相同类型元素的数据集 source
DataSet<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
stringDS.print("处理前");
// FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型
DataSet<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) throws Exception {
String [] arr = value.split(",");
for(String str : arr){
collector.collect(str);
}
}
});
//输出 sink
flatMapDS.print("处理后");
//DataStream需要调用execute,可以取个名称
env.execute("flat map job");
}
处理前> java,SpringBoot
处理前> spring cloud,redis
处理前> kafka,小滴课堂
处理后> java
处理后> SpringBoot
处理后> spring cloud
处理后> redis
处理后> kafka
处理后> 小滴课堂
注意
简介:Blink介绍和IDEA里面运行Flink运行流程解析
Flink和Blink关系
2019年Flink的母公司被阿里全资收购
阿里进行高度定制并取名为Blink (加了很多特性 )
阿里巴巴官方说明:Blink不会单独作为一个开源项目运作,而是Flink的一部分
都在不断演进中,对比其他流式计算框架(老到新)
算子Operator
xxxxxxxxxx
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
那我们在IDEA里面运行这样就行?实际项目也是这样用???
最终线上部署会把main函数打成jar包,提交到Flink进群进行运行, 会有UI可视化界面
服务端部署例子(后续会讲)
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
简介:Flink可视化控制台依赖配置和界面介绍
WebUI可视化界面
xxxxxxxxxx
<!--Flink web ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码开发
xxxxxxxxxx
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//env.setParallelism(1);
DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(",");
for (String word : arr) {
out.collect(word);
}
}
});
flatMapDataStream.print("结果");
//DataStream需要调用execute,可以取个名称
env.execute("data stream job");
}
nc命令介绍
win | linux 需要安装
win 百度搜索博文参考不同系统安装
linux 安装
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink常见的运行部署模式介绍和运行流程浅析
建议:第一遍看大概,等学完整个后看第二遍
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
运行流程
用户提交Flink程序到JobClient,
JobClient的 解析、优化提交到JobManager
TaskManager运行task, 并上报信息给JobManager
通俗解释
简介:Flink整体架构和组件角色介绍
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序
运行时由两种类型的进程组成
什么是JobManager(大Boss,包工头)
协调 Flink 应用程序的分布式执行的功能
什么是TaskManager (任务组长,搬砖的人)
简介:Flink整体架构和组件角色介绍
建议:第一遍看大概,等学完整个后看第二遍
Jobmanager进阶
JobManager进程由三个不同的组件组成
ResourceManager
Dispatcher
JobMaster
TaskManager 进阶
TaskManager中 task slot 的数量表示并发处理 task 的数量
一个 task slot 中可以执行多个算子,里面多个线程
算子 opetator
对于分布式执行,Flink 将算子的 subtasks 链接成 tasks,每个 task 由一个线程执行
将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
简介:Flink整体架构和组件角色介绍
Task Slots 任务槽
Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask ,每个subtask会以单独的线程来运行
每个 worker(TaskManager)是一个 JVM 进程,可以在单独的线程中执行一个(1个solt)或多个 subtask
为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)
每个 task slot 代表 TaskManager 中资源的固定子集
注意
5 个 subtask 执行,因此有 5 个并行线程
Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。
Sub-Task 强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task
图中source和map算子组成一个算子链,作为一个task运行在一个线程上
文档
简介:Flink的并行度概念理解和调整优先级说明
Flink 是分布式流式计算框架
流程
并行度的调整配置
Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级
Flink并行度配置级别 (高到低)
算子
全局env
客户端cli
Flink配置文件
某些算子无法设置并行度
本地IDEA运行 并行度默认为cpu核数
一个很重要的区分 TaskSolt和parallelism并行度配置
Flink有3中运行模式
xxxxxxxxxx
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink 的API层级介绍Source Operator速览
Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象
第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理
第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发
第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差
第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式
注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
Source来源
元素集合
文件/文件系统
基于Socket
自定义Source,实现接口自定义数据源,rich相关的api更丰富
并行度为1
并行度大于1
Connectors与第三方系统进行对接(用于source或者sink都可以)
Apache Bahir连接器
总结 和外部系统进行读取写入的
第一种 Flink 里面预定义的 source 和 sink。
第二种 Flink 内部也提供部分 Boundled connectors。
第三种是第三方 Apache Bahir 项目中的连接器。
第四种是通过异步 IO 方式
简介:预定义的Source 数据源 案例实战
Source来源
元素集合
xxxxxxxxxx
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//相同类型元素的数据流 source
DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
stringDS1.print("stringDS1");
DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
stringDS2.print("stringDS2");
DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
longDS3.print("longDS3");
//DataStream需要调用execute,可以取个名称
env.execute("xdclass job");
}
简介:预定义的Source 数据源案例实战
文件/文件系统
xxxxxxxxxx
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
textDS.print();
env.execute("xdclass job");
}
基于Socket
xxxxxxxxxx
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
stringDataStream.print();
env.execute("xdclass job");
}
简介: Flink自定义的Source 数据源案例-订单来源实战
自定义Source,实现接口自定义数据源
并行度为1
并行度大于1
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
创建接口
xxxxxxxxxx
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
}
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("小滴课堂面试专题第一季");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
}
}
/**
* 取消任务
*/
@Override
public void cancel() {
flag = false;
}
}
xxxxxxxxxx
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
videoOrderDataStream.print();
//DataStream需要调用execute,可以取个名称
env.execute("custom source job");
}
简介: Flink自定义的Source 数据源案例-并行度调整结合WebUI
xxxxxxxxxx
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney()>5;
}
}).setParallelism(3);
filterDS.print().setParallelism(4);
//DataStream需要调用execute,可以取个名称
env.execute("source job");
}
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink 的核心知识 Sink Operator
Sink 输出源
预定义
自定义
SinkFunction
RichSinkFunction
flink官方提供 Bundle Connector
Apache Bahir
简介:Flink 自定义的SinkFunction 数据源输出案例实战
自定义
SinkFunction
RichSinkFunction
Flink连接mysql的几种方式(都需要加jdbc驱动)
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.12.0</version>
</dependency>
保存视频订单到Mysql
Mysql环境自己本地搭建
建表
xxxxxxxxxx
CREATE TABLE `video_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`money` int(11) DEFAULT NULL,
`title` varchar(32) DEFAULT NULL,
`trade_no` varchar(64) DEFAULT NULL,
`create_time` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
xxxxxxxxxx
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
xxxxxxxxxx
public class MysqlSink extends RichSinkFunction<VideoOrder> {
private Connection conn = null;
private PreparedStatement ps = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");
String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
ps = conn.prepareStatement(sql);
}
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(VideoOrder videoOrder, Context context) throws Exception {
//给ps中的?设置具体值
ps.setInt(1,videoOrder.getUserId());
ps.setInt(2,videoOrder.getMoney());
ps.setString(3,videoOrder.getTitle());
ps.setString(4,videoOrder.getTradeNo());
ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
ps.executeUpdate();
}
}
简介:Flink 自定义的SinkFunction 数据源输出案例实战
xxxxxxxxxx
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
videoOrderDS.addSink(new MysqlSink());
自定义MysqlSink
建议继承RichSinkFunction函数
如果选择继承SinkFunction,会在每次写入一条数据时都会创建一个JDBC连接
简介:Flink 实战Bahir Connetor 存储数据到Redis6.X实战
Flink怎么操作redis?
Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
使用
xxxxxxxxxx
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
编码
xxxxxxxxxx
public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> value) {
return value.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> value) {
return value.f1.toString();
}
}
简介:Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战
Redis环境说明 redis6
课程使用docker部署redis6.x
xxxxxxxxxx
docker run -d -p 6379:6379 redis
编码实战
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<VideoOrder> ds = env.fromElements(new VideoOrder("21312","java",32,5,new Date()),
new VideoOrder("314","java",32,5,new Date()),
new VideoOrder("542","springboot",32,5,new Date()),
new VideoOrder("42","redis",32,5,new Date()),
new VideoOrder("52","java",32,5,new Date()),
new VideoOrder("523","redis",32,5,new Date())
);
//map转换,来一个记录一个,方便后续统计
// DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
// @Override
// public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
// return new Tuple2<>(value.getTitle(),1);
// }
// });
//只是一对一记录而已,没必要使用flatMap
DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getTitle(),1));
}
});
//key是返回的类型,value是分组key的类型; DataSet里面分组是groupBy, 流处理里面分组用 keyBy
KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//对各个组内的数据按照数量(value)进行聚合就是求sum, 1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加
DataStream<Tuple2<String, Integer>> sumDS = keyByDS.sum(1);
sumDS.print();
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
sumDS.addSink(new RedisSink<>(conf, new MyRedisSink()));
//DataStream需要调用execute,可以取个名称
env.execute("custom sink job");
}
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Kafka相关运行环境说明和必备基础知识点
大家自行搭建Kafka环境,如果不会kafka的话看视频
Docker快速部署kafka
xxxxxxxxxx
#zk
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#kafka
docker run -d --name xdclass_kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.0.116:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.116:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
#进入容器内部,创建topic
docker exec -it 9b5070d79dfa /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper 192.168.0.116:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic
#运行一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-topic --from-beginning
特别强调,如果搭建失败或者kafka本身不会的,请看kafka专题视频,这个是必备!!!!
简介:Flink Source 读取消息队列Kafka连接器整合实战
之前自定义SourceFunction,Flink官方也有提供对接外部系统的,比如读取Kafka
flink官方提供的连接器
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
编写代码
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
//kafka地址
props.setProperty("bootstrap.servers", "localhost:9092");
//组名
props.setProperty("group.id", "video-order-group");
//字符串序列化和反序列化规则
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//offset重置规则
props.setProperty("auto.offset.reset", "latest");
//自动提交
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
//有后台线程每隔10s检测一下Kafka的分区变化情况
props.setProperty("flink.partition-discovery.interval-millis","10000");
FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<>("xdclass-topic", new SimpleStringSchema(), props);
//设置从记录的消费者组内的offset开始消费
consumer.setStartFromGroupOffsets();
DataStream<String> ds = env.addSource(consumer);
ds.print();
//DataStream需要调用execute,可以取个名称
env.execute("custom source job");
}
简介:Flink Sink读取消息队列Kafka连接器整合实战
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
//kafka地址
props.setProperty("bootstrap.servers", "localhost:9092");
//组名
props.setProperty("group.id", "video-order-group");
//字符串序列化和反序列化规则
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//offset重置规则
props.setProperty("auto.offset.reset", "latest");
//自动提交
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
//有后台线程每隔10s检测一下Kafka的分区变化情况
props.setProperty("flink.partition-discovery.interval-millis","10000");
FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<>("xdclass-topic", new SimpleStringSchema(), props);
//设置从记录的消费者组内的offset开始消费, 如果没有记录从 auto.offset.reset 配置开始消费
consumer.setStartFromGroupOffsets();
DataStream<String> ds = env.addSource(consumer);
ds.print();
//处理,拼接字符串
DataStream<String> mapDS = ds.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return "小滴课堂:"+value;
}
});
//输出
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("xdclass-order", new SimpleStringSchema(), props);
mapDS.addSink(kafkaSink);
env.execute();
//DataStream需要调用execute,可以取个名称
env.execute("custom source job");
}
xxxxxxxxxx
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic
#运行一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-order --from-beginning
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink 里面的Map和FlatMap 算子实战
需求:多数算子,我们会用订单 转换-过滤-分组-统计 来实现
结果类型 idea自动提示
什么是java里面的Map操作
xxxxxxxxxx
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("253","java",30,15,new Date()),
new VideoOrder("323","java",30,5,new Date()),
new VideoOrder("42","java",30,5,new Date()),
new VideoOrder("543","springboot",21,5,new Date()),
new VideoOrder("423","redis",40,5,new Date()),
new VideoOrder("15","redis",40,5,new Date()),
new VideoOrder("312","springcloud",521,5,new Date()),
new VideoOrder("125","kafka",1,55,new Date())
);
// map转换,来一个记录一个,方便后续统计
DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(),1);
}
});
mapDS.print();
什么是java里面的FlatMap操作
xxxxxxxxxx
//只是一对一记录而已,没必要使用flatMap
// FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型
DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getTitle(),1));
}
});
简介:Flink 里面的RichMap和RichFlatMap 算子实战
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
RichXXX相关Open、Close、setRuntimeContext等 API方法会根据并行度进行操作的
xxxxxxxxxx
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("253","java",30,15,new Date()),
new VideoOrder("323","java",30,5,new Date()),
new VideoOrder("42","java",30,5,new Date()),
new VideoOrder("543","springboot",21,5,new Date()),
new VideoOrder("423","redis",40,5,new Date()),
new VideoOrder("15","redis",40,5,new Date()),
new VideoOrder("312","springcloud",521,5,new Date()),
new VideoOrder("125","kafka",1,55,new Date())
);
DataStream<Tuple2<String,Integer>> mapDS = ds.map(new RichMapFunction<VideoOrder, Tuple2<String, Integer>>() {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open====");
}
@Override
public void close() throws Exception {
System.out.println("close====");
}
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(),1);
}
});
DataStream<Tuple2<String,Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open====");
}
@Override
public void close() throws Exception {
System.out.println("close====");
}
@Override
public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getTitle(),1));
}
});
简介:Flink 里面的KeyBy分组概念讲解+订单统计实战
KeyBy分组概念介绍
xxxxxxxxxx
KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = filterDS.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
DataStream<VideoOrder> sumDS = videoOrderStringKeyedStream.sum("money");
sumDS.print();
常规的数据流转
注意: KeyBy后的聚合函数,只处理当前分组后组内的数据,不同组内数据互不影响
简介:Flink 里面的filter和sum 算子实战电商订单成交价统计
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
DataStream<VideoOrder> sumDS = ds.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder value) throws Exception {
return value.getMoney() > 20;
}
}).keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).sum("money");
sumDS.print();
//DataStream需要调用execute,可以取个名称
env.execute("map job");
}
简介: Flink 核心API Reduce聚合讲解和sum区别应用场景
reduce函数
xxxxxxxxxx
videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
@Override
public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
VideoOrder videoOrder = new VideoOrder();
videoOrder.setTitle(value1.getTitle());
videoOrder.setMoney(value1.getMoney() + value2.getMoney());
return videoOrder;
}
});
reduce.print();
value1是历史对象,value2是加入统计的对象,所以value1.f1是历史值,value2.f1是新值,不断累加
sum区别
sum("xxx")使用的时候,如果是tuple元组则用序号,POJO则用属性名称
keyBy分组后聚合统计sum和reduce实现一样的效果
sum是简单聚合,reduce是可以自定义聚合,aggregate支持复杂的自定义聚合
简介: Flink 核心API maxBy-max-minBy-min 区别和应用
如果是用了keyby,在后续算子要用maxby,minby类型,才可以再分组里面找对应的数据
例子
xxxxxxxxxx
//并行度不为1才看得出效果
//env.setParallelism(1);
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("1","java",31,15,new Date()),
new VideoOrder("2","java",32,45,new Date()),
new VideoOrder("3","java",33,52,new Date()),
new VideoOrder("4","springboot",21,5,new Date()),
new VideoOrder("5","redis",41,52,new Date()),
new VideoOrder("6","redis",40,15,new Date()),
new VideoOrder("7","kafka",1,55,new Date())
);
SingleOutputStreamOperator<VideoOrder> maxByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).maxBy("money");
SingleOutputStreamOperator<VideoOrder> maxDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).max("money");
//maxByDS.print("maxByDS:");
maxDS.print("maxDS:");
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介: Flink 核心Window窗口介绍和应用场景
先说下:本章内容很重要!!!!!!!!
文档地址
背景
分类
time Window 时间窗口,即按照一定的时间规则作为窗口统计
count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用
窗口属性
滑动窗口 Sliding Windows
滚动窗口 Tumbling Windows
窗口大小size 和 滑动间隔 slide
简介: Flink 核心-Window 窗口API和使用流程介绍
什么情况下才可以使用WindowAPI
注意
一个窗口内 的是左闭右开
countWindow没过期,但timeWindow在1.12过期,统一使用window;
窗口分配器 Window Assigners
窗口触发器 trigger
窗口 window function ,对窗口内的数据做啥?
定义了要对窗口中收集的数据做的计算操作
增量聚合函数
xxxxxxxxxx
aggregate(agg函数,WindowFunction(){ })
xxxxxxxxxx
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
全窗口函数
xxxxxxxxxx
apply(new processWindowFunction(){ })
xxxxxxxxxx
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
WindowFunction<IN, OUT, KEY, W extends Window>
如果想处理每个元素更底层的API的时候用
xxxxxxxxxx
//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter
process(new KeyedProcessFunction(){processElement、onTimer})
简介: Tumbling-Window滚动时间窗介绍和案例实战
滚动窗口 Tumbling Windows
比如指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口
案例实战(可以单一个数据测试,后续再讲时间语义)
xxxxxxxxxx
//方便调试,POJO类增加对象产生信息
public String toString() {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
ZoneId zoneId = ZoneId.systemDefault();
return "VideoOrder{" +
"tradeNo='" + tradeNo + '\'' +
", title='" + title + '\'' +
", money=" + money +
", userId=" + userId +
", createTime=" + formatter.format(createTime.toInstant().atZone(zoneId)) +
'}';
}
//VideoOrderSourceV2
System.out.println("产生:"+videoOrder.getTitle()+",价格:"+videoOrder.getMoney()+", 时间:"+videoOrder.getCreateTime());
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
SingleOutputStreamOperator<VideoOrder> sumDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
sumDS.print();
简介: Sliding -Window滑动时间窗介绍和案例实战
滑动窗口 Sliding Windows
案例实战
xxxxxxxxxx
DataStream<VideoOrder> sumDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money");
简介:数量Count Window窗口介绍和案例实战
基于数量的滚动窗口, 滑动计数窗口
案例:
xxxxxxxxxx
//数据源 source
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//分组后的组内数据超过5个则触发
//DataStream<VideoOrder> sumDS = keyByDS.countWindow(5).sum("money");
//分组后的组内数据超过3个则触发统计过去的5个数据
DataStream<VideoOrder> sumDS = keyByDS.countWindow(5,3).sum("money");
sumDS.print();
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介: Flink里面你需要知道的AggregateFunction增量聚合函数知识
窗口 window function ,对窗口内的数据做啥?
定义了要对窗口中收集的数据做的计算操作
增量聚合函数
xxxxxxxxxx
aggregate(agg函数,WindowFunction(){ })
xxxxxxxxxx
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
案例实战-滚动时间窗
xxxxxxxxxx
SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {
//累加器初始化
@Override
public VideoOrder createAccumulator() {
VideoOrder videoOrder = new VideoOrder();
return videoOrder;
}
//聚合方式
@Override
public VideoOrder add(VideoOrder value, VideoOrder accumulator) {
accumulator.setMoney(value.getMoney()+accumulator.getMoney());
accumulator.setTitle(value.getTitle());
if(accumulator.getCreateTime()==null){
accumulator.setCreateTime(value.getCreateTime());
}
return accumulator;
}
//获取结果
@Override
public VideoOrder getResult(VideoOrder accumulator) {
return accumulator;
}
//合并内容,一般不用
@Override
public VideoOrder merge(VideoOrder a, VideoOrder b) {
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(a.getMoney()+b.getMoney());
return videoOrder;
}
});
aggDS.print();
简介: Flink里面你需要知道的WindowFunction全窗口函数知识
全窗口函数
xxxxxxxxxx
apply(new WindowFunction(){ })
xxxxxxxxxx
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
WindowFunction<IN, OUT, KEY, W extends Window>
案例实战
xxxxxxxxxx
SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.apply( new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<VideoOrder> input, Collector<VideoOrder> out) throws Exception {
List<VideoOrder> list = IteratorUtils.toList(input.iterator());
int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(total);
videoOrder.setCreateTime(list.get(0).getCreateTime());
videoOrder.setTitle(list.get(0).getTitle());
out.collect(videoOrder);
}
});
简介: Flink里面你需要知道的processWindowFunction全窗口函数知识
全窗口函数
xxxxxxxxxx
process(new ProcessWindowFunction(){})
xxxxxxxxxx
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
案例实战
xxxxxxxxxx
SingleOutputStreamOperator<VideoOrder> aggDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<VideoOrder> elements, Collector<VideoOrder> out) throws Exception {
List<VideoOrder> list = IteratorUtils.toList(elements.iterator());
int total =list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(total);
videoOrder.setTitle(list.get(0).getTitle());
videoOrder.setCreateTime(list.get(0).getCreateTime());
out.collect(videoOrder);
}
});
窗口函数对比
增量聚合
xxxxxxxxxx
aggregate(new AggregateFunction(){});
全窗口聚合
xxxxxxxxxx
apply(new WindowFunction(){})
xxxxxxxxxx
process(new ProcessWindowFunction(){}) //比上面这个强
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介: Flink的多种时间概念介绍和应用场景
背景
Flink里面时间分类
事件时间EventTime(重点关注)
进入时间 IngestionTime
处理时间ProcessingTime
事件被flink处理的时间
指正在执行相应操作的机器的系统时间
是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟
但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题
大家的疑惑
举个例子
小滴课堂-老王 做了一个电商平台买 "超短男装衣服",如果要统计10分钟内成交额,你认为是哪个时间比较好?
简介: Flink乱序延迟时间处理-Watermark讲解
背景
一般我们都是用EventTime事件时间进行处理统计数据
但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确
需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达
生活中的例子
小滴课堂:每天10点后就是迟到,需要扣工资
老王上班 路途遥远(延迟) 经常迟到
超过5分钟就不用来了吗?还是要来的继续工作的,不然今天上午工资就没了
那如果迟到30分钟呢? 也要来的,不然就容易产生更大的问题,缺勤开除。。。。
Watermark 水位线介绍
由flink的某个operator操作生成后,就在整个程序中随event数据流转
衡量数据是否乱序的时间,什么时候不用等早之前的数据
是一个全局时间戳,不是某一个key下的值
是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
注意
窗口触发计算的时机
watermark之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )
watermark之后,触发计算的时机
触发计算后,其他窗口内数据再到达也被丢弃
Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
简介: Flink乱序延迟时间处理-Watermark讲解
案例
window大小为10s,窗口是W1 [23:12:00~23:12:10) 、 W2[23:12:10~23:12:20)
没加入watermark,由上到下进入flink
加入watermark, 允许5秒延迟乱序,由上到下进入flink
数据A到达
数据B到达
数据C到达
数据D到达
数据E到达
Watermaker 计算 = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
什么时候触发W1窗口计算
Watermaker >= Window EndTime窗口结束时间
当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
简介: Flink重点组合-Watermark+Window编码实战-分类统计电商订单成交额
需求
xxxxxxxxxx
/**
* date 转 字符串
*
* @param time
* @return
*/
public static String format(long timestamp) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
ZoneId zoneId = ZoneId.systemDefault();
String timeStr = formatter.format(new Date(timestamp).toInstant().atZone(zoneId));
return timeStr;
}
/**
* 字符串 转 date
*
* @param time
* @return
*/
public static Date strToDate(String time) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(time, formatter);
return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
}
xxxxxxxxxx
/构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> ds = env.socketTextStream("127.0.0.1",8888);
DataStream<Tuple3<String, String,Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String,Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1],Integer.parseInt(arr[2])));
}
});
SingleOutputStreamOperator<Tuple3<String, String,Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定最大允许的延迟/乱序 时间
.<Tuple3<String, String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(event, timestamp) -> {
//指定POJO的事件时间列
return TimeUtil.strToDate(event.f1).getTime();
}
));
SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String,Integer>, String>() {
@Override
public String getKey(Tuple3<String, String,Integer> value) throws Exception {
return value.f0;
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple3<String, String,Integer>, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple3<String, String,Integer>> input, Collector<String> out) throws Exception {
//存放窗口的数据的事件时间
List<String> eventTimeList = new ArrayList<>();
int total = 0;
for (Tuple3<String, String,Integer> order : input) {
eventTimeList.add(order.f1);
total = total+order.f2;
}
String outStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key,total, TimeUtil.format(window.getStart()),TimeUtil.format(window.getEnd()), eventTimeList);
out.collect(outStr);
}
});
sumDS.print();
env.execute("watermark job");
简介: Flink实战-分类统计电商订单成交额-SocketStream数据测试实战
测试数据
窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
触发窗口计算条件
xxxxxxxxxx
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:23,10
简介: Flink 二次兜底延迟数据处理 allowedLateness 更新数据实战
背景
xxxxxxxxxx
//分组 开窗
SingleOutputStreamOperator<String> sumDS = watermarkDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
return value.f0;
}
})
//开窗
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//允许 1分钟
.allowedLateness(Time.minutes(1))
//聚合, 方便调试拿到窗口全部数据,全窗口函数
.apply();
sumDS.print();
xxxxxxxxxx
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
#延迟1分钟内,所以会输出
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
#延迟超过1分钟,不会输出
java,2022-11-11 23:12:03,10
简介: Flink 最后的兜底延迟数据处理 测输出流实战
背景
编码
xxxxxxxxxx
OutputTag<Tuple3<String, String,Integer>> lateData = new OutputTag<Tuple3<String, String,Integer>>("lateData"){};
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//允许 1分钟
.allowedLateness(Time.minutes(1))
//最后的兜底容忍
.sideOutputLateData(lateData)
//不会更新之前的窗口数据,需要代码单独写逻辑处理更新之前的数据,也可以积累后批处理
sumDS.getSideOutput(lateData).print("late data");
测试数据
窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23: 12:20)
触发窗口计算条件
xxxxxxxxxx
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
#延迟1分钟内,所以会输出
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
#延迟超过1分钟,不会输出,配置了sideOutPut,会在兜底输出
java,2022-11-11 23:12:03,10
java,2022-11-11 23:12:04,10
简介: Flink乱序延迟时间处理-多层保证措施介绍和归纳
面试题:如何保证在需要的窗口内获得指定的数据?数据有乱序延迟
flink采用watermark 、allowedLateness() 、sideOutputLateData()三个机制来保证获取数据
watermark的作用是防止数据出现延迟乱序,允许等待一会再触发窗口计算,提前输出
allowLateness,是将窗口关闭时间再延迟一段时间.设置后就像window变大了
sideOutPut是最后兜底操作,超过allowLateness后,窗口已经彻底关闭了,就会把数据放到侧输出流
应用场景:实时监控平台
总结Flink的机制
第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
注意
xxxxxxxxxx
新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介: Flink的状态State介绍和应用场景解析
什么是State状态
有状态和无状态介绍
状态管理分类
ManagedState(用的多)
Flink管理,自动存储恢复
细分两类
Keyed State 键控状态(用的多)
Operator State 算子状态(用的少,部分source会用)
RawState(用的少)
State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
ValueState 简单的存储一个值(ThreadLocal / String)
ListState 列表
MapState 映射类型
简介: Flink 1.13 的状态State后端存储讲解
xxxxxxxxxx
从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储和检查点存储的分离
用户可以迁移现有应用程序以使用新 API,而不会丢失任何状态或一致性
文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/
State状态后端:存储在哪里
Flink 内置了以下这些开箱即用的 state backends :
(新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
(旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
状态详解
HashMapStateBackend 保存数据在内部作为Java堆的对象。
键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
但是状态大小受集群内可用内存的限制
场景:
EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据
该数据库(默认)存储在 TaskManager 本地数据目录中
与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
场景
旧版
xxxxxxxxxx
MemoryStateBackend(内存,不推荐在生产场景使用)
FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)
RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)
配置
flink-conf.yaml
使用配置键在 中配置默认状态后端state.backend
。xxxxxxxxxx
配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend)
或实现状态后端工厂StateBackendFactory的类的完全限定类名
#全局配置例子一
# The backend that will be used to store operator state checkpoints
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
#全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
xxxxxxxxxx
//代码配置一
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//代码配置二
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
<version>1.13.1</version>
</dependency>
简介: Flink的状态State管理实战-订单数据统计实现MaxBy操作
sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
需求:
编码实战
xxxxxxxxxx
/**
* 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
*
* @param args
*/
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> ds = env.socketTextStream("127.0.0.1", 8888);
DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//一定要key by后才可以使用键控状态ValueState
SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
return value.f0;
}
}).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
private ValueState<Integer> valueState = null;
@Override
public void open(Configuration parameters) throws Exception {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class));
}
@Override
public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
// 取出State中的最大值
Integer stateMaxValue = valueState.value();
Integer currentValue = tuple3.f2;
if (stateMaxValue == null || currentValue > stateMaxValue) {
//更新状态,把当前的作为新的最大值存到状态中
valueState.update(currentValue);
return Tuple2.of(tuple3.f0, currentValue);
} else {
//历史值更大
return Tuple2.of(tuple3.f0, stateMaxValue);
}
}
});
maxVideoOrder.print();
env.execute("valueState job");
}
简介:Flink的Checkpoint-SavePoint和端到端状态一致性介绍
什么是Checkpoint 检查点
开箱即用,Flink 捆绑了这些检查点存储类型:
配置
xxxxxxxxxx
//全局配置checkpoints
state.checkpoints.dir: hdfs:///checkpoints/
//作业单独配置checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
xxxxxxxxxx
//全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints
Savepoint 与 Checkpoint 的不同之处
端到端(end-to-end)状态一致性
xxxxxxxxxx
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
Source
内部
Sink:
简介:Flink的Checkpoint代码配置案例实战
xxxxxxxxxx
//两个检查点之间间隔时间,默认是0,单位毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint过程中出现错误,是否让整体任务都失败,默认值为0,表示不容忍任何Checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
//Checkpoint是进行失败恢复,当一个 Flink 应用程序失败终止、人为取消等时,它的 Checkpoint 就会被清除
//可以配置不同策略进行操作
// DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
// RETAIN_ON_CANCELLATION(多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Flink 默认提供 Extractly-Once 保证 State 的一致性,还提供了 Extractly-Once,At-Least-Once 两种模式,
// 设置checkpoint的模式为EXACTLY_ONCE,也是默认的,
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
//设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink的复杂事件处理 CEP介绍和应用场景
什么是FlinkCEP
CEP全称 Complex event processing 复杂事件处理
FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用
地址:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/
用途
使用流程
xxxxxxxxxx
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
简介:Flink的复杂事件处理 CEP常见概念解析
概念
模式(Pattern):定义处理事件的规则
三种模式PatternAPI
近邻模式
模式分类
其他参数
简介:Flink的复杂事件处理 CEP案例实战-账号登录风控检测《上》
需求
编码
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> ds = env.socketTextStream("127.0.0.1", 8888);
DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
//延迟策略去掉了延迟时间,时间是单调递增,event中的时间戳充当了水印
.<Tuple3<String, String, Integer>>forMonotonousTimestamps()
//生成一个延迟3s的固定水印
//.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(event, timestamp) -> {
//指定POJO的事件时间列
return TimeUtil.strToDate(event.f1).getTime();
}
));
KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
return value.f0;
}
});
//定义模式
Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern
.<Tuple3<String, String, Integer>>begin("firstTimeLogin")
.where(new SimpleCondition<Tuple3<String, String, Integer>>() {
@Override
public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
// -1 是登录失败错误码
return value.f2 == -1;
}
})//.times(2).within(Time.seconds(10));//不是严格近邻
.next("secondTimeLogin")
.where(new SimpleCondition<Tuple3<String, String, Integer>>() {
@Override
public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
return value.f2 == -1;
}
}).within(Time.seconds(5));
//匹配检查
PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
SingleOutputStreamOperator<Tuple3<String, String,String>> select = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String,String>>() {
@Override
public Tuple3<String, String,String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
Tuple3<String, String, Integer> firstLoginFail = map.get("firstTimeLogin").get(0);
Tuple3<String, String, Integer> secondLoginFail = map.get("secondTimeLogin").get(0);
return Tuple3.of(firstLoginFail.f0,firstLoginFail.f1,secondLoginFail.f1);
}
});
select.print("匹配结果");
env.execute("CEP job");
}
简介:Flink的复杂事件处理 CEP案例实战-账号登录风控检测《下》
xxxxxxxxxx
小D,2022-11-11 12:01:01,-1
老王,2022-11-11 12:01:10,-1
老王,2022-11-11 12:01:11,-1
小D,2022-11-11 12:01:13,-1
老王,2022-11-11 12:01:14,-1
老王,2022-11-11 12:01:15,1
小D,2022-11-11 12:01:16,-1
老王,2022-11-11 12:01:17,-1
小D,2022-11-11 12:01:20,1
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Flink服务端部署多种部署模式介绍
项目都IDEA里面开发,生产环境怎么办?
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/
Local 本地部署,直接启动进程,适合调试使用
Standalone Cluster集群部署,flink自带集群模式
Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
Kubernetes 部署
Docker部署
简介:Flink部署云服务器介绍和相关环境准备说明
云厂商
阿里云新用户地址(如果地址失效,联系我或者客服即可,1折)
联系客服免费领取【热门IT环境搭建视频】
阿里云服务器选购+阿里云控制台界面 课程 服务器:Linux Centos7.X
https://detail.tmall.com/item.htm?id=650488560239
环境问题说明
必备环境
xxxxxxxxxx
Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.
简介:新版Flink下载和Local本地模式部署Linux服务器
Flink下载地址
步骤
解压 tar -zxvf
目录介绍
conf
xxxxxxxxxx
#web ui 端口
rest.port=8081
#调整,我的机器是2、4、8g都有的,大家最好买4g或者8g
jobmanager.memory.process.size: 1000m
taskmanager.memory.process.size: 1000m
bin
example
启动 bin/start-cluster.sh
停止 bin/stop-cluster.sh
查看进程 jps
网络安全组或者防火墙开放端口 8081
简介:Flink的local模式运行官方案例操作详解
flink测试案例
xxxxxxxxxx
cd /usr/local/software/flink/examples/source
vim xdclass_source.txt
java xdclass
springboot springcloud
html flink
springboot redis
java flink
kafka flink
java springboot
xxxxxxxxxx
./flink run /usr/local/software/flink/examples/batch/WordCount.jar --input /usr/local/software/flink/examples/source/xdclass_source.txt --output /usr/local/software/flink/examples/source/xdclass_result.txt
简介:maven常用插件介绍和本地Flink项目打包
xxxxxxxxxx
<build>
<finalName>xdclass-flink</finalName>
<plugins>
<!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${file.encoding}</encoding>
</configuration>
</plugin>
<!-- 添加依赖到jar包 -->
<!--<plugin>-->
<!--<artifactId>maven-assembly-plugin</artifactId>-->
<!--<configuration>-->
<!--<descriptorRefs>-->
<!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
<!--</descriptorRefs>-->
<!--</configuration>-->
<!--<executions>-->
<!--<execution>-->
<!--<id>make-assembly</id>-->
<!--<phase>package</phase>-->
<!--<goals>-->
<!--<goal>single</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
大家的疑惑
打包插件
测试插件
简介:通过WebUI部署Flink项目到阿里云Linux运行
访问WebUI
上传jar包
Task Solt 是指taskmanager的并发执行能力,parallelism是指taskmanager实际使用的并发能力
xxxxxxxxxx
taskmanager.numberOfTaskSlots:4
假如每一个taskmanager中的分配4个TaskSlot,
那有3个taskmanager一共有12个TaskSlot
测试数据
xxxxxxxxxx
AA,2022-11-11 12:01:01,-1
BB,2022-11-11 12:01:02,1
AA,2022-11-11 12:01:04,-1
AA,2022-11-11 12:01:05,-1
并行度和solt的疑惑
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:Linux云服务器安装docker和DockerCompose环境说明
联系客服免费领取【热门IT环境搭建视频】
阿里云服务器选购+阿里云控制台界面 课程 服务器:Linux Centos7.X
https://detail.tmall.com/item.htm?id=650488560239
xxxxxxxxxx
yum install docker-io -y
systemctl start docker
xxxxxxxxxx
systemctl start docker #运行Docker守护进程
systemctl stop docker #停止Docker守护进程
systemctl restart docker #重启Docker守护进程
xxxxxxxxxx
vim /etc/docker/daemon.json
#改为下面内容,然后重启docker
{
"debug":true,"experimental":true,
"registry-mirrors":["https://pb5bklzr.mirror.aliyuncs.com","https://hub-mirror.c.163.com","https://docker.mirrors.ustc.edu.cn"]
}
#查看信息
docker info
什么是Docker Compose容器编排
可以轻松、高效的管理容器,它是一个用于定义和运行多容器 Docker 的应用程序工具
属于容器编排工具,可以配置并启动多个容器,适合复杂业务场景
场景
问题
常规的部署只是Docker的冰山一角,课程快速入门,如果想深入可以看我们的Docker专题【购买,便宜】
简介:Flink多节点集群部署模式介绍延伸
前言知识回顾
每个 Flink 集群的作业里,都是有客户端在运行,主要是获取 Flink 应用程序的代码,将其转换为 JobGraph 并提交给 JobManager
JobManager 将工作分配到 TaskManagers 上,在那里运行实际的操作符(例如源、转换和接收器)
客户端是啥?
Local 本地部署,直接启动进程,适合调试使用
Standalone Cluster集群部署,flink自带集群模式
Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
Yarn集群三种模式介绍,
Session模式
Per-Job模式(Docker-Compose不支持)
Application模式(Flink 1.11版本中)
注意:
HA模式
官方文档
简介:DockerCompose容器化部署Flink 集群-Session Cluster模式
机器和配置准备
创建docker-compose.yml 文件 Session Cluster模式
xxxxxxxxxx
version: "3.7"
services:
jobmanager:
image: flink:scala_2.12-java8
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:scala_2.12-java8
depends_on:
- jobmanager
command: taskmanager
scale: 3
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
xxxxxxxxxx
version: "3.7"
services:
flink-jobmanager-01:
image: flink:scala_2.12-java8
container_name: flink-jobmanager-01
hostname: flink-jobmanager-01
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
flink-taskmanager-01:
image: flink:scala_2.12-java8
container_name: flink-taskmanager-01
hostname: flink-taskmanager-01
expose:
- "6121"
- "6122"
depends_on:
- flink-jobmanager-01
command: taskmanager
links:
- "flink-jobmanager-01:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
flink-taskmanager-02:
image: flink:scala_2.12-java8
container_name: flink-taskmanager-02
hostname: flink-taskmanager-02
expose:
- "6121"
- "6122"
depends_on:
- flink-jobmanager-01
command: taskmanager
links:
- "flink-jobmanager-01:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
xxxxxxxxxx
The Web Client is on port 8081
JobManager RPC port 6123
TaskManagers RPC port 6122
TaskManagers Data port 6121
注意:
expose暴露容器给link到当前容器的容器
ports是暴露容器端口到宿主机端口进行映射
问题
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:新版Flink综合案例实战-实时监控告警需求说明
案例说明
综合案例练习前面学的知识点
如果需要大型项目看 海量数据处理项目大课训练营(联系客服)
需求:公司开发一个监控告警平台(类似nginx访问日志)
日志来源
xxxxxxxxxx
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccessLogDO {
private String title;
private String url;
private String method;
private Integer httpCode;
private String body;
private Date createTime;
private String userId;
private String city;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultCount {
private String url;
private Integer code;
private Long count;
private String startTime;
private String endTime;
private String type;
}
简介:新版Flink综合案例实战-自定义Source模拟日志来源
xxxxxxxxxx
public class AccessLogSource extends RichParallelSourceFunction<AccessLogDO> {
private volatile Boolean flag = true;
private Random random = new Random();
//接口
private static List<AccessLogDO> urlList = new ArrayList<>();
static {
urlList.add(new AccessLogDO("首页","/pub/api/v1/web/index_card","GET",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("个人信息","/pub/api/v1/web/user_info","GET",200,"",new Date(),"",""));
// urlList.add(new AccessLogDO("分类列表","/pub/api/v1/web/all_category","GET",200,"",new Date(),"",""));
// urlList.add(new AccessLogDO("分页视频","/pub/api/v1/web/page_video","GET",200,"",new Date(),"",""));
// urlList.add(new AccessLogDO("收藏","/user/api/v1/favorite/save","POST",200,"",new Date(),"",""));
// urlList.add(new AccessLogDO("下单","/user/api/v1/product/order/save","POST",200,"",new Date(),"",""));
//urlList.add(new AccessLogDO("异常url","","POST",200,"",new Date(),"",""));
}
//状态码
private static List<Integer> codeList = new ArrayList<>();
static {
codeList.add(200);
codeList.add(200);
codeList.add(502);
codeList.add(403);
}
@Override
public void run(SourceContext<AccessLogDO> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
int userId = random.nextInt(50);
int httpCodeNum = random.nextInt(codeList.size());
int accessLogNum = random.nextInt(urlList.size());
AccessLogDO accessLogDO = urlList.get(accessLogNum);
accessLogDO.setHttpCode(codeList.get(httpCodeNum));
accessLogDO.setUserId(userId+"");
//模拟迟到数据,100秒波动
//long timestamp = System.currentTimeMillis() - random.nextInt(100000);
long timestamp = System.currentTimeMillis() - random.nextInt(5000);
accessLogDO.setCreateTime(new Date(timestamp));
System.out.println("产生:"+accessLogDO.getTitle()+",状态:"+accessLogDO.getHttpCode()+", 时间:"+ TimeUtil.format(accessLogDO.getCreateTime()));
ctx.collect(accessLogDO);
}
}
@Override
public void cancel() {
flag = false;
}
}
简介:新版Flink综合案例实战-过滤-分组-开窗-分配Watermark
xxxxxxxxxx
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//java,2022-11-11 09-10-10,15
DataStream<AccessLogDO> ds = env.addSource(new AccessLogSource());
SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return StringUtils.isNotBlank(value.getUrl());
}
});
//指定watermark
SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定允许乱序延迟的最大时间 3 秒
.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//指定POJO事件时间列,毫秒
.withTimestampAssigner((event, timestamp) -> event.getCreateTime().getTime()));
//最后的兜底数据
OutputTag<AccessLogDO> lateData = new OutputTag<AccessLogDO>("lateDataLog"){};
//分组
KeyedStream<AccessLogDO, String> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, String>() {
@Override
public String getKey(AccessLogDO value) throws Exception {
return value.getUrl();
}
});
SingleOutputStreamOperator<ResultCount> tenMinPV = keyedStream
//开窗
.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(5)))
//允许1分钟延迟
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateData);
简介:新版Flink综合案例实战-增量聚合统计进阶应用实战
xxxxxxxxxx
.aggregate(new AggregateFunction<AccessLogDO, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AccessLogDO value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}, new ProcessWindowFunction<Long, ResultCount, String, TimeWindow>() {
@Override
public void process(String value, Context context, Iterable<Long> elements, Collector<ResultCount> out) throws Exception {
ResultCount resultCount = new ResultCount();
resultCount.setUrl(value);
resultCount.setType("每5秒统计近1分接口PV");
resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
long total = elements.iterator().next();
resultCount.setCount(total);
out.collect(resultCount);
}
});
ProcessWindowFunction 方法说明
xxxxxxxxxx
aggregate( AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction )
简介:新版Flink综合案例实战-滑动统计各个接口的PV验证实战+乱序验证
简介:新版Flink综合案例实战--多接口-多状态码统计实战
xxxxxxxxxx
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<AccessLogDO> ds = env.addSource(new AccessLogSource());
//过滤
SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return StringUtils.isNotBlank(value.getUrl());
}
});
//指定watermark
SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.getCreateTime().getTime()));
//最后兜底数据
OutputTag<AccessLogDO> lateData = new OutputTag<AccessLogDO>("lateDataLog") {
};
//多个字段分组
KeyedStream<AccessLogDO, Tuple2<String, Integer>> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(AccessLogDO value) throws Exception {
return Tuple2.of(value.getUrl(),value.getHttpCode());
}
});
//开窗
SingleOutputStreamOperator<ResultCount> aggregateDS = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateData)
.aggregate(new AggregateFunction<AccessLogDO, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AccessLogDO value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
}, new ProcessWindowFunction<Long, ResultCount, Tuple2<String, Integer>, TimeWindow>() {
@Override
public void process(Tuple2<String, Integer> value, Context context, Iterable<Long> elements, Collector<ResultCount> out) throws Exception {
ResultCount resultCount = new ResultCount();
resultCount.setUrl(value.f0);
resultCount.setHttpCode(value.f1);
long total = elements.iterator().next();
resultCount.setCount(total);
resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
out.collect(resultCount);
}
});
aggregateDS.print("接口状态码");
aggregateDS.getSideOutput(lateData).print("late data");
env.execute("XdclassMonitorApp");
}
简介:新版Flink综合案例实战-Flink CEP接口监控告警
需求
CEP 乱序延迟问题
xxxxxxxxxx
Flink CEP会将事件进行缓存,在相应的watermark到底之后,Flink CEP将缓存中的事件按照事件时间进行升序排序,然后再进行的模式匹配。
当watermark到达时,处理该缓冲区中事件时间小于watermark时间的所有数据。
时间小于上次的watermark的时间就是迟到的数据,迟到的数据需要用侧输出流处理
愿景:"让编程不再难学,让技术与生活更加有趣"
更多架构课程请访问 xdclass.net
简介:新版Flink课程总结归纳和接下去的安排
课程总结
特性
Flink进阶
大数据技术栈
大课训练营 -笔记查看(不对外提供,【原创资料】)
给个学习建议,大课推出的目的,也是很多同学需要这个
项目大课训练营
海量数据分库分表项目大课训练营
大数据中台建设大课训练营
...更多
简介:小滴课堂架构师学习路线和大课训练营介绍
小滴课堂-每个技术栈课程不贵,提升自己技术能力 + 不浪费时间 + 涨薪,是超过N倍的收益
小滴课堂永久会员 & 小滴课堂年度会员
适合IT后端人员,零基础就业、在职充电、架构师学习路线-专题技术方向
学后水平:一线城市 15~25k
适合人员:校招、应届生、培训机构出、工作1~10年的同学都适合
学到技术,涨薪是关键,付出肯定会有收获,未来 一定是持续学习的人
我也在学习每年参加各种技术沙⻰和内部分享会投 入超过10万+,但是我认为是值的,且带来的收益更 大
大课综合训练营
适合用于专项拔高,适合有一定工作经验的同学,架构师方向发展的训练营,包括多个方向
综合项目大课训练营
海量数据分库分表大课训练营
架构解决方案大课训练营
全链路性能优化大课训练营
安全攻防大课训练营
数据分析大课训练营
算法刷题大课训练营
一对一辅导
技术人的导航站(涵盖我们主流的技术网站、社区、工具等等,即将上线)
小滴课堂,愿景:让编程不在难学,让技术与生活更加有趣
相信我们,这个是可以让你学习更加轻松的平台,里面的课程绝对会让你技术不断提升
欢迎加小D讲师的微信: xdclass-lw
我们官方网站:https://xdclass.net
千人IT技术交流QQ群: 718617859
重点来啦:加讲师微信 免费赠送你干货文档大集合,包含前端,后端,测试,大数据,运维主流技术文档(持续更新)
https://mp.weixin.qq.com/s/qYnjcDYGFDQorWmSfE7lpQ