From 0cb8a0421821ef88558ad5d98a77d05accc72a2f Mon Sep 17 00:00:00 2001 From: David Pastor Date: Wed, 9 Feb 2022 16:07:36 +0100 Subject: [PATCH] Add support to re-activate watcher from wait_callback in watcher async wait. Returns wait result when cancelling watcher. --- etcd/Watcher.hpp | 10 +++++++--- src/Watcher.cpp | 24 ++++++++++++++++-------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 366f6ed..0a7b3e5 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -70,13 +70,17 @@ namespace etcd * An async wait, the callback will be called when the task has been stopped. * * The callback parameter would be true if the watch is been normally cancalled. + * If callback returns true, will reactivate the watcher and continue receiving + * watch events, otherwise the watcher ends (default behaviour). */ - void Wait(std::function callback); + void Wait(std::function callback); /** * Stop the watching action. + * + * Returns true if the watcher has been cancelled, otherwise false. */ - void Cancel(); + bool Cancel(); ~Watcher(); @@ -88,7 +92,7 @@ namespace etcd int index; std::function callback; - std::function wait_callback; + std::function wait_callback; // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client // to avoid any potential blocking, which may block the keepalive loop and evict the lease. diff --git a/src/Watcher.cpp b/src/Watcher.cpp index d0dbf54..c62efda 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -109,7 +109,7 @@ bool etcd::Watcher::Wait() return stubs->call->Cancelled(); } -void etcd::Watcher::Wait(std::function callback) +void etcd::Watcher::Wait(std::function callback) { if (wait_callback == nullptr) { wait_callback = callback; @@ -118,10 +118,10 @@ void etcd::Watcher::Wait(std::function callback) } } -void etcd::Watcher::Cancel() +bool etcd::Watcher::Cancel() { stubs->call->CancelWatch(); - this->Wait(); + return this->Wait(); } void etcd::Watcher::doWatch(std::string const & key, @@ -141,11 +141,19 @@ void etcd::Watcher::doWatch(std::string const & key, stubs->call.reset(new etcdv3::AsyncWatchAction(params)); - task_ = std::thread([this, callback]() { - stubs->call->waitForResponse(callback); - if (wait_callback != nullptr) { - wait_callback(stubs->call->Cancelled()); - } + task_ = std::thread([this, params, callback]() { + bool continue_watch = false; // Loop should iterate only once when no 'wait_callback' (to preserve previous behaviour). + do + { + stubs->call->waitForResponse(callback); + if (wait_callback != nullptr) { + continue_watch = wait_callback(stubs->call->Cancelled()); + if (continue_watch) + stubs->call.reset(new etcdv3::AsyncWatchAction(params)); + } + } while (continue_watch); + + stubs->call->CancelWatch(); }); cancelled.store(false); }