From ad4215511ff6476433b5bce437a106b37615b3cf Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 18 May 2021 14:08:46 +0800 Subject: [PATCH] Fixes race condition (repeatly cancel) in watch. Signed-off-by: Tao He --- etcd/Watcher.hpp | 2 ++ etcd/v3/AsyncWatchAction.hpp | 3 ++- src/KeepAlive.cpp | 4 +++- src/Watcher.cpp | 12 ++++++------ src/v3/AsyncWatchAction.cpp | 19 ++++++++----------- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index d10cbe9..1c5300f 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -1,6 +1,7 @@ #ifndef __ETCD_WATCHER_HPP__ #define __ETCD_WATCHER_HPP__ +#include #include #include #include @@ -98,6 +99,7 @@ namespace etcd private: int fromIndex; bool recursive; + std::atomic_bool cancelled; }; } diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index f356653..7fc4dea 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -1,6 +1,7 @@ #ifndef __ASYNC_WATCHACTION_HPP__ #define __ASYNC_WATCHACTION_HPP__ +#include #include #include @@ -28,7 +29,7 @@ namespace etcdv3 private: WatchResponse reply; std::unique_ptr> stream; - bool isCancelled; + std::atomic_bool isCancelled; std::mutex protect_is_cancalled; }; } diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 6fb37d9..a933edc 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -115,7 +115,9 @@ void etcd::KeepAlive::Cancel() // clean up context.stop(); - task_.join(); + if (task_.joinable()) { + task_.join(); + } } void etcd::KeepAlive::Check() { diff --git a/src/Watcher.cpp b/src/Watcher.cpp index afd8d9d..bc2216c 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -92,16 +92,15 @@ etcd::Watcher::Watcher(std::string const & address, etcd::Watcher::~Watcher() { - stubs->call->CancelWatch(); - if (task_.joinable()) { - task_.join(); - } + this->Cancel(); } bool etcd::Watcher::Wait() { - if (task_.joinable()) { - task_.join(); + if (cancelled.exchange(true)) { + if (task_.joinable()) { + task_.join(); + } } return stubs->call->Cancelled(); } @@ -144,4 +143,5 @@ void etcd::Watcher::doWatch(std::string const & key, wait_callback(stubs->call->Cancelled()); } }); + cancelled.store(false); } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 81acaa2..0c4a77e 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -9,7 +9,7 @@ using etcdserverpb::WatchCreateRequest; etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { - isCancelled = false; + isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create"); WatchRequest watch_req; @@ -61,14 +61,14 @@ void etcdv3::AsyncWatchAction::waitForResponse() break; } if(got_tag == (void*)"writes done") { - isCancelled = true; + isCancelled.store(true); cq_.Shutdown(); break; } if(got_tag == (void*)this) // read tag { if (reply.canceled()) { - isCancelled = true; + isCancelled.store(true); cq_.Shutdown(); } else if ((reply.created() && reply.header().revision() < parameters.revision) || @@ -77,7 +77,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() // // 1. watch for a future revision, return immediately with empty events set // 2. receive any effective events. - isCancelled = true; + isCancelled.store(true); stream->WritesDone((void*)"writes done"); grpc::Status status; stream->Finish(&status, (void *)this); @@ -100,9 +100,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { std::lock_guard scope_lock(this->protect_is_cancalled); - if(isCancelled == false) - { - isCancelled = true; + if (!isCancelled.exchange(true)) { stream->WritesDone((void*)"writes done"); grpc::Status status; stream->Finish(&status, (void *)this); @@ -111,7 +109,7 @@ void etcdv3::AsyncWatchAction::CancelWatch() } bool etcdv3::AsyncWatchAction::Cancelled() const { - return isCancelled; + return isCancelled.load(); } void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) @@ -127,14 +125,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function