diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 0a7b3e5..366f6ed 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -70,17 +70,13 @@ namespace etcd * An async wait, the callback will be called when the task has been stopped. * * The callback parameter would be true if the watch is been normally cancalled. - * If callback returns true, will reactivate the watcher and continue receiving - * watch events, otherwise the watcher ends (default behaviour). */ - void Wait(std::function callback); + void Wait(std::function callback); /** * Stop the watching action. - * - * Returns true if the watcher has been cancelled, otherwise false. */ - bool Cancel(); + void Cancel(); ~Watcher(); @@ -92,7 +88,7 @@ namespace etcd int index; std::function callback; - std::function wait_callback; + std::function wait_callback; // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client // to avoid any potential blocking, which may block the keepalive loop and evict the lease. diff --git a/src/Watcher.cpp b/src/Watcher.cpp index c62efda..d0dbf54 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -109,7 +109,7 @@ bool etcd::Watcher::Wait() return stubs->call->Cancelled(); } -void etcd::Watcher::Wait(std::function callback) +void etcd::Watcher::Wait(std::function callback) { if (wait_callback == nullptr) { wait_callback = callback; @@ -118,10 +118,10 @@ void etcd::Watcher::Wait(std::function callback) } } -bool etcd::Watcher::Cancel() +void etcd::Watcher::Cancel() { stubs->call->CancelWatch(); - return this->Wait(); + this->Wait(); } void etcd::Watcher::doWatch(std::string const & key, @@ -141,19 +141,11 @@ void etcd::Watcher::doWatch(std::string const & key, stubs->call.reset(new etcdv3::AsyncWatchAction(params)); - task_ = std::thread([this, params, callback]() { - bool continue_watch = false; // Loop should iterate only once when no 'wait_callback' (to preserve previous behaviour). - do - { - stubs->call->waitForResponse(callback); - if (wait_callback != nullptr) { - continue_watch = wait_callback(stubs->call->Cancelled()); - if (continue_watch) - stubs->call.reset(new etcdv3::AsyncWatchAction(params)); - } - } while (continue_watch); - - stubs->call->CancelWatch(); + task_ = std::thread([this, callback]() { + stubs->call->waitForResponse(callback); + if (wait_callback != nullptr) { + wait_callback(stubs->call->Cancelled()); + } }); cancelled.store(false); }