1.19.5.4.流上的Join、常规Join、时间区间Join、时态表Join、基于处理时间的时态Join、时态表函数Join、用法

1.19.5.4.流上的Join、常规Join、时间区间Join、时态表Join、基于处理时间的时态Join、时态表函数Join、用法Join 在批数据处理中是比较常见且广为人知的运算 一般用于连接两张关系表 然而在动态表中 Join 的语义会难以理解甚至让人困惑 因而 Flink 提供了几种基于 TableAPI 和 SQL 的 Join 方法 欲获取更多关于 Join 语法的细节 请参考 TableAPI 和 SQL 中的 Join 章节 常规 Join 是最常用的 Join 用法 在常规 Join 中 任何新记录或对 Join 两侧表的任何更改都是可见的 并会影响最终整个 Join 的结果 例如 如果 Join 左侧插入了一条新

1.19.5.4.流上的Join

Join 在批数据处理中是比较常见且广为人知的运算,一般用于连接两张关系表。然而在动态表中 Join 的语义会难以理解甚至让人困惑。

因而,Flink 提供了几种基于 Table API 和 SQL 的 Join 方法。

欲获取更多关于 Join 语法的细节,请参考 Table API 和 SQL 中的 Join 章节。

1.19.5.4.1.常规Join

常规 Join 是最常用的 Join 用法。在常规 Join 中,任何新记录或对 Join 两侧表的任何更改都是可见的,并会影响最终整个 Join 的结果。例如,如果 Join 左侧插入了一条新的记录,那么它将会与 Join 右侧过去与将来的所有记录进行 Join 运算。

SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id 

上述语意允许对输入表进行任意类型的更新操作(insert, update, delete).

然而,常规 Join 隐含了一个重要的前提:即它需要在 Flink 的状态中永久保存 Join 两侧的数据。因而,如果 Join 操作中的一方或双方输入表持续增长的话,资源消耗也将会随之无限增长。

1.19.5.4.2.时间区间Join

如果一个Join 限定输入 时间属性 必须在一定的时间限制中(即时间窗口),那么就称之为时间区间

JoinSELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime 

与常规 Join 操作相比,时间区间 Join 只支持带有时间属性的递增表。由于时间属性是单调递增的,Flink 可以从状态中移除过期的数据,而不会影响结果的正确性。

1.19.5.4.3.时态表Join

注意 只在 Blink planner 中支持。 注意 时态表有两种方式去定义,即 时态表函数 和 时态表 DDL,使用时态表函数的时态表 join 只支持在 Table API 中使用,使用时态表 DDL 的时态表 join 只支持在 SQL 中使用。 请参考时态表页面获取更多关于时态表和时态表函数的区别。

时态表 Join 意味着对任意表(左输入/探针侧)去关联一个时态表(右输入/构建侧)的版本,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。

Flink 使用了 SQL:2011 标准引入的时态表 Join 语法,时态表 Join 的语法如下:

SELECT [column_list] FROM table1 [AS <alias1>] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] ON table1.column-name1 = table2.column-name1 
1.19.5.4.3.1.基于事件时间的时态Join

基于事件时间的时态表 join 使用(左侧输入/探针侧) 的 事件时间 去关联(右侧输入/构建侧) 版本表 对应的版本。 基于事件时间的时态表 join 仅支持关版本表或版本视图,版本表或版本视图只能是一个 changelog 流。 但是,Flink 支持将 append-only 流转换成 changelog 流,因此版本表也可以来自一个 append-only 流。 查看声明版本视图 获取更多的信息关于如何声明一张来自 append-only 流的版本表。

将事件时间作为时间属性时,可将 过去 时间属性与时态表一起使用。这允许对两个表中在相同时间点的记录执行 Join 操作。 与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 watermarks 以来的所有版本(按时间区分)。

例如,在探针侧表新插入一条事件时间时间为 12:30:00 的记录,它将和构建侧表时间点为 12:30:00 的版本根据时态表的概念进行 Join 运算。 因此,新插入的记录仅与时间戳小于等于 12:30:00 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。

通过定义事件时间,watermarks 允许 Join 运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。

下面的例子展示了订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。

SELECT * FROM product_changelog; (changelog kind) update_time product_name price ================= =========== ============ ===== +(INSERT) 00:01:00 scooter 11.11 +(INSERT) 00:02:00 basketball 23.11 -(UPDATE_BEFORE) 12:00:00 scooter 11.11 +(UPDATE_AFTER) 12:00:00 scooter 12.99 <= 产品 `scooter``12:00:00` 时涨价到了 `12.99` -(UPDATE_BEFORE) 12:00:00 basketball 23.11 +(UPDATE_AFTER) 12:00:00 basketball 19.99 <= 产品 `basketball``12:00:00` 时降价到了 `19.99` -(DELETE) 18:00:00 scooter 12.99 <= 产品 `scooter``18:00:00` 从数据库表中删除 

