Skip to content

Commit

Permalink
[BugFix] Fix SenderQueue hang when cancel (#35380)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
(cherry picked from commit 571b9e5)
  • Loading branch information
stdpain authored and wanpengfei-git committed Nov 24, 2023
1 parent 90a24c0 commit 8b17325
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
37 changes: 33 additions & 4 deletions be/src/runtime/sender_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "util/logging.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"

namespace starrocks {

Expand Down Expand Up @@ -566,6 +567,33 @@ void DataStreamRecvr::PipelineSenderQueue::clean_buffer_queues() {
}
}

void DataStreamRecvr::PipelineSenderQueue::check_leak_closure() {
std::lock_guard<Mutex> l(_lock);
for (size_t i = 0; i < _chunk_queues.size(); i++) {
auto& chunk_queue = _chunk_queues[i];
ChunkItem item;
while (chunk_queue.size_approx() > 0) {
if (chunk_queue.try_dequeue(item)) {
if (item.closure != nullptr) {
DCHECK(false) << "leak closure detected";
LOG(WARNING) << "leak closure detected in fragment:" << print_id(_recvr->fragment_instance_id());
}
}
}
}

for (auto& [_, chunk_queues] : _buffered_chunk_queues) {
for (auto& [_, chunk_queue] : chunk_queues) {
for (auto& item : chunk_queue) {
if (item.closure != nullptr) {
DCHECK(false) << "leak closure detected";
LOG(WARNING) << "leak closure detected in fragment:" << print_id(_recvr->fragment_instance_id());
}
}
}
}
}

Status DataStreamRecvr::PipelineSenderQueue::try_to_build_chunk_meta(const PTransmitChunkParams& request,
Metrics& metrics) {
ScopedTimer<MonotonicStopWatch> wait_timer(metrics.wait_lock_timer);
Expand Down Expand Up @@ -750,14 +778,15 @@ Status DataStreamRecvr::PipelineSenderQueue::add_chunks(const PTransmitChunkPara
// We cannot early-return for short circuit, it may occur for parts of parallelism,
// and the other parallelism may need to proceed.
}
if (_is_cancelled) {
clean_buffer_queues();
return Status::OK();
}
_recvr->_num_buffered_bytes += chunk_bytes;
}
}

// if senderqueue is cancelled clear all closure buffers
if (_is_cancelled) {
clean_buffer_queues();
}

return Status::OK();
}

Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime/sender_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ class DataStreamRecvr::NonPipelineSenderQueue final : public DataStreamRecvr::Se
class DataStreamRecvr::PipelineSenderQueue final : public DataStreamRecvr::SenderQueue {
public:
PipelineSenderQueue(DataStreamRecvr* parent_recvr, int num_senders, int degree_of_parallism);
~PipelineSenderQueue() override { close(); }
~PipelineSenderQueue() override {
check_leak_closure();
close();
}

Status get_chunk(vectorized::Chunk** chunk, const int32_t driver_sequence = -1) override;

Expand Down Expand Up @@ -198,6 +201,8 @@ class DataStreamRecvr::PipelineSenderQueue final : public DataStreamRecvr::Sende

void clean_buffer_queues();

void check_leak_closure();

StatusOr<ChunkList> get_chunks_from_pass_through(const int32_t sender_id, size_t& total_chunk_bytes);

template <bool need_deserialization>
Expand Down

0 comments on commit 8b17325

Please sign in to comment.