Flink学习之flink sql「建议收藏」

Flink学习之flink sql「建议收藏」???? 昨天我们学习完TableAPI后,今天我们继续学SQL,TableAPI和SQL可以处理SQL语言编写的查询语句,但是这些查询需要嵌入用Java、Scala和python编写的程序中。hadoop专题:hadoop系列文章.spark专题:spark系列文章.flink专题:Flink系列文章.????只需要具备SQL的基础知识即可,不需要其他编程经验。我的SQL客户端选择的是docker安装的FlinkSQLClick,大家根据自己的需求安装即可。目录1.1.

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

? 昨天我们学习完Table API后,今天我们继续学SQL,Table API和SQL可以处理SQL语言编写的查询语句,但是这些查询需要嵌入用Java、Scala和python编写的程序中。

?flink sql只需要具备 SQL 的基础知识即可,不需要其他编程经验。我的SQL 客户端选择的是docker安装的Flink SQL Click,大家根据自己的需求安装即可。

1. SQL客户端

SQL客户端内置在Flink的版本中,大家只要启动即可,我使用的是docker环境中配置的Flink SQL Click,让我们测试一下:
在这里插入图片描述
输入’helloworld’ 看看输出的结果。

SELECT ‘hello world’;

结果如下:说明运行成功!
在这里插入图片描述

2. SQL语句

2.1 create

CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]


-- 例如
CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

2.2 drop

DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。

--删除表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
--删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
--删除视图
DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
--删除函数
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

2.3 alter

ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。

--修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
--设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
--修改视图名
ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name
--在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

2.4 insert

INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)

-- 1. 插入别的表的数据
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

-- 2. 将值插入表中 
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES [values_row , values_row ...]



-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

2.5 show

show用于列出所有的catalog、database、function等

-- 列出catalog
SHOW CATALOGS;
-- 列出数据库
SHOW DATABASES;
--列出表
SHOW TABLES;
-- 列出视图
SHOW VIEWS;
--列出函数
SHOW FUNCTIONS;
-- 列出所有激活的 module
SHOW MODULES;

3. Window Functions

这里的Window Functions不是指我们sql中的窗口函数,是指处理流数据中特有的窗口操作。

3.1 滚动窗口 TUMBLE

TUMBLE函数把行分配到有固定间隔时间且不重叠的窗口上,滚动窗口在批处理和流处理可以定义在事件时间上,但只有流处理可以定义在处理时间上。
在这里插入图片描述

--1. TUMBLE函数的参数
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
-- TABLE:代表数据源
-- DESCRIPTOR(timecol):指时间列
-- size:指窗口大小
-- offset:可增加其他参数,会有特别的意义

-- 2.实例
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

3.2 滑动窗口 HOP

滑动窗口在批处理和流处理中可以定义在事件时间上,但只有流处理可以定义在处理时间上。(数据会有重复)
在这里插入图片描述

-- 1. HOP函数的参数
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
-- TABLE:代表数据源
-- DESCRIPTOR(timecol):指时间列
-- slide:指窗口滑动的大小
-- size:指窗口大小
-- offset:可增加其他参数,会有特别的意义

-- 2.实例
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

3.3 累计窗口 CUMULATE

累计窗口是指在固定窗口内,每隔一段时间触发操作。类似于滚动窗口内定时进行累计操作。
在这里插入图片描述

--1. 累计窗口的参数
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
--data: 和时间有关的数据源
--timecol: 时间列,数据的哪些时间属性列应该映射到滚动窗口。
--step: 是指定顺序累积窗口结束之间增加的窗口大小的持续时间。
--size: 是指定累积窗口最大宽度的持续时间。size 必须是 step 的整数倍。
-- offset:可增加其他参数,会有特别的意义


-- 实例
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

4. 其他函数

处理上述这些,剩下还有的操作都是和我们的SQL语法差不多,就不再阐述:

  • 窗口聚合函数:group by、…
  • 分组聚合函数:count、having、count(distinct xxx)、…
  • over聚合函数:over(partition by xxx order by xxx)、…
  • 内外连接函数:join、left join 、outer join、…
  • limit 函数
  • TOP-N函数: rank()、dense_rank()、row_number()

