新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
创新互联技术团队10多年来致力于为客户提供成都网站设计、成都网站建设、品牌网站设计、营销型网站建设、搜索引擎SEO优化等服务。经过多年发展,公司拥有经验丰富的技术团队,先后服务、推广了近1000家网站,包括各类中小企业、企事单位、高校等机构单位。
使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToKafka.java
package com.flink.examples.kafka; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据 */ public class SelectToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html 开始偏移位置 config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是: group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。 earliest-offset:从最早的偏移量开始。 latest-offset:从最新的偏移量开始。 timestamp:从每个分区的用户提供的时间戳开始。 specific-offsets:从每个分区的用户提供的特定偏移量开始。 默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费 一致性保证 sink.semantic选项来选择三种不同的操作模式: NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。 AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。 EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); String sql = "select user_id,item_id,behavior,ts from KafkaTable"; Table table = tEnv.sqlQuery(sql); //打印字段结构 table.printSchema(); //table 转成 dataStream 流 DataStreambehaviorStream = tEnv.toAppendStream(table, Row.class); behaviorStream.print(); env.execute(); } }
打印结果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) 3> 1,1,normal,2021-01-26T10:25:44
关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。