diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index 149a17e1602..59f4587da79 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -221,7 +221,6 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co execId(), needScanHashMapAfterProbe(join_ptr->getKind())); join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(), probe_pipeline.streams.size()); - join_ptr->setCancellationHook([&] { return context.isCancelled(); }); size_t probe_index = 0; for (auto & stream : probe_pipeline.streams) { @@ -233,6 +232,7 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co settings.max_block_size); stream->setExtraInfo(join_probe_extra_info); } + join_ptr->setCancellationHook([&] { return context.isCancelled(); }); } void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams) diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp index cbd6db1fa3a..a62e1f7cdc8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp @@ -40,7 +40,6 @@ void PhysicalJoinBuild::buildPipelineExecGroupImpl( join_execute_info.join_build_profile_infos = group_builder.getCurProfileInfos(); join_ptr->initBuild(group_builder.getCurrentHeader(), group_builder.concurrency()); join_ptr->setInitActiveBuildThreads(); - join_ptr->setCancellationHook([&]() { return exec_context.isCancelled(); }); join_ptr.reset(); } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp index 926738912f2..2c952fecfe6 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp @@ -61,6 +61,7 @@ void PhysicalJoinProbe::buildPipelineExecGroupImpl( // it is only necessary to add it during the "restore build stage." // The order of build/probe here is ensured by the event. exec_context.addOneTimeFuture(join_ptr->wait_probe_finished_future); + join_ptr->setCancellationHook([&]() { return exec_context.isCancelled(); }); join_ptr.reset(); } } // namespace DB diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 16df7442aeb..466868041be 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -1352,6 +1352,10 @@ Block Join::doJoinBlockHash(ProbeProcessInfo & probe_process_info, const JoinBui Block Join::removeUselessColumn(Block & block) const { + // cancelled + if (!block) + return block; + Block projected_block; for (const auto & name_and_type : output_columns_after_finalize) { @@ -2218,6 +2222,9 @@ Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const else block = joinBlockHash(probe_process_info); + // if cancelled, just return empty block + if (!block) + return block; /// for (cartesian)antiLeftSemi join, the meaning of "match-helper" is `non-matched` instead of `matched`. if (kind == Cross_LeftOuterAnti) {