Add support to re-activate watcher from wait_callback in watcher async wait.
Returns wait result when cancelling watcher.
This commit is contained in:
parent
82f632de11
commit
0cb8a04218
|
|
@ -70,13 +70,17 @@ namespace etcd
|
||||||
* An async wait, the callback will be called when the task has been stopped.
|
* An async wait, the callback will be called when the task has been stopped.
|
||||||
*
|
*
|
||||||
* The callback parameter would be true if the watch is been normally cancalled.
|
* The callback parameter would be true if the watch is been normally cancalled.
|
||||||
|
* If callback returns true, will reactivate the watcher and continue receiving
|
||||||
|
* watch events, otherwise the watcher ends (default behaviour).
|
||||||
*/
|
*/
|
||||||
void Wait(std::function<void(bool)> callback);
|
void Wait(std::function<bool(bool)> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the watching action.
|
* Stop the watching action.
|
||||||
|
*
|
||||||
|
* Returns true if the watcher has been cancelled, otherwise false.
|
||||||
*/
|
*/
|
||||||
void Cancel();
|
bool Cancel();
|
||||||
|
|
||||||
~Watcher();
|
~Watcher();
|
||||||
|
|
||||||
|
|
@ -88,7 +92,7 @@ namespace etcd
|
||||||
|
|
||||||
int index;
|
int index;
|
||||||
std::function<void(Response)> callback;
|
std::function<void(Response)> callback;
|
||||||
std::function<void(bool)> wait_callback;
|
std::function<bool(bool)> wait_callback;
|
||||||
|
|
||||||
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
||||||
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ bool etcd::Watcher::Wait()
|
||||||
return stubs->call->Cancelled();
|
return stubs->call->Cancelled();
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::Watcher::Wait(std::function<void(bool)> callback)
|
void etcd::Watcher::Wait(std::function<bool(bool)> callback)
|
||||||
{
|
{
|
||||||
if (wait_callback == nullptr) {
|
if (wait_callback == nullptr) {
|
||||||
wait_callback = callback;
|
wait_callback = callback;
|
||||||
|
|
@ -118,10 +118,10 @@ void etcd::Watcher::Wait(std::function<void(bool)> callback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::Watcher::Cancel()
|
bool etcd::Watcher::Cancel()
|
||||||
{
|
{
|
||||||
stubs->call->CancelWatch();
|
stubs->call->CancelWatch();
|
||||||
this->Wait();
|
return this->Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::Watcher::doWatch(std::string const & key,
|
void etcd::Watcher::doWatch(std::string const & key,
|
||||||
|
|
@ -141,11 +141,19 @@ void etcd::Watcher::doWatch(std::string const & key,
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
|
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
|
||||||
|
|
||||||
task_ = std::thread([this, callback]() {
|
task_ = std::thread([this, params, callback]() {
|
||||||
stubs->call->waitForResponse(callback);
|
bool continue_watch = false; // Loop should iterate only once when no 'wait_callback' (to preserve previous behaviour).
|
||||||
if (wait_callback != nullptr) {
|
do
|
||||||
wait_callback(stubs->call->Cancelled());
|
{
|
||||||
}
|
stubs->call->waitForResponse(callback);
|
||||||
|
if (wait_callback != nullptr) {
|
||||||
|
continue_watch = wait_callback(stubs->call->Cancelled());
|
||||||
|
if (continue_watch)
|
||||||
|
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
|
||||||
|
}
|
||||||
|
} while (continue_watch);
|
||||||
|
|
||||||
|
stubs->call->CancelWatch();
|
||||||
});
|
});
|
||||||
cancelled.store(false);
|
cancelled.store(false);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue