diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index f0ecdd6..e82a19b 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -217,6 +217,9 @@ jobs: echo "Run the etcd watcher test ........................." ./build/bin/WatcherTest + echo "Run the etcd keepalive test ........................." + ./build/bin/KeepAliveTest + echo "Run the etcd transaction test ........................." ./build/bin/TransactionTest diff --git a/README.md b/README.md index 665fbee..e207c26 100644 --- a/README.md +++ b/README.md @@ -830,6 +830,8 @@ is constructed. Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow the async exception when there are errors during keeping the lease alive. +Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception. + ### Etcd transactions Etcd v3's [Transaction APIs](https://etcd.io/docs/v3.4/learning/api/#transaction) is supported via the diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index f7ea1d3..71f86fc 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -99,7 +99,10 @@ namespace etcd ~KeepAlive(); protected: + // automatically refresh loop void refresh(); + // refresh once immediately + void refresh_once(); struct EtcdServerStubs; struct EtcdServerStubsDeleter { @@ -120,7 +123,7 @@ namespace etcd int64_t lease_id; // protect the initializing status of `timer`. - std::mutex mutex_for_refresh_; + std::recursive_mutex mutex_for_refresh_; std::atomic_bool continue_next; // grpc timeout in `refresh()` diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 43ca87e..1f4089a 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -90,12 +90,13 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, this->refresh(); context.run(); } catch (...) { - if (handler_) { - handler_(std::current_exception()); - } else { - eptr_ = std::current_exception(); - } + // run canceller first this->Cancel(); + // propogate the exception + eptr_ = std::current_exception(); + if (handler_) { + handler_(eptr_); + } } }); } @@ -124,7 +125,7 @@ etcd::KeepAlive::~KeepAlive() void etcd::KeepAlive::Cancel() { - std::lock_guard scope_lock(mutex_for_refresh_); + std::lock_guard scope_lock(mutex_for_refresh_); if (!continue_next.exchange(false)) { return; } @@ -139,11 +140,27 @@ void etcd::KeepAlive::Check() { if (eptr_) { std::rethrow_exception(eptr_); } + // issue an refresh to make sure it still alive + try { + this->refresh_once(); + } catch (...) { + // run canceller first + this->Cancel(); + + // propogate the exception, as we throw in `Check()`, the `handler` won't be touched + eptr_ = std::current_exception(); + if (handler_) { + handler_(eptr_); + } + + // rethrow in `Check()` to keep the consistent semantics + std::rethrow_exception(eptr_); + } } void etcd::KeepAlive::refresh() { - std::lock_guard scope_lock(mutex_for_refresh_); + std::lock_guard scope_lock(mutex_for_refresh_); if (!continue_next.load()) { return; } @@ -157,18 +174,28 @@ void etcd::KeepAlive::refresh() #endif } else { if (this->continue_next.load()) { - this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout; - auto resp = this->stubs->call->Refresh(); - if (!resp.is_ok()) { - throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + - ", message: " + resp.error_message()); - } - if (resp.value().ttl() == 0) { - throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0."); - } + // execute refresh + this->refresh_once(); // trigger the next round; this->refresh(); } } }); } + +void etcd::KeepAlive::refresh_once() +{ + std::lock_guard scope_lock(mutex_for_refresh_); + if (!continue_next.load()) { + return; + } + this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout; + auto resp = this->stubs->call->Refresh(); + if (!resp.is_ok()) { + throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + + ", message: " + resp.error_message()); + } + if (resp.value().ttl() == 0) { + throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0."); + } +} diff --git a/tst/KeepAliveTest.cpp b/tst/KeepAliveTest.cpp new file mode 100644 index 0000000..0330d61 --- /dev/null +++ b/tst/KeepAliveTest.cpp @@ -0,0 +1,35 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include + +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" +#include "etcd/Response.hpp" +#include "etcd/SyncClient.hpp" +#include "etcd/Value.hpp" + +static std::string etcd_uri( + "http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579"); + +TEST_CASE("keepalive revoke and check if alive") { + etcd::Client etcd(etcd_uri); + + // create a lease with 30 seconds TTL + auto keepalive = etcd.leasekeepalive(30).get(); + auto lease_id = keepalive->Lease(); + + // revoke the lease before it reaches its TTL + etcd.leaserevoke(lease_id).wait(); + + // retrieves its TTL again, and it is now -1 + auto response = etcd.leasetimetolive(lease_id).get(); + REQUIRE(response.value().ttl() == -1); + + // shorter than the TLL, or no sleep + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // expect keep_alive->Check() to throw exception + REQUIRE_THROWS(keepalive->Check()); +}