admin管理员组文章数量:1794759
FlinkSQL建表语句与插入语句
FlinkSQL来构建实时数仓,其思路大概如下:Flink的Table API提供了对kafka/jdbc/hbase等实时开发涉及到的组件的支持,以kafka为例,将kafka topic抽象成Flink Table,如下:
FlinkSQL读数据建表语句
CREATE TABLE flink_rtdw.demo.kafka_source_table ( topic STRING, bidWord STRING, planID STRING, eventTime INTEGER, procTime AS PROCTIME(), ets AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime)), WATERMARK FOR ets AS ets - INTERVAL '1' MINUTE ) WITH ( 'connector' = 'kafka', 'topic' = 'ba.join.shbt2.search-ocpc-click', 'properties.bootstrap.servers' = ‘Kafka-broker', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );FlinkSQL写数据建表语句
CREATE TABLE flink_rtdw.demo.kafka_sink_table ( window_time BIGINT, topic STRING, bid_word_count BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'ultron.demo.shbt2.into.shbt2.tumlewindow.dev', 'properties.bootstrap.servers' = ‘kafka-broker', 'format' = 'json' );读取kafka_source_table中数据根据指标统计写如kafka_source_table 统计一秒滚动窗口出现次数
INSERT INTO flink_rtdw.demo.kafka_sink_table SELECT UNIX_TIMESTAMP( DATE_FORMAT( TUMBLE_START(procTime, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss' ) ) * 1000 as window_time, topic, COUNT(bidWord) FROM flink_rtdw.demo.kafka_source_table GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE), topic;版权声明:本文标题:FlinkSQL建表语句与插入语句 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1687027523a129467.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论