admin管理员组文章数量:1794759
一文掌握教育行业FlinkCDC+Paimon实时湖仓案例实践总结
前言
本文记录某高校客户的实时湖仓案例实践总结。业务数据库为MySQL,最终方案为很经典的流式架构:Mysql -> FlinkCDC -> Paimon -> FlinkSQL。组件版本信息如下:
MySQL | 5.7.36 |
---|---|
FlinkCDC | 3.2.0 |
Paimon | 0.9.0 |
Flink | 1.18.1 |
欢迎关注微信公众号:大数据从业者
Paimon支持以多种形式FlinkCDC实时导入源端数据与元数据变更(schema evolution)到Paimon表中。也就是说源端增加列、不用重启Flink作业、可以自动识别实时导入到Paimon表。目前,Paimon支持的CDC形式包括:Mysql、Postgres、Kafka、Mongo、Pulsar。
举例说明:将Mysql表tableA导入到Paimon,两种方式:FlinkSQL或者MySqlSyncTableAction。注意:目前只有MySqlSyncTableAction支持schema evolution。使用FlinkSQL不能将Mysql表新增字段field_4同步到Paimon,如图所示:
使用MySqlSyncTableAction能够将Mysql表新增字段field_4同步到Paimon,如图所示:
Schema Evolution有限支持如下:
代码语言:javascript代码运行次数:0运行复制1. Adding columns.
2. Altering column types. More specifically:
altering from a string type (char, varchar, text) to another string type with longer length,
altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
altering from a floating-point type (float, double) to another floating-point type with wider range,
Flink编译部署
Flink源码编译、整合Hadoop与Hive,参见历史文章,这里不再赘述。Flink部署路径:/home/myHadoopCluster/flink-1.18.1
FlinkCDC编译部署
代码语言:javascript代码运行次数:0运行复制git clone -b release-3.2.0 .git
mvn clean package –DskipTests
拷贝flink-sql-connector-mysql-cdc-3.2.0.jar到Flink部署路径
代码语言:javascript代码运行次数:0运行复制cp flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-3.2.0.jar /home/myHadoopCluster/flink-1.18.1/lib/
Paimon编译部署
代码语言:javascript代码运行次数:0运行复制git clone -b release-0.9.0 .git
mvn clean package –DskipTests
拷贝paimon-flink-1.18-0.9.0.jar到Flink部署路径
代码语言:javascript代码运行次数:0运行复制cp paimon-flink/paimon-flink-1.18/target/paimon-flink-1.18-0.9.0.jar /home/myHadoopCluster/flink-1.18.1/lib/
MySQL
设置binlog格式,重启MySQL。
代码语言:javascript代码运行次数:0运行复制vi /etc/myf
binlog_format=ROW
创建示例数据库felixzh_db和数据表felixzh_user
代码语言:javascript代码运行次数:0运行复制create database felixzh_db;
create table felixzh_db.felixzh_user(id int auto_increment primary key, name varchar(10), salary decimal(10,2));
Flink SQL方式实践总结
1. 通过sql-client创建示例catalog、mysql表、paimon表,如下:
代码语言:javascript代码运行次数:0运行复制CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///flink/paimon'
);
use catalog paimon_catalog;
CREATE TABLE paimon_users (
id BIGINT,
name STRING,
salary DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
);
CREATE TEMPORARY TABLE users (
id INT,
name STRING,
salary DECIMAL(10, 2),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'felixzh',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'felixzh_db',
'table-name' = 'felixzh_user');
2. 提交SQL作业
代码语言:javascript代码运行次数:0运行复制insert into paimon_users select * from users;
3. Flink OLAP查询
代码语言:javascript代码运行次数:0运行复制SET 'sql-client.execution.result-mode' = 'tableau';
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
4.1 Mysql表insert数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制insert into felixzh_user values(1, 'felixzh', '3.2');
结论:insert数据同步正常。
4.2 Mysql表update数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制update felixzh_user set salary='3.5';
结论:update数据同步正常。
4.3 Mysql表alter增加列、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制alter table felixzh_user add phone varchar(20);
结论:alter增加列数据未同步!
4.4 Mysql表delete数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制delete from felixzh_user;
结论:delete数据同步正常。
MySqlSyncTableAction方式实践
1. 将paimon-flink-action相关jar拷贝到Flink部署路径
代码语言:javascript代码运行次数:0运行复制cp paimon-flink/paimon-flink-action/target/paimon-flink-action-0.9.0.jar /home/myHadoopCluster/flink-1.18.1/
2. 提交paimon-flink-action作业,如下:
代码语言:javascript代码运行次数:0运行复制./flink run -yid application_1726125159688_0006 ../paimon-flink-action-0.9.0.jar mysql_sync_table --warehouse hdfs:///flink/paimon1/ --database felixzh_db --table felixzh_user --primary_keys id --mysql_conf hostname=felixzh --mysql_conf username=root --mysql_conf password=123456 --mysql_conf database-name=felixzh_db --mysql_conf table-name=felixzh_user
3. 通过sql-client创建上述Paimon表对应catalog
代码语言:javascript代码运行次数:0运行复制CREATE CATALOG paimon_catalog1 WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///flink/paimon1'
);
注意:catalog warehouse必须与Paimon表存储路径相同。
代码语言:javascript代码运行次数:0运行复制use catalog paimon_catalog1;
虽然show tables看不到库表,不影响直接查询表,如下:
代码语言:javascript代码运行次数:0运行复制select * from felixzh_db.felixzh_user;
4.1 Mysql表insert数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制insert into felixzh_user values(10, 'felixzh', '10.9');
4.2 Mysql表update数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制update felixzh_user set salary='20.1' where id=10;
4.3 Mysql表add column、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制alter table felixzh_user add phone varchar(20);
4.4 Mysql表insert数据(新加的列)、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制insert into felixzh_user values(11, 'felixzh', '11.1', '110');
4.5 Mysql表delete部分数据、FlinkSQL查看Paimon表
代码语言:javascript代码运行次数:0运行复制delete from felixzh_user where id<10;
结论
推荐优先使用FlinkSQL,除非需要使用Schema Change Evolution。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。原始发表:2024-10-12,如有侵权请联系 cloudcommunity@tencent 删除教育部署行业实践数据本文标签: 一文掌握教育行业FlinkCDCPaimon实时湖仓案例实践总结
版权声明:本文标题:一文掌握教育行业FlinkCDC+Paimon实时湖仓案例实践总结 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1754863846a1707480.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论