Fixes the implementation of Observe() (#231)
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
09f665fe3e
commit
32fae70113
|
|
@ -877,7 +877,7 @@ pplx::task<Response> proclaim(std::string const &name, int64_t lease_id,
|
||||||
|
|
||||||
pplx::task<Response> leader(std::string const &name);
|
pplx::task<Response> leader(std::string const &name);
|
||||||
|
|
||||||
std::unique_ptr<Observer> observe(std::string const &name);
|
std::unique_ptr<SyncClient::Observer> observe(std::string const &name);
|
||||||
|
|
||||||
pplx::task<Response> resign(std::string const &name, int64_t lease_id,
|
pplx::task<Response> resign(std::string const &name, int64_t lease_id,
|
||||||
std::string const &key, int64_t revision);
|
std::string const &key, int64_t revision);
|
||||||
|
|
@ -891,7 +891,7 @@ The `Observer` returned by `observe()` can be use to monitor the changes of elec
|
||||||
The observer stream will be canceled when been destructed.
|
The observer stream will be canceled when been destructed.
|
||||||
|
|
||||||
```c++
|
```c++
|
||||||
std::unique_ptr<etcd::Observer> observer = etcd.observe("test");
|
std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
|
||||||
|
|
||||||
// wait one change event, blocked execution
|
// wait one change event, blocked execution
|
||||||
etcd::Response resp = observer->WaitOnce();
|
etcd::Response resp = observer->WaitOnce();
|
||||||
|
|
@ -906,7 +906,7 @@ The observer stream will be canceled when been destructed.
|
||||||
observer.reset(nullptr);
|
observer.reset(nullptr);
|
||||||
```
|
```
|
||||||
|
|
||||||
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp).
|
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).
|
||||||
|
|
||||||
### TODO
|
### TODO
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ namespace etcdv3
|
||||||
extern char const * WATCH_FINISH;
|
extern char const * WATCH_FINISH;
|
||||||
|
|
||||||
extern char const * ELECTION_OBSERVE_CREATE;
|
extern char const * ELECTION_OBSERVE_CREATE;
|
||||||
|
extern char const * ELECTION_OBSERVE_FINISH;
|
||||||
|
|
||||||
extern const int ERROR_GRPC_OK;
|
extern const int ERROR_GRPC_OK;
|
||||||
extern const int ERROR_GRPC_CANCELLED;
|
extern const int ERROR_GRPC_CANCELLED;
|
||||||
|
|
|
||||||
|
|
@ -817,10 +817,7 @@ void etcdv3::AsyncObserveAction::waitForResponse()
|
||||||
response_reader->Read(&reply, (void *)this);
|
response_reader->Read(&reply, (void *)this);
|
||||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) {
|
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) {
|
||||||
auto response = ParseResponse();
|
auto response = ParseResponse();
|
||||||
if (response.get_error_code() == 0) {
|
if (response.get_error_code() != 0) {
|
||||||
// issue the next read
|
|
||||||
response_reader->Read(&reply, (void *)this);
|
|
||||||
} else {
|
|
||||||
this->CancelObserve();
|
this->CancelObserve();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -835,12 +832,24 @@ void etcdv3::AsyncObserveAction::CancelObserve()
|
||||||
if (!isCancelled.exchange(true)) {
|
if (!isCancelled.exchange(true)) {
|
||||||
void* got_tag;
|
void* got_tag;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
response_reader->Finish(&status, (void *)this);
|
|
||||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
|
response_reader->Finish(&status, (void *)ELECTION_OBSERVE_FINISH);
|
||||||
// ok
|
|
||||||
} else {
|
// FIXME: not sure why the `Next()` after `Finish()` blocks forever.
|
||||||
|
// Using the `AsyncNext()` without a timeout to ensure the cancel is done.
|
||||||
|
switch (cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::microseconds(1))) {
|
||||||
|
case CompletionQueue::NextStatus::TIMEOUT:
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN:
|
||||||
|
// ignore
|
||||||
|
break;
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT:
|
||||||
|
if (!ok || got_tag != (void *)ELECTION_OBSERVE_FINISH) {
|
||||||
std::cerr << "Failed to finish a election observing connection" << std::endl;
|
std::cerr << "Failed to finish a election observing connection" << std::endl;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cancel on-the-fly calls
|
||||||
|
context.TryCancel();
|
||||||
|
|
||||||
cq_.Shutdown();
|
cq_.Shutdown();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ char const * etcdv3::WATCH_WRITES_DONE = "watch writes done";
|
||||||
char const * etcdv3::WATCH_FINISH = "watch finish";
|
char const * etcdv3::WATCH_FINISH = "watch finish";
|
||||||
|
|
||||||
char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
|
char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
|
||||||
|
char const * etcdv3::ELECTION_OBSERVE_FINISH = "observe finish";
|
||||||
|
|
||||||
const int etcdv3::ERROR_GRPC_OK = 0;
|
const int etcdv3::ERROR_GRPC_OK = 0;
|
||||||
const int etcdv3::ERROR_GRPC_CANCELLED = 1;
|
const int etcdv3::ERROR_GRPC_CANCELLED = 1;
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,60 @@ TEST_CASE("campaign and resign")
|
||||||
REQUIRE(0 == resp5.error_code());
|
REQUIRE(0 == resp5.error_code());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("campaign and observe")
|
||||||
|
{
|
||||||
|
etcd::Client etcd(etcd_url);
|
||||||
|
|
||||||
|
auto keepalive = etcd.leasekeepalive(60).get();
|
||||||
|
auto lease_id = keepalive->Lease();
|
||||||
|
|
||||||
|
auto observer_thread = std::thread([&etcd]() {
|
||||||
|
std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
|
||||||
|
// wait many change events, blocked execution
|
||||||
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
|
etcd::Response resp = observer->WaitOnce();
|
||||||
|
std::cout << "observe " << resp.value().key() << " as the leader: " << resp.value().as_string() << std::endl;
|
||||||
|
}
|
||||||
|
std::cout << "finish the observe" << std::endl;
|
||||||
|
// cancel the observers
|
||||||
|
observer.reset(nullptr);
|
||||||
|
});
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
// campaign
|
||||||
|
auto resp1 = etcd.campaign("test", lease_id, "xxxx").get();
|
||||||
|
REQUIRE(0 == resp1.error_code());
|
||||||
|
std::cout << "key " << resp1.value().key() << " becomes the leader"
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
// proclaim
|
||||||
|
auto resp3 = etcd.proclaim("test", lease_id, resp1.value().key(),
|
||||||
|
resp1.value().created_index(),
|
||||||
|
"tttt - " + std::to_string(i))
|
||||||
|
.get();
|
||||||
|
REQUIRE(0 == resp3.error_code());
|
||||||
|
|
||||||
|
// leader
|
||||||
|
{
|
||||||
|
auto resp4 = etcd.leader("test").get();
|
||||||
|
REQUIRE(0 == resp4.error_code());
|
||||||
|
REQUIRE(resp1.value().key() == resp4.value().key());
|
||||||
|
REQUIRE("tttt - " + std::to_string(i) == resp4.value().as_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
// resign
|
||||||
|
auto resp5 = etcd.resign("test", lease_id, resp1.value().key(),
|
||||||
|
resp1.value().created_index())
|
||||||
|
.get();
|
||||||
|
REQUIRE(0 == resp5.error_code());
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
observer_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("cleanup")
|
TEST_CASE("cleanup")
|
||||||
{
|
{
|
||||||
etcd::Client etcd(etcd_url);
|
etcd::Client etcd(etcd_url);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue