Add support to re-activate watcher from wait_callback in watcher async wait.

Returns wait result when cancelling watcher.
This commit is contained in:
David Pastor 2022-02-09 16:07:36 +01:00 committed by Tao He
parent 82f632de11
commit 0467c0eef3
2 changed files with 23 additions and 11 deletions

View File

@ -70,13 +70,17 @@ namespace etcd
* An async wait, the callback will be called when the task has been stopped.
*
* The callback parameter would be true if the watch is been normally cancalled.
* If callback returns true, will reactivate the watcher and continue receiving
* watch events, otherwise the watcher ends (default behaviour).
*/
void Wait(std::function<void(bool)> callback);
void Wait(std::function<bool(bool)> callback);
/**
* Stop the watching action.
*
* Returns true if the watcher has been cancelled, otherwise false.
*/
void Cancel();
bool Cancel();
~Watcher();
@ -88,7 +92,7 @@ namespace etcd
int index;
std::function<void(Response)> callback;
std::function<void(bool)> wait_callback;
std::function<bool(bool)> wait_callback;
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.

View File

@ -109,7 +109,7 @@ bool etcd::Watcher::Wait()
return stubs->call->Cancelled();
}
void etcd::Watcher::Wait(std::function<void(bool)> callback)
void etcd::Watcher::Wait(std::function<bool(bool)> callback)
{
if (wait_callback == nullptr) {
wait_callback = callback;
@ -118,10 +118,10 @@ void etcd::Watcher::Wait(std::function<void(bool)> callback)
}
}
void etcd::Watcher::Cancel()
bool etcd::Watcher::Cancel()
{
stubs->call->CancelWatch();
this->Wait();
return this->Wait();
}
void etcd::Watcher::doWatch(std::string const & key,
@ -141,11 +141,19 @@ void etcd::Watcher::doWatch(std::string const & key,
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
task_ = std::thread([this, callback]() {
stubs->call->waitForResponse(callback);
if (wait_callback != nullptr) {
wait_callback(stubs->call->Cancelled());
}
task_ = std::thread([this, params, callback]() {
bool continue_watch = false; // Loop should iterate only once when no 'wait_callback' (to preserve previous behaviour).
do
{
stubs->call->waitForResponse(callback);
if (wait_callback != nullptr) {
continue_watch = wait_callback(stubs->call->Cancelled());
if (continue_watch)
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
}
} while (continue_watch);
stubs->call->CancelWatch();
});
cancelled.store(false);
}