Revert "Add support to re-activate watcher from wait_callback in watcher async wait."

This reverts commit 0467c0eef3.
This commit is contained in:
Tao He 2022-04-08 16:42:07 +08:00
parent 31c0161148
commit abfb674c0c
2 changed files with 11 additions and 23 deletions

View File

@ -70,17 +70,13 @@ namespace etcd
* An async wait, the callback will be called when the task has been stopped. * An async wait, the callback will be called when the task has been stopped.
* *
* The callback parameter would be true if the watch is been normally cancalled. * The callback parameter would be true if the watch is been normally cancalled.
* If callback returns true, will reactivate the watcher and continue receiving
* watch events, otherwise the watcher ends (default behaviour).
*/ */
void Wait(std::function<bool(bool)> callback); void Wait(std::function<void(bool)> callback);
/** /**
* Stop the watching action. * Stop the watching action.
*
* Returns true if the watcher has been cancelled, otherwise false.
*/ */
bool Cancel(); void Cancel();
~Watcher(); ~Watcher();
@ -92,7 +88,7 @@ namespace etcd
int index; int index;
std::function<void(Response)> callback; std::function<void(Response)> callback;
std::function<bool(bool)> wait_callback; std::function<void(bool)> wait_callback;
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
// to avoid any potential blocking, which may block the keepalive loop and evict the lease. // to avoid any potential blocking, which may block the keepalive loop and evict the lease.

View File

@ -109,7 +109,7 @@ bool etcd::Watcher::Wait()
return stubs->call->Cancelled(); return stubs->call->Cancelled();
} }
void etcd::Watcher::Wait(std::function<bool(bool)> callback) void etcd::Watcher::Wait(std::function<void(bool)> callback)
{ {
if (wait_callback == nullptr) { if (wait_callback == nullptr) {
wait_callback = callback; wait_callback = callback;
@ -118,10 +118,10 @@ void etcd::Watcher::Wait(std::function<bool(bool)> callback)
} }
} }
bool etcd::Watcher::Cancel() void etcd::Watcher::Cancel()
{ {
stubs->call->CancelWatch(); stubs->call->CancelWatch();
return this->Wait(); this->Wait();
} }
void etcd::Watcher::doWatch(std::string const & key, void etcd::Watcher::doWatch(std::string const & key,
@ -141,19 +141,11 @@ void etcd::Watcher::doWatch(std::string const & key,
stubs->call.reset(new etcdv3::AsyncWatchAction(params)); stubs->call.reset(new etcdv3::AsyncWatchAction(params));
task_ = std::thread([this, params, callback]() { task_ = std::thread([this, callback]() {
bool continue_watch = false; // Loop should iterate only once when no 'wait_callback' (to preserve previous behaviour). stubs->call->waitForResponse(callback);
do if (wait_callback != nullptr) {
{ wait_callback(stubs->call->Cancelled());
stubs->call->waitForResponse(callback); }
if (wait_callback != nullptr) {
continue_watch = wait_callback(stubs->call->Cancelled());
if (continue_watch)
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
}
} while (continue_watch);
stubs->call->CancelWatch();
}); });
cancelled.store(false); cancelled.store(false);
} }