From 2a39c71ca71268f479269e027503dce107a6091a Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 1 Jul 2023 13:33:45 +0800 Subject: [PATCH] Fixes a possible bug about watcher's id Signed-off-by: Tao He --- src/v3/Action.cpp | 3 +++ src/v3/AsyncGRPC.cpp | 8 ++++++++ tst/WatcherTest.cpp | 14 ++++++++++++-- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index a68b6ae..4c8c857 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -89,6 +89,9 @@ void etcdv3::Action::waitForResponse() break; } case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok) { + status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to execute the action: not ok or invalid tag"); + } break; } } diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp index 30c7aa6..30115d2 100644 --- a/src/v3/AsyncGRPC.cpp +++ b/src/v3/AsyncGRPC.cpp @@ -1178,6 +1178,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction( { isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE); + this->watch_id = std::chrono::steady_clock::now().time_since_epoch().count(); WatchRequest watch_req; WatchCreateRequest watch_create_req; @@ -1199,6 +1200,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction( watch_create_req.set_prev_kv(true); watch_create_req.set_start_revision(parameters.revision); + watch_create_req.set_watch_id(this->watch_id); watch_req.mutable_create_request()->CopyFrom(watch_create_req); @@ -1254,6 +1256,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() continue; } + // record the watcher id + this->watch_id = reply.watch_id(); + // we stop watch under two conditions: // // 1. watch for a future revision, return immediately with empty events set @@ -1339,6 +1344,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::functionwatch_id = reply.watch_id(); + // for the callback case, we don't invoke callback immediately if watching // for a future revision, we wait until there are some effective events. if(reply.events_size()) diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 30c025d..a5b815c 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -14,10 +14,12 @@ static int watcher_called = 0; void printResponse(etcd::Response const & resp) { if (resp.error_code()) { - std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; + std::cout << "Watcher "<< resp.watch_id() + << " fails with " << resp.error_code() << ": " << resp.error_message() << std::endl; } else { - std::cout << resp.action() << " " << resp.value().as_string() << std::endl; + std::cout << "Watcher " << resp.watch_id() + << " responses with " << resp.action() << " " << resp.value().as_string() << std::endl; std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl; std::cout << "Events size: " << resp.events().size() << std::endl; @@ -191,6 +193,14 @@ TEST_CASE("watch changes on the same key (#212)") } } +TEST_CASE("create two watcher") +{ + etcd::Watcher w1(etcd_url, "/test", printResponse, true); + etcd::Watcher w2(etcd_url, "/test", printResponse, true); + + std::this_thread::sleep_for(std::chrono::seconds(5)); +} + // TEST_CASE("request cancellation") // { // etcd::Client etcd(etcd_url);