Enhance the campaign test and document the behaviour when timeout is set (#205)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-03-14 19:43:37 +08:00 committed by GitHub
parent d27f0b9e81
commit f0f9c4e8c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 4 deletions

View File

@ -884,6 +884,10 @@ pplx::task<Response> resign(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision); std::string const &key, int64_t revision);
``` ```
Note that if grpc timeout is set, `campaign()` will return an timeout error response if it
cannot acquire the election ownership within the timeout period. Otherwise will block until
become the leader.
The `Observer` returned by `observe()` can be use to monitor the changes of election ownership. The `Observer` returned by `observe()` can be use to monitor the changes of election ownership.
The observer stream will be canceled when been destructed. The observer stream will be canceled when been destructed.

View File

@ -31,12 +31,12 @@ etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse()
{ {
AsyncCampaignResponse campaign_resp; AsyncCampaignResponse campaign_resp;
campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION); campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION);
if(!status.ok()) { if(!status.ok()) {
campaign_resp.set_error_code(status.error_code()); campaign_resp.set_error_code(status.error_code());
campaign_resp.set_error_message(status.error_message()); campaign_resp.set_error_message(status.error_message());
} }
else { else {
campaign_resp.ParseResponse(reply); campaign_resp.ParseResponse(reply);
} }
return campaign_resp; return campaign_resp;

View File

@ -13,7 +13,7 @@
static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints( static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints(
"http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579"); "http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579");
TEST_CASE("campaign and loadership using keepalive") { TEST_CASE("campaign and leadership using keepalive") {
etcd::Client etcd(etcd_uri); etcd::Client etcd(etcd_uri);
auto keepalive = etcd.leasekeepalive(5).get(); auto keepalive = etcd.leasekeepalive(5).get();
auto lease_id = keepalive->Lease(); auto lease_id = keepalive->Lease();
@ -32,8 +32,73 @@ TEST_CASE("campaign and loadership using keepalive") {
std::cout << "finish campaign" << std::endl; std::cout << "finish campaign" << std::endl;
auto resp2 = etcd.leader("/leader").get(); auto resp2 = etcd.leader("/leader").get();
std::cout << resp2.value().as_string() << std::endl; CHECK(0 == resp2.error_code());
CHECK(value == resp2.value().as_string());
CHECK(resp1.value().key() == resp2.value().key());
std::cout << resp2.value().key() << std::endl; std::cout << resp2.value().key() << std::endl;
std::cout << "finish leader" << std::endl; std::cout << "finish leader" << std::endl;
auto resp3 = etcd.resign("/leader", resp1.value().lease(), resp1.value().key(), resp1.value().created_index()).get();
CHECK(0 == resp3.error_code());
std::cout << "finish resign" << std::endl;
}
TEST_CASE("concurrent campaign with grpc timeout") {
std::string value1 = std::string("192.168.1.6:1880");
std::string value2 = std::string("192.168.1.6:1890");
auto fn1 = [&]() {
etcd::Client etcd(etcd_uri);
auto keepalive = etcd.leasekeepalive(5).get();
auto lease_id = keepalive->Lease();
auto resp1 = etcd.campaign("/leader", lease_id, value1).get();
CHECK(0 == resp1.error_code());
std::this_thread::sleep_for(std::chrono::seconds(10));
// resign
auto resp2 = etcd.resign("/leader", resp1.value().lease(), resp1.value().key(), resp1.value().created_index()).get();
CHECK(0 == resp2.error_code());
};
auto fn2 = [&]() {
etcd::Client etcd(etcd_uri);
auto keepalive = etcd.leasekeepalive(5).get();
auto lease_id = keepalive->Lease();
// set client timeout
etcd.set_grpc_timeout(std::chrono::seconds(3));
std::this_thread::sleep_for(std::chrono::seconds(3));
auto resp1 = etcd.campaign("/leader", lease_id, value2).get();
std::cout <<resp1.error_code() << resp1.error_message() << std::endl;
CHECK(0 != resp1.error_code());
// wait until success
while (true) {
resp1 = etcd.campaign("/leader", lease_id, value2).get();
if (resp1.error_code() == 0) {
// check value
auto resp2 = etcd.leader("/leader").get();
CHECK(0 == resp2.error_code());
CHECK(value2 == resp2.value().as_string());
CHECK(resp1.value().key() == resp2.value().key());
// break the loop
break;
}
}
auto resp2 = etcd.resign("/leader", resp1.value().lease(), resp1.value().key(), resp1.value().created_index()).get();
CHECK(0 == resp2.error_code());
};
std::thread t1(fn1);
std::thread t2(fn2);
t1.join();
t2.join();
} }