如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:

update_time product_id product_name price =========== ========== ============ ===== 00:01:00 p_001 scooter 11.11 00:02:00 p_002 basketball 23.11 

通过基于事件时间的时态表 join, 我们可以 join 上版本表中的不同版本:

CREATE TABLE orders ( order_id STRING, product_id STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time -- defines the necessary event time ) WITH ( ... ); -- 设置会话的时间区间, changelog 里的数据库操作时间是以 epoch 开始的毫秒数存储的, -- 在从毫秒转化为时间戳时,Flink SQL 会使用会话的时间区间 -- 因此,请根据 changelog 中的数据库操作时间设置合适的时间区间 SET table.local-time-zone=UTC; -- 声明一张版本表 CREATE TABLE product_changelog ( product_id STRING, product_name STRING, product_price DECIMAL(10, 4), update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 注意:自动从毫秒数转为时间戳 PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key constraint WATERMARK FOR update_time AS update_time -- (2) defines the event time by watermark  ) WITH ( 'connector' = 'kafka', 'topic' = 'products', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'value.format' = 'debezium-json' ); -- 基于事件时间的时态表 Join SELECT order_id, order_time, product_name, product_time, price FROM orders AS O LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P ON O.product_id = P.product_id; order_id order_time product_name product_time price ======== ========== ============ ============ ===== o_001 00:01:00 scooter 00:01:00 11.11 o_002 00:03:00 basketball 00:02:00 23.11 o_003 12:00:00 scooter 12:00:00 12.99 o_004 12:00:00 basketball 12:00:00 19.99 o_005 18:00:00 NULL NULL NULL 

基于事件时间的时态表 Join 通常用在通过 changelog 丰富流上数据的场景。

注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。

注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。

1.19.5.4.4.基于处理时间的时态Join

基于处理时间的时态表 join 使用任意表 (左侧输入/探针侧) 的 处理时间 去关联 (右侧输入/构建侧) 普通表的最新版本. 基于处理时间的时态表 join 当前只支持关联普通表或普通视图,且支持普通表或普通视图当前只能是 append-only 流。

如果将处理时间作为时间属性,过去 时间属性将无法与时态表一起使用。根据定义,处理时间总会是当前时间戳。 因此,关联时态表的调用将始终返回底层表的最新已知版本,并且底层表中的任何更新也将立即覆盖当前值。

可以将处理时间的时态 Join 视作简单的 HashMap

,HashMap 中存储来自构建侧的所有记录。 当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。 探针侧的每条记录将总会根据 HashMap 的最新/当前状态来计算。

接下来的示例展示了订单流 Orders 该如何与实时变化的汇率表 Lates 进行基于处理时间的时态 Join 操作,LatestRates 总是表示 HBase 表 Rates 的最新内容。

表 LastestRates 中的数据在时间点 10:15 和 10:30 时是相等的。欧元汇率在时间点 10:52 从 114 变化至 116 。

表 Orders 包含了金额字段 amount 和货币字段 currency 的支付记录数据。例如在 10:15 有一笔金额为 2 欧元的订单记录。

SELECT * FROM Orders; amount currency ====== ========= 2 Euro <== arrived at time 10:15 1 US Dollar <== arrived at time 10:30 2 Euro <== arrived at time 10:52 

基于以上,我们想要计算所有 Orders 表的订单金额总和,并同时转换为对应成日元的金额。

例如,我们想要以表 LatestRates 中的汇率将以下订单转换,则结果将为:

amount currency rate amout*rate ====== ========= ======= ============ 2 Euro 114 228 <== arrived at time 10:15 1 US Dollar 102 102 <== arrived at time 10:30 2 Euro 116 232 <== arrived at time 10:52 

通过时态表 Join,我们可以将上述操作表示为以下 SQL 查询:

SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency 

探针侧表中的每个记录都将与构建侧表的当前版本所关联。 在此示例中,查询使用处理时间作为处理时间,因而新增订单将始终与表 LatestRates 的最新汇率执行 Join 操作。注意,结果对于处理时间来说不是确定的。 基于处理时间的时态表 Join 通常用在通过外部表(例如维度表)丰富流上数据的场景。

1.19.5.4.5.时态表函数Join

时态表函数 Join 连接了一个递增表(左输入/探针侧)和一个时态表(右输入/构建侧),即一个随时间变化且不断追踪其改动的表。请参考时态表的相关章节查看更多细节。

