Fixes the watcher cannot be cancelled issue with etcd 3.x (#238)
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
153546f965
commit
068f37ba5c
|
|
@ -1091,17 +1091,8 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
|
||||||
isCancelled.store(false);
|
isCancelled.store(false);
|
||||||
stream = parameters.watch_stub->AsyncWatch(&context, &cq_,
|
stream = parameters.watch_stub->AsyncWatch(&context, &cq_,
|
||||||
(void*) etcdv3::WATCH_CREATE);
|
(void*) etcdv3::WATCH_CREATE);
|
||||||
// The unique watcher id causes the watcher cannot be cancelled as expected
|
this->watch_id =
|
||||||
// on Ubuntu 20.04.
|
std::chrono::high_resolution_clock::now().time_since_epoch().count();
|
||||||
//
|
|
||||||
// See CI failures:
|
|
||||||
// https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561397273/jobs/10159051536
|
|
||||||
//
|
|
||||||
// Added in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/232
|
|
||||||
// Removed in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/236
|
|
||||||
//
|
|
||||||
// this->watch_id =
|
|
||||||
// std::chrono::high_resolution_clock::now().time_since_epoch().count();
|
|
||||||
// #ifndef NDEBUG
|
// #ifndef NDEBUG
|
||||||
// std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl;
|
// std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl;
|
||||||
// #endif
|
// #endif
|
||||||
|
|
@ -1135,11 +1126,52 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notes: `Cancel` and `waitForResponse` of watchers.
|
||||||
|
*
|
||||||
|
* We meet failures about failed to cancel the watcher on Ubuntu 20.04
|
||||||
|
* due to unable to receive the "etcdv3::WATCH_FINISH" tag from the gRPC
|
||||||
|
* completion queue.
|
||||||
|
*
|
||||||
|
* See CI:
|
||||||
|
* https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561458372/jobs/10159155857
|
||||||
|
*
|
||||||
|
* To address the problem, we use the `AsyncNext()` to wait for the
|
||||||
|
* the last token from the completion queue (wait for 1 second, once
|
||||||
|
* we called the method `stream->Finish()`).
|
||||||
|
*
|
||||||
|
* Remark: the issue might be caused by lower version etcd.
|
||||||
|
*/
|
||||||
|
|
||||||
void etcdv3::AsyncWatchAction::waitForResponse() {
|
void etcdv3::AsyncWatchAction::waitForResponse() {
|
||||||
void* got_tag;
|
void* got_tag;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
bool the_final_round = false;
|
||||||
|
|
||||||
while (cq_.Next(&got_tag, &ok)) {
|
while (true) {
|
||||||
|
if (!the_final_round) {
|
||||||
|
if (!cq_.Next(&got_tag, &ok)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
auto deadline =
|
||||||
|
std::chrono::system_clock::now() + std::chrono::seconds(1);
|
||||||
|
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
|
||||||
|
case CompletionQueue::NextStatus::TIMEOUT:
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN: {
|
||||||
|
std::cerr << "[warn] watcher does't exit normally" << std::endl;
|
||||||
|
// pretend to be received a "WATCH_FINISH" tag: shutdown
|
||||||
|
context.TryCancel();
|
||||||
|
cq_.Shutdown();
|
||||||
|
ok = false; // jump out
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT: {
|
||||||
|
// normal execution flow
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (ok == false) {
|
if (ok == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -1148,6 +1180,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
|
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
|
||||||
|
the_final_round = true;
|
||||||
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
|
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -1200,23 +1233,36 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcdv3::AsyncWatchAction::CancelWatch() {
|
|
||||||
if (!isCancelled.exchange(true)) {
|
|
||||||
WatchRequest cancel_req;
|
|
||||||
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
|
|
||||||
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
|
|
||||||
isCancelled.store(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }
|
|
||||||
|
|
||||||
void etcdv3::AsyncWatchAction::waitForResponse(
|
void etcdv3::AsyncWatchAction::waitForResponse(
|
||||||
std::function<void(etcd::Response)> callback) {
|
std::function<void(etcd::Response)> callback) {
|
||||||
void* got_tag;
|
void* got_tag;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
bool the_final_round = false;
|
||||||
|
|
||||||
while (cq_.Next(&got_tag, &ok)) {
|
while (true) {
|
||||||
|
if (!the_final_round) {
|
||||||
|
if (!cq_.Next(&got_tag, &ok)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
auto deadline =
|
||||||
|
std::chrono::system_clock::now() + std::chrono::seconds(1);
|
||||||
|
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
|
||||||
|
case CompletionQueue::NextStatus::TIMEOUT:
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN: {
|
||||||
|
std::cerr << "[warn] watcher does't exit normally" << std::endl;
|
||||||
|
// pretend to be received a "WATCH_FINISH" tag: shutdown
|
||||||
|
context.TryCancel();
|
||||||
|
cq_.Shutdown();
|
||||||
|
ok = false; // jump out
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT: {
|
||||||
|
// normal execution flow
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (ok == false) {
|
if (ok == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -1225,6 +1271,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
|
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
|
||||||
|
the_final_round = true;
|
||||||
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
|
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -1267,6 +1314,17 @@ void etcdv3::AsyncWatchAction::waitForResponse(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void etcdv3::AsyncWatchAction::CancelWatch() {
|
||||||
|
if (!isCancelled.exchange(true)) {
|
||||||
|
WatchRequest cancel_req;
|
||||||
|
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
|
||||||
|
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
|
||||||
|
isCancelled.store(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }
|
||||||
|
|
||||||
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() {
|
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() {
|
||||||
AsyncWatchResponse watch_resp;
|
AsyncWatchResponse watch_resp;
|
||||||
watch_resp.set_action(etcdv3::WATCH_ACTION);
|
watch_resp.set_action(etcdv3::WATCH_ACTION);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue