Skip to content

Commit

Permalink
[FLINK-xx][table] Add Join remove rules.
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyongvs committed Jan 10, 2025
1 parent 96a4e90 commit f449c2e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ object FlinkBatchRuleSets {
CoreRules.AGGREGATE_UNION_AGGREGATE,
// expand distinct aggregate to normal aggregate with groupby
FlinkAggregateExpandDistinctAggregatesRule.INSTANCE,
CoreRules.PROJECT_JOIN_JOIN_REMOVE,
CoreRules.PROJECT_JOIN_REMOVE,
CoreRules.AGGREGATE_JOIN_JOIN_REMOVE,
CoreRules.AGGREGATE_JOIN_REMOVE,

// reduce aggregate functions like AVG, STDDEV_POP etc.
CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ object FlinkStreamRuleSets {
// using variants of aggregate union rule
CoreRules.AGGREGATE_UNION_AGGREGATE_FIRST,
CoreRules.AGGREGATE_UNION_AGGREGATE_SECOND,
CoreRules.PROJECT_JOIN_JOIN_REMOVE,
CoreRules.PROJECT_JOIN_REMOVE,
CoreRules.AGGREGATE_JOIN_JOIN_REMOVE,
CoreRules.AGGREGATE_JOIN_REMOVE,

// reduce aggregate functions like AVG, STDDEV_POP etc.
CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ LogicalProject(a1=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a1])
+- NestedLoopJoin(joinType=[RightOuterJoin], where=[(cnt = a2)], select=[cnt, a1, a2], build=[left], singleRowJoin=[true])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
: +- Calc(select=[0 AS $f0])
: +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
]]>
</Resource>
</TestCase>
Expand All @@ -91,14 +84,7 @@ LogicalProject(a1=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a1])
+- NestedLoopJoin(joinType=[RightOuterJoin], where=[(cnt < a2)], select=[cnt, a1, a2], build=[left], singleRowJoin=[true])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
: +- Calc(select=[0 AS $f0])
: +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
]]>
</Resource>
</TestCase>
Expand All @@ -119,14 +105,7 @@ LogicalProject(a2=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a2])
+- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a1 > cnt)], select=[a1, a2, cnt], build=[right], singleRowJoin=[true])
:- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
+- Exchange(distribution=[broadcast])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+- Calc(select=[0 AS $f0])
+- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -176,14 +155,7 @@ LogicalProject(a2=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a2])
+- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a1 = cnt)], select=[a1, a2, cnt], build=[right], singleRowJoin=[true])
:- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
+- Exchange(distribution=[broadcast])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+- Calc(select=[0 AS $f0])
+- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2])
+- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,35 +199,19 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[window_start
<![CDATA[
Sink(table=[default_catalog.default_database.sink1], fields=[window_start, window_end, user_id, dt, hour])
+- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour])
+- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0])
:- Exchange(distribution=[hash[user_id]])
: +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
: +- Exchange(distribution=[hash[user_id]])
: +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
: +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
: +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[user_id]])
+- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[user_id]])
+- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
Sink(table=[default_catalog.default_database.sink2], fields=[window_start, window_end, user_id, dt, hour])
+- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour])
+- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0])
:- Exchange(distribution=[hash[user_id]])
: +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
: +- Exchange(distribution=[hash[user_id]])
: +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
: +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
: +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[user_id]])
+- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[user_id]])
+- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit f449c2e

Please sign in to comment.