From 17f12e31f2adb58f43b37b684d6b4681cae04d34 Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 1 Jul 2023 13:12:09 +0800 Subject: [PATCH] Fixes the implementation of Observe() Signed-off-by: Tao He --- README.md | 6 ++-- etcd/v3/action_constants.hpp | 1 + src/v3/Action.cpp | 2 +- src/v3/AsyncGRPC.cpp | 27 ++++++++++++------ src/v3/action_constants.cpp | 1 + tst/ElectionTest.cpp | 54 ++++++++++++++++++++++++++++++++++++ 6 files changed, 78 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 101d65f..89a7327 100644 --- a/README.md +++ b/README.md @@ -877,7 +877,7 @@ pplx::task proclaim(std::string const &name, int64_t lease_id, pplx::task leader(std::string const &name); -std::unique_ptr observe(std::string const &name); +std::unique_ptr observe(std::string const &name); pplx::task resign(std::string const &name, int64_t lease_id, std::string const &key, int64_t revision); @@ -891,7 +891,7 @@ The `Observer` returned by `observe()` can be use to monitor the changes of elec The observer stream will be canceled when been destructed. ```c++ - std::unique_ptr observer = etcd.observe("test"); + std::unique_ptr observer = etcd.observe("test"); // wait one change event, blocked execution etcd::Response resp = observer->WaitOnce(); @@ -906,7 +906,7 @@ The observer stream will be canceled when been destructed. observer.reset(nullptr); ``` -for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp). +for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp). ### TODO diff --git a/etcd/v3/action_constants.hpp b/etcd/v3/action_constants.hpp index 7854945..a8bf82c 100644 --- a/etcd/v3/action_constants.hpp +++ b/etcd/v3/action_constants.hpp @@ -45,6 +45,7 @@ namespace etcdv3 extern char const * WATCH_FINISH; extern char const * ELECTION_OBSERVE_CREATE; + extern char const * ELECTION_OBSERVE_FINISH; extern const int ERROR_GRPC_OK; extern const int ERROR_GRPC_CANCELLED; diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index 0f3fedd..a68b6ae 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -73,7 +73,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const { os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl; } -void etcdv3::Action::waitForResponse() +void etcdv3::Action::waitForResponse() { void* got_tag; bool ok = false; diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp index d31e321..30c7aa6 100644 --- a/src/v3/AsyncGRPC.cpp +++ b/src/v3/AsyncGRPC.cpp @@ -817,10 +817,7 @@ void etcdv3::AsyncObserveAction::waitForResponse() response_reader->Read(&reply, (void *)this); if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) { auto response = ParseResponse(); - if (response.get_error_code() == 0) { - // issue the next read - response_reader->Read(&reply, (void *)this); - } else { + if (response.get_error_code() != 0) { this->CancelObserve(); } } else { @@ -835,13 +832,25 @@ void etcdv3::AsyncObserveAction::CancelObserve() if (!isCancelled.exchange(true)) { void* got_tag; bool ok = false; - response_reader->Finish(&status, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { - // ok - } else { - std::cerr << "Failed to finish a election observing connection" << std::endl; + + response_reader->Finish(&status, (void *)ELECTION_OBSERVE_FINISH); + + // FIXME: not sure why the `Next()` after `Finish()` blocks forever. + // Using the `AsyncNext()` without a timeout to ensure the cancel is done. + switch (cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::microseconds(1))) { + case CompletionQueue::NextStatus::TIMEOUT: + case CompletionQueue::NextStatus::SHUTDOWN: + // ignore + break; + case CompletionQueue::NextStatus::GOT_EVENT: + if (!ok || got_tag != (void *)ELECTION_OBSERVE_FINISH) { + std::cerr << "Failed to finish a election observing connection" << std::endl; + } } + // cancel on-the-fly calls + context.TryCancel(); + cq_.Shutdown(); } } diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index 043a536..7f1a0e7 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -42,6 +42,7 @@ char const * etcdv3::WATCH_WRITES_DONE = "watch writes done"; char const * etcdv3::WATCH_FINISH = "watch finish"; char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create"; +char const * etcdv3::ELECTION_OBSERVE_FINISH = "observe finish"; const int etcdv3::ERROR_GRPC_OK = 0; const int etcdv3::ERROR_GRPC_CANCELLED = 1; diff --git a/tst/ElectionTest.cpp b/tst/ElectionTest.cpp index e8e3915..e3a46fb 100644 --- a/tst/ElectionTest.cpp +++ b/tst/ElectionTest.cpp @@ -55,6 +55,60 @@ TEST_CASE("campaign and resign") REQUIRE(0 == resp5.error_code()); } +TEST_CASE("campaign and observe") +{ + etcd::Client etcd(etcd_url); + + auto keepalive = etcd.leasekeepalive(60).get(); + auto lease_id = keepalive->Lease(); + + auto observer_thread = std::thread([&etcd]() { + std::unique_ptr observer = etcd.observe("test"); + // wait many change events, blocked execution + for (size_t i = 0; i < 10; ++i) { + etcd::Response resp = observer->WaitOnce(); + std::cout << "observe " << resp.value().key() << " as the leader: " << resp.value().as_string() << std::endl; + } + std::cout << "finish the observe" << std::endl; + // cancel the observers + observer.reset(nullptr); + }); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + for (int i = 0; i < 5; ++i) { + // campaign + auto resp1 = etcd.campaign("test", lease_id, "xxxx").get(); + REQUIRE(0 == resp1.error_code()); + std::cout << "key " << resp1.value().key() << " becomes the leader" + << std::endl; + + // proclaim + auto resp3 = etcd.proclaim("test", lease_id, resp1.value().key(), + resp1.value().created_index(), + "tttt - " + std::to_string(i)) + .get(); + REQUIRE(0 == resp3.error_code()); + + // leader + { + auto resp4 = etcd.leader("test").get(); + REQUIRE(0 == resp4.error_code()); + REQUIRE(resp1.value().key() == resp4.value().key()); + REQUIRE("tttt - " + std::to_string(i) == resp4.value().as_string()); + } + + // resign + auto resp5 = etcd.resign("test", lease_id, resp1.value().key(), + resp1.value().created_index()) + .get(); + REQUIRE(0 == resp5.error_code()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + observer_thread.join(); +} + TEST_CASE("cleanup") { etcd::Client etcd(etcd_url);