Fixes the watcher cannot be cancelled issue with etcd 3.x

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-07-15 18:42:07 +08:00
parent 265064e1e5
commit 6619a3ef0a
1 changed files with 83 additions and 25 deletions

View File

@ -1091,17 +1091,8 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
isCancelled.store(false); isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context, &cq_, stream = parameters.watch_stub->AsyncWatch(&context, &cq_,
(void*) etcdv3::WATCH_CREATE); (void*) etcdv3::WATCH_CREATE);
// The unique watcher id causes the watcher cannot be cancelled as expected this->watch_id =
// on Ubuntu 20.04. std::chrono::high_resolution_clock::now().time_since_epoch().count();
//
// See CI failures:
// https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561397273/jobs/10159051536
//
// Added in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/232
// Removed in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/236
//
// this->watch_id =
// std::chrono::high_resolution_clock::now().time_since_epoch().count();
// #ifndef NDEBUG // #ifndef NDEBUG
// std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl; // std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl;
// #endif // #endif
@ -1135,11 +1126,52 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
} }
} }
/**
* Notes: `Cancel` and `waitForResponse` of watchers.
*
* We meet failures about failed to cancel the watcher on Ubuntu 20.04
* due to unable to receive the "etcdv3::WATCH_FINISH" tag from the gRPC
* completion queue.
*
* See CI:
* https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561458372/jobs/10159155857
*
* To address the problem, we use the `AsyncNext()` to wait for the
* the last token from the completion queue (wait for 1 second, once
* we called the method `stream->Finish()`).
*
* Remark: the issue might be caused by lower version etcd.
*/
void etcdv3::AsyncWatchAction::waitForResponse() { void etcdv3::AsyncWatchAction::waitForResponse() {
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
bool the_final_round = false;
while (cq_.Next(&got_tag, &ok)) { while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {
break;
}
} else {
auto deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
std::cerr << "[warn] watcher does't exit normally" << std::endl;
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
ok = false; // jump out
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
// normal execution flow
break;
}
}
}
if (ok == false) { if (ok == false) {
break; break;
} }
@ -1148,6 +1180,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
continue; continue;
} }
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) { if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
the_final_round = true;
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH); stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
continue; continue;
} }
@ -1168,7 +1201,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
// we stop watch under two conditions: // we stop watch under two conditions:
// //
// 1. watch for a future revision, return immediately with empty events // 1. watch for a future revision, return immediately with empty events
// set // set
// 2. receive any effective events. // 2. receive any effective events.
if ((reply.created() && if ((reply.created() &&
reply.header().revision() < parameters.revision) || reply.header().revision() < parameters.revision) ||
@ -1200,23 +1233,36 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
} }
} }
void etcdv3::AsyncWatchAction::CancelWatch() {
if (!isCancelled.exchange(true)) {
WatchRequest cancel_req;
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
isCancelled.store(true);
}
}
bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }
void etcdv3::AsyncWatchAction::waitForResponse( void etcdv3::AsyncWatchAction::waitForResponse(
std::function<void(etcd::Response)> callback) { std::function<void(etcd::Response)> callback) {
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
bool the_final_round = false;
while (cq_.Next(&got_tag, &ok)) { while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {
break;
}
} else {
auto deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
std::cerr << "[warn] watcher does't exit normally" << std::endl;
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
ok = false; // jump out
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
// normal execution flow
break;
}
}
}
if (ok == false) { if (ok == false) {
break; break;
} }
@ -1225,6 +1271,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(
continue; continue;
} }
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) { if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
the_final_round = true;
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH); stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
continue; continue;
} }
@ -1267,6 +1314,17 @@ void etcdv3::AsyncWatchAction::waitForResponse(
} }
} }
void etcdv3::AsyncWatchAction::CancelWatch() {
if (!isCancelled.exchange(true)) {
WatchRequest cancel_req;
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
isCancelled.store(true);
}
}
bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() { etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() {
AsyncWatchResponse watch_resp; AsyncWatchResponse watch_resp;
watch_resp.set_action(etcdv3::WATCH_ACTION); watch_resp.set_action(etcdv3::WATCH_ACTION);