admin管理员组

文章数量:1794759

FlinkSQL建表语句与插入语句

FlinkSQL建表语句与插入语句

FlinkSQL来构建实时数仓,其思路大概如下:Flink的Table API提供了对kafka/jdbc/hbase等实时开发涉及到的组件的支持,以kafka为例,将kafka topic抽象成Flink Table,如下:

FlinkSQL读数据建表语句

CREATE TABLE flink_rtdw.demo.kafka_source_table (   topic STRING,   bidWord STRING,   planID STRING,   eventTime INTEGER,   procTime AS PROCTIME(),   ets AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime)),   WATERMARK FOR ets AS ets - INTERVAL '1' MINUTE ) WITH (   'connector' = 'kafka',   'topic' = 'ba.join.shbt2.search-ocpc-click',   'properties.bootstrap.servers' = ‘Kafka-broker',   'properties.group.id' = 'testGroup',   'scan.startup.mode' = 'latest-offset',   'format' = 'json' );

FlinkSQL写数据建表语句

CREATE TABLE flink_rtdw.demo.kafka_sink_table (   window_time BIGINT,   topic STRING,   bid_word_count BIGINT ) WITH (   'connector' = 'kafka',   'topic' = 'ultron.demo.shbt2.into.shbt2.tumlewindow.dev',   'properties.bootstrap.servers' = ‘kafka-broker',   'format' = 'json' );

读取kafka_source_table中数据根据指标统计写如kafka_source_table 统计一秒滚动窗口出现次数

INSERT INTO   flink_rtdw.demo.kafka_sink_table SELECT   UNIX_TIMESTAMP(     DATE_FORMAT(       TUMBLE_START(procTime, INTERVAL '1' MINUTE),       'yyyy-MM-dd HH:mm:ss'     )   ) * 1000 as window_time,   topic,   COUNT(bidWord) FROM   flink_rtdw.demo.kafka_source_table GROUP BY   TUMBLE(procTime, INTERVAL '1' MINUTE),   topic;

本文标签: 表语语句FlinkSQL