下方示例展示了一个递增表 Orders 与一个不断改变的汇率表 RatesHistory 的 Join 操作。

Orders 表示了包含支付数据(数量字段 amount 和货币字段 currency)的递增表。例如 10:15 对应行的记录代表了一笔 2 欧元支付记录。

SELECT * FROM Orders; rowtime amount currency ======= ====== ========= 10:15 2 Euro 10:30 1 US Dollar 10:32 50 Yen 10:52 3 Euro 11:04 5 US Dollar 

字段 RatesHistory 表示不断变化的汇率信息。汇率以日元为基准(即 Yen 永远为 1)。例如,09:00 到 10:45 间欧元对日元的汇率是 114,10:45 到 11:15 间为 116。

SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 10:45 Euro 116 11:15 Euro 119 11:49 Pounds 108 

基于上述信息,欲计算 Orders 表中所有交易量并全部转换成日元。例如,若要转换下表中的交易,需要使用对应时间区间内的汇率(即 114)。

rowtime amount currency ======= ====== ========= 10:15 2 Euro 

如果没有时态表概念,则需要写一段这样的查询:

SELECT SUM(o.amount * r.rate) AS amount FROM Orders AS o, RatesHistory AS r WHERE r.currency = o.currency AND r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = o.currency AND r2.rowtime <= o.rowtime); 

有了时态表函数 Rates 和 RatesHistory 的帮助,我们可以将上述查询写成:

SELECT o.amount * r.rate AS amount FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r WHERE r.currency = o.currency 
1.19.5.4.5.1.用法

在定义时态表函数之后就可以使用了。时态表函数可以和普通表函数一样使用。

SELECT SUM(o_amount * r_rate) AS amount FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency 

Java代码:

Table result = orders .joinLateral("rates(o_proctime)", "o_currency = r_currency") .select("(o_amount * r_rate).sum as amount"); 

Scala代码:

val result = orders .joinLateral(rates('o_proctime), 'r_currency === 'o_currency) .select(('o_amount * 'r_rate).sum as 'amount) 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/209096.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 上午10:02
下一篇 2026年3月19日 上午10:03


相关推荐

  • 直方图和条形图区别

    直方图和条形图区别https blog csdn net xjl article details pythonMatplo 系列教程 三 绘制直方图和条形图

    2026年3月20日
    1
  • 描述编程语言的BNF

    描述编程语言的BNF转 来自维基百科 BNF 规定是推导规则 产生式 的集合 写为 lt 符号 gt lt 使用符号的表达式 gt 这里的 lt 符号 gt 是非终结符 而表达式由一个符号序列 或用指示选择的竖杠 分隔的多个符号序列构成 每个符号序列整体都是左端的符号的一种可能的替代 从未在左端出现的符号叫做终结符 基本原理 nbsp nbsp nbsp nbsp nbsp BNF 类似一种数学游戏 从一个符号开始 叫做起始标志 实例

    2026年3月19日
    2
  • pytest运行_ios12清除缓存

    pytest运行_ios12清除缓存前言pytest运行完用例之后会生成一个.pytest_cache的缓存文件夹,用于记录用例的ids和上一次失败的用例。方便我们在运行用例的时候加上–lf和–ff参数,快速运行上一

    2022年7月31日
    7
  • 通过nginx转发WebSocket

    通过nginx转发WebSocket通过nginx请求wensocket的时候需要修改配置文件,对于websocket请求需要特殊处理一下,需要在conf配置文件中添加一些配置:server{listen8080;server_nametest.com;add_header’Access-Control-Allow-Origin”*’always;add_header’Access-Control-Allow-Credentials”true’;add_header’A

    2022年10月18日
    5
  • Python处理CSV文件(一)

    Python处理CSV文件(一)CSV文件CSV(comma-separatedvalue,逗号分隔值)文件格式是一种非常简单的数据存储与分享方式。CSV文件将数据表格存储为纯文本,表格(或电子表格)中的每个单元格都是一个数值或字符串。与Excel文件相比,CSV文件的一个主要优点是有很多程序可以存储、转换和处理纯文本文件;相比之下,能够处理Excel文件的程序却不多。所有电子表格程序、文字处理程序或简单的文本编辑器都可以处理纯文本文件,但不是所有的程序都能处理Excel文件。尽管Excel是一个功能非常强大的工

    2022年7月20日
    22
  • 一元函数微分学与多元函数微分学的对比学习

    一元函数微分学与多元函数微分学的对比学习一元函数微分学与多元函数微分学的对比学习

    2026年3月18日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号