对以上内容感兴趣的小伙伴可以参考如下链接:

5. 总结

今天学习的sql,和往常不一样的地方在于,以往的sql都是处理的是批数据,而今天学习的flink sql可以处理流数据,流数据随着时间的变化而变化,flink sql可以对流数据进行类似表一样的处理,可以实现大部分DataStream API和DataSet API的功能。

?还有就是,flink sql中的窗口函数和我们传统的窗口函数不一样,按理来说,我们正常的窗口函数应该叫over聚合函数

6. 参考资料

《Flink入门与实战》
《PyDocs》(pyflink官方文档)
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/171513.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 一个对话让你明白架构师是做什么的?[通俗易懂]

    一个对话让你明白架构师是做什么的?[通俗易懂]阅读本文大概需要6分钟。很多人都想知道架构师是做什么?我们看看下面的一段对话。菜鸟——刚入门的程序员老鸟——资深架构师老鸟:菜鸟,你的目标是什么?菜鸟:我要成为一个软件架构师。老鸟:对一个年轻的工程师来说,这是一个很好的目标。那你为什么要成为架构师呢?菜鸟:我要领导一个团队,还要做所有关于数据库、框架和Web服务器的重要决定。老鸟…

    2025年7月14日
    2
  • 群晖 winscp php,群晖DSM开启ROOT权限及WinSCP使用ROOT登录

    群晖 winscp php,群晖DSM开启ROOT权限及WinSCP使用ROOT登录本文以群晖DSM6.1.7(以下简称DSM)为例:一、准备工具1、putty2、WinSCP下载地址:http://pan.myxzy.com/download.php?id=81二、DSM开启SSHDSM的“控制面板”—>“终端机和SNMP”,勾上“启动Telnet功能”和“启动SSH功能”的勾,然后点击“应用”三、开启ROOT账号和修改密码1、使用putty连接DSM主机名称填…

    2022年6月6日
    1.4K
  • 数据结构线性表笔记_数据结构实验报告线性表

    数据结构线性表笔记_数据结构实验报告线性表1.线性表的定义若将线性表记为(a1,…,ai1,ai,ai+1,…,an),则表中ai1领先于ai,ai领先于ai+1,称ai1是ai的直接前驱元素,ai+1是ai的直接后继元素。

    2022年8月6日
    6
  • 惠普m154a状态页_惠普m154a感叹号闪烁[通俗易懂]

    惠普m154a状态页_惠普m154a感叹号闪烁[通俗易懂]大家好,我是时间财富网智能客服时间君,上述问题将由我为大家进行解答。惠普m154a感叹号闪烁的解决方法如下:1、打开电脑管家,点击“工具箱”。2、在工具箱里找到“硬件检测”。3、在硬件检测里点击“驱动安装”。4、可以看到“安装状态”,如果是未安装可以直接点击安装。惠普公司(Hewlett-PackardDevelopmentCompany,L.P,简称HP)总部位于美国加利福尼亚州的帕罗奥多…

    2022年8月13日
    17
  • 最炫python表白代码_有趣的python代码表白

    最炫python表白代码_有趣的python代码表白文章目录前言演示网站制作部署网站二维码制作总结前言跟着我做,不要跳着看,否则你会失败。第一步是制作二维码;第二步是制作网站。演示具体成果地址:https://yanghanwen.xyz/ai/网站制作首先你需要下载我的这个完整项目:链接:https://pan.baidu.com/s/1EmRehx_gRnT5hLjJvKuAIg提取码:pz1y–来自百度网盘超级会员V2的分享下载好后文件目录如下:然后你需要注意的是我把img里面的图片删了,涉及隐私,大家自己替换自己追

    2022年8月30日
    6
  • Linux系统结构详解

    Linux系统结构详解Linux系统一般有4个主要部分:内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本的操作系统结构,它们使得用户可以运行程序、管理文件并使用系统。部分层次结构如图1-1所示。1.linux内核Linux内核是世界上最大的开源项目之一,内核是与计算机硬件接口的易替换软件的最低级别。它负责将所有以“用户模式”运行的应用程…

    2022年5月25日
    39

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号