Fixes the Check() error when if next refresh is not triggered yet
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
ceb1af1110
commit
f942a2bb70
|
|
@ -217,6 +217,9 @@ jobs:
|
|||
echo "Run the etcd watcher test ........................."
|
||||
./build/bin/WatcherTest
|
||||
|
||||
echo "Run the etcd keepalive test ........................."
|
||||
./build/bin/KeepAliveTest
|
||||
|
||||
echo "Run the etcd transaction test ........................."
|
||||
./build/bin/TransactionTest
|
||||
|
||||
|
|
|
|||
|
|
@ -830,6 +830,8 @@ is constructed.
|
|||
Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow
|
||||
the async exception when there are errors during keeping the lease alive.
|
||||
|
||||
Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception.
|
||||
|
||||
### Etcd transactions
|
||||
|
||||
Etcd v3's [Transaction APIs](https://etcd.io/docs/v3.4/learning/api/#transaction) is supported via the
|
||||
|
|
|
|||
|
|
@ -99,7 +99,10 @@ namespace etcd
|
|||
~KeepAlive();
|
||||
|
||||
protected:
|
||||
// automatically refresh loop
|
||||
void refresh();
|
||||
// refresh once immediately
|
||||
void refresh_once();
|
||||
|
||||
struct EtcdServerStubs;
|
||||
struct EtcdServerStubsDeleter {
|
||||
|
|
@ -120,7 +123,7 @@ namespace etcd
|
|||
int64_t lease_id;
|
||||
|
||||
// protect the initializing status of `timer`.
|
||||
std::mutex mutex_for_refresh_;
|
||||
std::recursive_mutex mutex_for_refresh_;
|
||||
std::atomic_bool continue_next;
|
||||
|
||||
// grpc timeout in `refresh()`
|
||||
|
|
|
|||
|
|
@ -90,12 +90,13 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
|
|||
this->refresh();
|
||||
context.run();
|
||||
} catch (...) {
|
||||
if (handler_) {
|
||||
handler_(std::current_exception());
|
||||
} else {
|
||||
eptr_ = std::current_exception();
|
||||
}
|
||||
// run canceller first
|
||||
this->Cancel();
|
||||
// propogate the exception
|
||||
eptr_ = std::current_exception();
|
||||
if (handler_) {
|
||||
handler_(eptr_);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -124,7 +125,7 @@ etcd::KeepAlive::~KeepAlive()
|
|||
|
||||
void etcd::KeepAlive::Cancel()
|
||||
{
|
||||
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
|
||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
||||
if (!continue_next.exchange(false)) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -139,11 +140,27 @@ void etcd::KeepAlive::Check() {
|
|||
if (eptr_) {
|
||||
std::rethrow_exception(eptr_);
|
||||
}
|
||||
// issue an refresh to make sure it still alive
|
||||
try {
|
||||
this->refresh_once();
|
||||
} catch (...) {
|
||||
// run canceller first
|
||||
this->Cancel();
|
||||
|
||||
// propogate the exception, as we throw in `Check()`, the `handler` won't be touched
|
||||
eptr_ = std::current_exception();
|
||||
if (handler_) {
|
||||
handler_(eptr_);
|
||||
}
|
||||
|
||||
// rethrow in `Check()` to keep the consistent semantics
|
||||
std::rethrow_exception(eptr_);
|
||||
}
|
||||
}
|
||||
|
||||
void etcd::KeepAlive::refresh()
|
||||
{
|
||||
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
|
||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
||||
if (!continue_next.load()) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -157,18 +174,28 @@ void etcd::KeepAlive::refresh()
|
|||
#endif
|
||||
} else {
|
||||
if (this->continue_next.load()) {
|
||||
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
|
||||
auto resp = this->stubs->call->Refresh();
|
||||
if (!resp.is_ok()) {
|
||||
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
||||
", message: " + resp.error_message());
|
||||
}
|
||||
if (resp.value().ttl() == 0) {
|
||||
throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0.");
|
||||
}
|
||||
// execute refresh
|
||||
this->refresh_once();
|
||||
// trigger the next round;
|
||||
this->refresh();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void etcd::KeepAlive::refresh_once()
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
||||
if (!continue_next.load()) {
|
||||
return;
|
||||
}
|
||||
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
|
||||
auto resp = this->stubs->call->Refresh();
|
||||
if (!resp.is_ok()) {
|
||||
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
||||
", message: " + resp.error_message());
|
||||
}
|
||||
if (resp.value().ttl() == 0) {
|
||||
throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0.");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
#define CATCH_CONFIG_MAIN
|
||||
#include <catch.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include "etcd/Client.hpp"
|
||||
#include "etcd/KeepAlive.hpp"
|
||||
#include "etcd/Response.hpp"
|
||||
#include "etcd/SyncClient.hpp"
|
||||
#include "etcd/Value.hpp"
|
||||
|
||||
static std::string etcd_uri(
|
||||
"http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579");
|
||||
|
||||
TEST_CASE("keepalive revoke and check if alive") {
|
||||
etcd::Client etcd(etcd_uri);
|
||||
|
||||
// create a lease with 30 seconds TTL
|
||||
auto keepalive = etcd.leasekeepalive(30).get();
|
||||
auto lease_id = keepalive->Lease();
|
||||
|
||||
// revoke the lease before it reaches its TTL
|
||||
etcd.leaserevoke(lease_id).wait();
|
||||
|
||||
// retrieves its TTL again, and it is now -1
|
||||
auto response = etcd.leasetimetolive(lease_id).get();
|
||||
REQUIRE(response.value().ttl() == -1);
|
||||
|
||||
// shorter than the TLL, or no sleep
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
// expect keep_alive->Check() to throw exception
|
||||
REQUIRE_THROWS(keepalive->Check());
|
||||
}
|
||||
Loading…
Reference in New Issue