Fixes the Check() error when if next refresh is not triggered yet (#193)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-02-15 17:14:48 +08:00 committed by GitHub
parent ceb1af1110
commit 3133fbec21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 17 deletions

View File

@ -217,6 +217,9 @@ jobs:
echo "Run the etcd watcher test ........................." echo "Run the etcd watcher test ........................."
./build/bin/WatcherTest ./build/bin/WatcherTest
echo "Run the etcd keepalive test ........................."
./build/bin/KeepAliveTest
echo "Run the etcd transaction test ........................." echo "Run the etcd transaction test ........................."
./build/bin/TransactionTest ./build/bin/TransactionTest

View File

@ -830,6 +830,8 @@ is constructed.
Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow
the async exception when there are errors during keeping the lease alive. the async exception when there are errors during keeping the lease alive.
Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception.
### Etcd transactions ### Etcd transactions
Etcd v3's [Transaction APIs](https://etcd.io/docs/v3.4/learning/api/#transaction) is supported via the Etcd v3's [Transaction APIs](https://etcd.io/docs/v3.4/learning/api/#transaction) is supported via the

View File

@ -99,7 +99,10 @@ namespace etcd
~KeepAlive(); ~KeepAlive();
protected: protected:
// automatically refresh loop
void refresh(); void refresh();
// refresh once immediately
void refresh_once();
struct EtcdServerStubs; struct EtcdServerStubs;
struct EtcdServerStubsDeleter { struct EtcdServerStubsDeleter {
@ -120,7 +123,7 @@ namespace etcd
int64_t lease_id; int64_t lease_id;
// protect the initializing status of `timer`. // protect the initializing status of `timer`.
std::mutex mutex_for_refresh_; std::recursive_mutex mutex_for_refresh_;
std::atomic_bool continue_next; std::atomic_bool continue_next;
// grpc timeout in `refresh()` // grpc timeout in `refresh()`

View File

@ -90,12 +90,13 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
this->refresh(); this->refresh();
context.run(); context.run();
} catch (...) { } catch (...) {
if (handler_) { // run canceller first
handler_(std::current_exception());
} else {
eptr_ = std::current_exception();
}
this->Cancel(); this->Cancel();
// propogate the exception
eptr_ = std::current_exception();
if (handler_) {
handler_(eptr_);
}
} }
}); });
} }
@ -124,7 +125,7 @@ etcd::KeepAlive::~KeepAlive()
void etcd::KeepAlive::Cancel() void etcd::KeepAlive::Cancel()
{ {
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_); std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.exchange(false)) { if (!continue_next.exchange(false)) {
return; return;
} }
@ -139,11 +140,27 @@ void etcd::KeepAlive::Check() {
if (eptr_) { if (eptr_) {
std::rethrow_exception(eptr_); std::rethrow_exception(eptr_);
} }
// issue an refresh to make sure it still alive
try {
this->refresh_once();
} catch (...) {
// run canceller first
this->Cancel();
// propogate the exception, as we throw in `Check()`, the `handler` won't be touched
eptr_ = std::current_exception();
if (handler_) {
handler_(eptr_);
}
// rethrow in `Check()` to keep the consistent semantics
std::rethrow_exception(eptr_);
}
} }
void etcd::KeepAlive::refresh() void etcd::KeepAlive::refresh()
{ {
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_); std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.load()) { if (!continue_next.load()) {
return; return;
} }
@ -157,18 +174,28 @@ void etcd::KeepAlive::refresh()
#endif #endif
} else { } else {
if (this->continue_next.load()) { if (this->continue_next.load()) {
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout; // execute refresh
auto resp = this->stubs->call->Refresh(); this->refresh_once();
if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
", message: " + resp.error_message());
}
if (resp.value().ttl() == 0) {
throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0.");
}
// trigger the next round; // trigger the next round;
this->refresh(); this->refresh();
} }
} }
}); });
} }
void etcd::KeepAlive::refresh_once()
{
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.load()) {
return;
}
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
auto resp = this->stubs->call->Refresh();
if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
", message: " + resp.error_message());
}
if (resp.value().ttl() == 0) {
throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0.");
}
}

35
tst/KeepAliveTest.cpp Normal file
View File

@ -0,0 +1,35 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <chrono>
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/Response.hpp"
#include "etcd/SyncClient.hpp"
#include "etcd/Value.hpp"
static std::string etcd_uri(
"http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579");
TEST_CASE("keepalive revoke and check if alive") {
etcd::Client etcd(etcd_uri);
// create a lease with 30 seconds TTL
auto keepalive = etcd.leasekeepalive(30).get();
auto lease_id = keepalive->Lease();
// revoke the lease before it reaches its TTL
etcd.leaserevoke(lease_id).wait();
// retrieves its TTL again, and it is now -1
auto response = etcd.leasetimetolive(lease_id).get();
REQUIRE(response.value().ttl() == -1);
// shorter than the TLL, or no sleep
std::this_thread::sleep_for(std::chrono::seconds(1));
// expect keep_alive->Check() to throw exception
REQUIRE_THROWS(keepalive->Check());
}