Fixes the implementation of Observe()

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-07-01 13:12:09 +08:00
parent 09f665fe3e
commit 17f12e31f2
6 changed files with 78 additions and 13 deletions

View File

@ -877,7 +877,7 @@ pplx::task<Response> proclaim(std::string const &name, int64_t lease_id,
pplx::task<Response> leader(std::string const &name); pplx::task<Response> leader(std::string const &name);
std::unique_ptr<Observer> observe(std::string const &name); std::unique_ptr<SyncClient::Observer> observe(std::string const &name);
pplx::task<Response> resign(std::string const &name, int64_t lease_id, pplx::task<Response> resign(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision); std::string const &key, int64_t revision);
@ -891,7 +891,7 @@ The `Observer` returned by `observe()` can be use to monitor the changes of elec
The observer stream will be canceled when been destructed. The observer stream will be canceled when been destructed.
```c++ ```c++
std::unique_ptr<etcd::Observer> observer = etcd.observe("test"); std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
// wait one change event, blocked execution // wait one change event, blocked execution
etcd::Response resp = observer->WaitOnce(); etcd::Response resp = observer->WaitOnce();
@ -906,7 +906,7 @@ The observer stream will be canceled when been destructed.
observer.reset(nullptr); observer.reset(nullptr);
``` ```
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp). for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).
### TODO ### TODO

View File

@ -45,6 +45,7 @@ namespace etcdv3
extern char const * WATCH_FINISH; extern char const * WATCH_FINISH;
extern char const * ELECTION_OBSERVE_CREATE; extern char const * ELECTION_OBSERVE_CREATE;
extern char const * ELECTION_OBSERVE_FINISH;
extern const int ERROR_GRPC_OK; extern const int ERROR_GRPC_OK;
extern const int ERROR_GRPC_CANCELLED; extern const int ERROR_GRPC_CANCELLED;

View File

@ -73,7 +73,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const {
os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl; os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl;
} }
void etcdv3::Action::waitForResponse() void etcdv3::Action::waitForResponse()
{ {
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;

View File

@ -817,10 +817,7 @@ void etcdv3::AsyncObserveAction::waitForResponse()
response_reader->Read(&reply, (void *)this); response_reader->Read(&reply, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) { if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) {
auto response = ParseResponse(); auto response = ParseResponse();
if (response.get_error_code() == 0) { if (response.get_error_code() != 0) {
// issue the next read
response_reader->Read(&reply, (void *)this);
} else {
this->CancelObserve(); this->CancelObserve();
} }
} else { } else {
@ -835,13 +832,25 @@ void etcdv3::AsyncObserveAction::CancelObserve()
if (!isCancelled.exchange(true)) { if (!isCancelled.exchange(true)) {
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
response_reader->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { response_reader->Finish(&status, (void *)ELECTION_OBSERVE_FINISH);
// ok
} else { // FIXME: not sure why the `Next()` after `Finish()` blocks forever.
std::cerr << "Failed to finish a election observing connection" << std::endl; // Using the `AsyncNext()` without a timeout to ensure the cancel is done.
switch (cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::microseconds(1))) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN:
// ignore
break;
case CompletionQueue::NextStatus::GOT_EVENT:
if (!ok || got_tag != (void *)ELECTION_OBSERVE_FINISH) {
std::cerr << "Failed to finish a election observing connection" << std::endl;
}
} }
// cancel on-the-fly calls
context.TryCancel();
cq_.Shutdown(); cq_.Shutdown();
} }
} }

View File

@ -42,6 +42,7 @@ char const * etcdv3::WATCH_WRITES_DONE = "watch writes done";
char const * etcdv3::WATCH_FINISH = "watch finish"; char const * etcdv3::WATCH_FINISH = "watch finish";
char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create"; char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
char const * etcdv3::ELECTION_OBSERVE_FINISH = "observe finish";
const int etcdv3::ERROR_GRPC_OK = 0; const int etcdv3::ERROR_GRPC_OK = 0;
const int etcdv3::ERROR_GRPC_CANCELLED = 1; const int etcdv3::ERROR_GRPC_CANCELLED = 1;

View File

@ -55,6 +55,60 @@ TEST_CASE("campaign and resign")
REQUIRE(0 == resp5.error_code()); REQUIRE(0 == resp5.error_code());
} }
TEST_CASE("campaign and observe")
{
etcd::Client etcd(etcd_url);
auto keepalive = etcd.leasekeepalive(60).get();
auto lease_id = keepalive->Lease();
auto observer_thread = std::thread([&etcd]() {
std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
// wait many change events, blocked execution
for (size_t i = 0; i < 10; ++i) {
etcd::Response resp = observer->WaitOnce();
std::cout << "observe " << resp.value().key() << " as the leader: " << resp.value().as_string() << std::endl;
}
std::cout << "finish the observe" << std::endl;
// cancel the observers
observer.reset(nullptr);
});
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < 5; ++i) {
// campaign
auto resp1 = etcd.campaign("test", lease_id, "xxxx").get();
REQUIRE(0 == resp1.error_code());
std::cout << "key " << resp1.value().key() << " becomes the leader"
<< std::endl;
// proclaim
auto resp3 = etcd.proclaim("test", lease_id, resp1.value().key(),
resp1.value().created_index(),
"tttt - " + std::to_string(i))
.get();
REQUIRE(0 == resp3.error_code());
// leader
{
auto resp4 = etcd.leader("test").get();
REQUIRE(0 == resp4.error_code());
REQUIRE(resp1.value().key() == resp4.value().key());
REQUIRE("tttt - " + std::to_string(i) == resp4.value().as_string());
}
// resign
auto resp5 = etcd.resign("test", lease_id, resp1.value().key(),
resp1.value().created_index())
.get();
REQUIRE(0 == resp5.error_code());
std::this_thread::sleep_for(std::chrono::seconds(1));
}
observer_thread.join();
}
TEST_CASE("cleanup") TEST_CASE("cleanup")
{ {
etcd::Client etcd(etcd_url); etcd::Client etcd(etcd_url);