大家好,知其字详作我是所然述老羊,今天我们来学习 Flink SQL 中的两万· Join 操作。 Flink 支持了非常多的知其字详作数据 Join 方式,主要包括以下三种: 细分 Flink SQL 支持的所然述 Join: 下面这个案例为 Inner Join 案例: -- 曝光日志数据 CREATE TABLE show_log_table ( log_id BIGINT, show_params STRING ) WITH ( connector = datagen, rows-per-second = 2, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 100 ); -- 点击日志数据 CREATE TABLE click_log_table ( log_id BIGINT, click_params STRING ) WITH ( connector = datagen, rows-per-second = 2, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); -- 流的 INNER JOIN,条件为 log_id INSERT INTO sink_table SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params as c_params FROM show_log_table 输出结果如下: +I[5, d, 5, f] +I[5, d, 5, 8] +I[5, d, 5, 2] +I[3, 4, 3, 0] +I[3, 4, 3, 3] 如果为 Left Join 案例: CREATE TABLE show_log_table ( log_id BIGINT, show_params STRING ) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 3, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE click_log_table ( log_id BIGINT, click_params STRING ) WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 3, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params as c_params FROM show_log_table 输出结果如下: +I[5, f3c, 5, c05] +I[5, 6e2, 5, 1f6] +I[5, 86b, 5, 1f6] +I[5, f3c, 5, 1f6] -D[3, 4ab, null, null] -D[3, 6f2, null, null] +I[3, 4ab, 3, 765] +I[3, 6f2, 3, 765] +I[2, 3c4, null, null] +I[3, 4ab, 3, a8b] +I[3, 6f2, 3, a8b] +I[2, c03, null, null] 如果为 Full Join 案例: CREATE TABLE show_log_table ( log_id BIGINT, show_params STRING ) WITH ( connector = datagen, rows-per-second = 2, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE click_log_table ( log_id BIGINT, click_params STRING ) WITH ( connector = datagen, rows-per-second = 2, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params as c_params FROM show_log_table 输出结果如下: +I[null, null, 7, 6] +I[6, 5, null, null] -D[1, c, null, null] +I[1, c, 1, 2] +I[3, 1, null, null] +I[null, null, 7, d] +I[10, 0, null, null] +I[null, null, 2, 6] -D[null, null, 7, 6] -D[null, null, 7, d] 关于 Regular Join 的云南idc服务商注意事项: 详细的 SQL 语义案例可以参考: flink sql 知其所以然(十二):流 join 很难嘛???(上)。 flink sql 知其所以然(十三):流 join 很难嘛???(下)。 可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。 下面为 Inner Interval Join: CREATE TABLE show_log_table ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE click_log_table ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params as c_params FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id 输出结果如下: 6> +I[2, a, 2, 6] 6> +I[2, 6, 2, 6] 2> +I[4, 1, 4, 5] 2> +I[10, 8, 10, d] 2> +I[10, 7, 10, d] 2> +I[10, d, 10, d] 2> +I[5, b, 5, d] 如果是 Left Interval Join: CREATE TABLE show_log ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE click_log ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT show_log.log_id as s_id, show_log.show_params as s_params, click_log.log_id as c_id, click_log.click_params as c_params FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id 输出结果如下: +I[6, e, 6, 7] +I[11, d, null, null] +I[7, b, null, null] +I[8, 0, 8, 3] 如果是 Full Interval Join: CREATE TABLE show_log ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 5, fields.log_id.max = 15 ); CREATE TABLE click_log ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time ) WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT, c_params STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT show_log.log_id as s_id, show_log.show_params as s_params, click_log.log_id as c_id, click_log.click_params as c_params FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id 输出结果如下: +I[6, 1, null, null] +I[7, 3, 7, 8] +I[null, null, 6, 6] +I[null, null, 4, d] +I[8, d, null, null] 关于 Interval Join 的注意事项: 实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出。 关于详细的 SQL 语义可以参考。 flink sql 知其所以然(十三):流 join 很难嘛???(下)。 Temporal Join 定义(支持 Batch\Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 join 这个 拉链快照表 的 join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 join 这个 Versioned Table 的 join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 join。应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12:00 之前(事件时间),人民币和美元汇率是 7:1,在 12:00 之后变为 6:1,那么在 12:00 之前数据就要按照 7:1 进行计算,12:00 之后就要按照 6:1 计算。在事件时间语义的任务中,事件时间 12:00 之前的数据,要按照 7:1 进行计算,12:00 之后的数据,要按照 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table。Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率的场景的案例来看一下一个 Versioned Table 的两种定义方式。PRIMARY KEY 定义方式:-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, -- PRIMARY KEY 定义方式 PRIMARY KEY(currency) NOT ENFORCED ) WITH ( connector = kafka, value.format = debezium-json, /* ... */ );Deduplicate 定义方式:-- 定义一个 append-only 的数据源表 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time ) WITH ( connector = kafka, value.format = debezium-json, /* ... */ ); -- 将数据源表按照 Deduplicate 方式定义为 Versioned Table CREATE VIEW versioned_rates AS SELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键 ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列 ) AS rownum FROM currency_rates) 以 事件时间 任务举例: -- 1. 定义一个输入订单表 CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); -- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH ( connector = kafka, value.format = debezium-json, /* ... */ ); SELECT order_id, price, currency, conversion_rate, order_time, FROM orders -- 3. Temporal Join 逻辑 -- SQL 语法为:FOR SYSTEM_TIME AS OF LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time 结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率: order_id price 货币 汇率 order_time ======== ===== ======== =============== ========= o_001 11.11 EUR 1.14 12:00:00 注意: 还是相同的案例,如果是 处理时间 语义: 10:15> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 10:30> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 -- 10:42 时,Euro 的汇率从 114 变为 116 10:52> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 116 <==== 从 114 变为 116 Yen 1 -- 从 Orders 表查询数据 SELECT * FROM Orders; amount currency ====== ========= 2 Euro <== 在处理时间 10:15 到达的一条数据 1 US Dollar <== 在处理时间 10:30 到达的一条数据 2 Euro <== 在处理时间 10:52 到达的一条数据 -- 执行关联查询 SELECT o.amount, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency -- 结果如下: amount currency rate amount*rate ====== ========= ======= ============ 2 Euro 114 228 <== 在处理时间 10:15 到达的一条数据 1 US Dollar 102 102 <== 在处理时间 10:30 到达的一条数据 可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。 来一波输入数据: 曝光用户日志流(show_log)数据(数据存储在 kafka 中): log_id timestamp user_id 1 2021-11-01 00:01:03 a 2 2021-11-01 00:03:00 b 3 2021-11-01 00:05:00 c 4 2021-11-01 00:06:00 b 用户画像维表(user_profile)数据(数据存储在 redis 中): user_id(主键) age sex a 12-18 男 b 18-24 女 注意: redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。 具体 SQL: CREATE TABLE show_log ( log_id BIGINT, `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)), user_id STRING, proctime AS PROCTIME() ) WITH ( connector = datagen, rows-per-second = 10, fields.user_id.length = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE user_profile ( user_id STRING, age STRING, sex STRING ) WITH ( connector = redis, hostname = 127.0.0.1, port = 6379, format = json, lookup.cache.max-rows = 500, lookup.cache.ttl = 3600, lookup.max-retries = 1 ); CREATE TABLE sink_table ( log_id BIGINT, `timestamp` TIMESTAMP(3), user_id STRING, proctime TIMESTAMP(3), age STRING, sex STRING ) WITH ( connector = print ); -- lookup join 的 query 逻辑 INSERT INTO sink_table SELECT s.log_id as log_id , s.`timestamp` as `timestamp` , s.user_id as user_id , s.proctime as proctime , u.sex as sex , u.age as age FROM show_log AS s LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u 输出数据如下: log_id timestamp user_id age sex 1 2021-11-01 00:01:03 a 12-18 男 2 2021-11-01 00:03:00 b 18-24 女 3 2021-11-01 00:05:00 c 18-24 男 4 2021-11-01 00:06:00 b 18-24 女 注意: 实时的 lookup 维表关联能使用 处理时间 去做关联。 详细 SQL 语义及案例可见: flink sql 知其所以然:维表 join 的性能优化之路(上)附源码。 flink sql 知其所以然:改了改源码,实现了个 batch lookup join(附源码)。 其实,Flink 官方并没有提供 redis 的维表 connector 实现。 没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的! 注意: 再说说维表常见的性能问题及优化思路。 所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。 举个例子: 这就是为什么维表 join 的算子会产生背压,任务产出会延迟。 那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下: 博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。 既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作: flink sql 知其所以然:改了改源码,实现了个 batch lookup join(附源码)。 应用场景(支持 Batch\Streaming):将表中 ARRAY 类型字段(列)拍平,转为多行。实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。CREATE TABLE show_log_table ( log_id BIGINT, show_params ARRAY ) WITH ( connector = datagen, rows-per-second = 1, fields.log_id.min = 1, fields.log_id.max = 10 ); CREATE TABLE sink_table ( log_id BIGINT, show_param STRING ) WITH ( connector = print ); INSERT INTO sink_table SELECT log_id, t.show_param as show_param FROM show_log_table -- array 炸开语法 show_log_table 原始数据: +I[7, [a, b, c]] 输出结果如下所示: -- +I[7, [a, b, c]] 一行转为 3 行 +I[7, a] +I[7, b] +I[7, b] -- +I[5, [d, e, f]] 一行转为 3 行 +I[5, d] +I[5, e] 应用场景(支持 Batch\Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。 Table Function 使用分类:Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 行转为 0 行,这行数据直接被丢弃。Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值。实际案例:直接上 SQL 。public class TableFunctionInnerJoin_Test { public static void main(String[] args) throws Exception { FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args); String sql = "CREATE FUNCTION user_profile_table_func AS flink.examples.sql._07.query._06_joins._06_table_function" + "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction;\n" + "\n" + "CREATE TABLE source_table (\n" + " user_id BIGINT NOT NULL,\n" + " name STRING,\n" + " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" + " WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND\n" + ") WITH (\n" + " connector = datagen,\n" + " rows-per-second = 10,\n" + " fields.name.length = 1,\n" + " fields.user_id.min = 1,\n" + " fields.user_id.max = 10\n" + ");\n" + "\n" + "CREATE TABLE sink_table (\n" + " user_id BIGINT,\n" + " name STRING,\n" + " age INT,\n" + " row_time TIMESTAMP(3)\n" + ") WITH (\n" + " connector = print\n" + ");\n" + "\n" + "INSERT INTO sink_table\n" + "SELECT user_id,\n" + " name,\n" + " age,\n" + " row_time\n" + "FROM source_table,\n" // Table Function Join 语法对应 LATERAL TABLE + "LATERAL TABLE(user_profile_table_func(user_id)) t(age)"; Arrays.stream(sql.split(";")) .forEach(flinkEnv.streamTEnv()::executeSql); } public static class UserProfileTableFunction extends TableFunction { public void eval(long userId) { // 自定义输出逻辑 if (userId <= 5) { // 一行转 1 行 collect(1); } else { // 一行转 3 行 collect(1); collect(2); collect(3); } } } 执行结果如下: -- <= 5,则只有 1 行结果 +I[3, 7, 1, 2021-05-01T18:23:42.560] -- > 5,则有行 3 结果 +I[8, e, 1, 2021-05-01T18:23:42.560] +I[8, e, 2, 2021-05-01T18:23:42.560] +I[8, e, 3, 2021-05-01T18:23:42.560] -- <= 5,则只有 1 行结果 +I[4, 9, 1, 2021-05-01T18:23:42.561] -- > 5,则有行 3 结果 +I[8, c, 1, 2021-05-01T18:23:42.561] +I[8, c, 2, 2021-05-01T18:23:42.561]Flink Joins
1、Regular Join
Regular Join 定义(支持 Batch\Streaming):Regular Join 其实就是和离线 Hive SQL 一样的 Regular Join,通过条件关联两条流数据输出。应用场景:Join 其实在我们的数仓建设过程中应用是非常广泛的。离线数仓可以说基本上是离不开 Join 的。那么实时数仓的建设也必然离不开 Join,比如日志关联扩充维度数据,构建宽表;日志通过 ID 关联计算 CTR。Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出+[L, R]。Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,亿华云都会输出(Join 到输出+[L, R],没 Join 到输出+[L, null]),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出-[L, null],然后输出+[L, R]。Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反。Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出+[L, R],没 Join 到输出+[null, R];对左流来说:Join 到输出+[L, R],没 Join 到输出+[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤-[null, R],输出+[L, R],右流数据到达为例:回撤-[L, null],输出+[L, R])。实际案例:案例为曝光日志关联点击日志筛选既有曝光又有点击的数据,并且补充点击的扩展参数(show inner click):2、Interval Join(时间区间 Join)
Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。 Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]。Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反。Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R])。3、Temporal Join(快照 Join)
4、Lookup Join(维表 Join)
Lookup Join 定义(支持 Batch\Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来。 实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。5、Array Expansion(数组列转行)
6、Table Function(自定义列转行)