From 6619a3ef0ad586d3dd2e14ae354139a534386878 Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 15 Jul 2023 18:42:07 +0800 Subject: [PATCH] Fixes the watcher cannot be cancelled issue with etcd 3.x Signed-off-by: Tao He --- src/v3/AsyncGRPC.cpp | 108 +++++++++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 25 deletions(-) diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp index 8cb8c7f..9af8824 100644 --- a/src/v3/AsyncGRPC.cpp +++ b/src/v3/AsyncGRPC.cpp @@ -1091,17 +1091,8 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params) isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context, &cq_, (void*) etcdv3::WATCH_CREATE); - // The unique watcher id causes the watcher cannot be cancelled as expected - // on Ubuntu 20.04. - // - // See CI failures: - // https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561397273/jobs/10159051536 - // - // Added in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/232 - // Removed in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/236 - // - // this->watch_id = - // std::chrono::high_resolution_clock::now().time_since_epoch().count(); + this->watch_id = + std::chrono::high_resolution_clock::now().time_since_epoch().count(); // #ifndef NDEBUG // std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl; // #endif @@ -1135,11 +1126,52 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params) } } +/** + * Notes: `Cancel` and `waitForResponse` of watchers. + * + * We have seen failures to cancel the watcher on Ubuntu 20.04 + * because the "etcdv3::WATCH_FINISH" tag is never received from the gRPC + * completion queue. 
+ * See CI: + * https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561458372/jobs/10159155857 + * + * To address the problem, we use `AsyncNext()` to wait for the + * last tag from the completion queue (for up to 1 second, once + * `stream->Finish()` has been called). + * + * Remark: the issue might be caused by an older version of etcd. + */ + void etcdv3::AsyncWatchAction::waitForResponse() { void* got_tag; bool ok = false; + bool the_final_round = false; - while (cq_.Next(&got_tag, &ok)) { + while (true) { + if (!the_final_round) { + if (!cq_.Next(&got_tag, &ok)) { + break; + } + } else { + auto deadline = + std::chrono::system_clock::now() + std::chrono::seconds(1); + switch (cq_.AsyncNext(&got_tag, &ok, deadline)) { + case CompletionQueue::NextStatus::TIMEOUT: + case CompletionQueue::NextStatus::SHUTDOWN: { + std::cerr << "[warn] watcher does't exit normally" << std::endl; + // pretend to be received a "WATCH_FINISH" tag: shutdown + context.TryCancel(); + cq_.Shutdown(); + ok = false; // jump out + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + // normal execution flow + break; + } + } + } if (ok == false) { break; } @@ -1148,6 +1180,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() { continue; } if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) { + the_final_round = true; stream->Finish(&status, (void*) etcdv3::WATCH_FINISH); continue; } @@ -1168,7 +1201,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() { // we stop watch under two conditions: // // 1. watch for a future revision, return immediately with empty events - // set + // set // 2. receive any effective events. 
if ((reply.created() && reply.header().revision() < parameters.revision) || @@ -1200,23 +1233,36 @@ void etcdv3::AsyncWatchAction::waitForResponse() { } } -void etcdv3::AsyncWatchAction::CancelWatch() { - if (!isCancelled.exchange(true)) { - WatchRequest cancel_req; - cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id); - stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL); - isCancelled.store(true); - } -} - -bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); } - void etcdv3::AsyncWatchAction::waitForResponse( std::function callback) { void* got_tag; bool ok = false; + bool the_final_round = false; - while (cq_.Next(&got_tag, &ok)) { + while (true) { + if (!the_final_round) { + if (!cq_.Next(&got_tag, &ok)) { + break; + } + } else { + auto deadline = + std::chrono::system_clock::now() + std::chrono::seconds(1); + switch (cq_.AsyncNext(&got_tag, &ok, deadline)) { + case CompletionQueue::NextStatus::TIMEOUT: + case CompletionQueue::NextStatus::SHUTDOWN: { + std::cerr << "[warn] watcher does't exit normally" << std::endl; + // pretend to be received a "WATCH_FINISH" tag: shutdown + context.TryCancel(); + cq_.Shutdown(); + ok = false; // jump out + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + // normal execution flow + break; + } + } + } if (ok == false) { break; } @@ -1225,6 +1271,7 @@ void etcdv3::AsyncWatchAction::waitForResponse( continue; } if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) { + the_final_round = true; stream->Finish(&status, (void*) etcdv3::WATCH_FINISH); continue; } @@ -1267,6 +1314,17 @@ void etcdv3::AsyncWatchAction::waitForResponse( } } +void etcdv3::AsyncWatchAction::CancelWatch() { + if (!isCancelled.exchange(true)) { + WatchRequest cancel_req; + cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id); + stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL); + isCancelled.store(true); + } +} + +bool etcdv3::AsyncWatchAction::Cancelled() const { return 
isCancelled.load(); } + etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() { AsyncWatchResponse watch_resp; watch_resp.set_action(etcdv3::WATCH_ACTION);