Handling (or checking) possible failure in lease's KeepAlive. (#53)

* Handling (or checking) possible failure in lease's KeepAlive.
* Add documentation.
* Enhance documentations.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-04-02 17:15:27 +08:00 committed by GitHub
parent 27e6e2ac11
commit 62a846d798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 196 additions and 7 deletions

View File

@ -527,6 +527,29 @@ Keep alive for leases is implemented using a seperate class `KeepAlive`, which c
It will perform a periodly keep-alive action before it is cancelled explicitly, or destructed implicitly.
`KeepAlive` may fails (e.g., when the etcd server stopped unexpectedly), the constructor of `KeepAlive`
could accept a handler of type `std::function<std::exception_ptr>` and the handler will be invoked
when exception occurs during keeping it alive.
Note that the handler will invoked in a separated thread, not the thread where the `KeepAlive` object
is constructed.
```c++
std::function<void (std::exception_ptr)> handler = [](std::exception_ptr eptr) {
try {
if (eptr) {
std::rethrow_exception(eptr);
}
} catch(const std::exception& e) {
std::cerr << "Caught exception \"" << e.what() << "\"\n";
}
};
etcd::KeepAlive keepalive(etcd, handler, ttl, lease_id);
```
Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow
the async exception when there are errors during keeping the lease alive.
### TODO
1. Cancellation of asynchronous calls(except for watch)

View File

@ -1,6 +1,8 @@
#ifndef __ETCD_KEEPALIVE_HPP__
#define __ETCD_KEEPALIVE_HPP__
#include <exception>
#include <functional>
#include <string>
#include "etcd/Client.hpp"
@ -22,12 +24,26 @@ namespace etcd
class KeepAlive
{
public:
KeepAlive(Client const &client, int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address, int ttl, int64_t lease_id=0);
KeepAlive(Client const &client,
int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address,
int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address,
std::string const & username, std::string const & password,
int ttl, int64_t lease_id=0);
KeepAlive(Client const &client,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address,
std::string const & username, std::string const & password,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id=0);
KeepAlive(KeepAlive const &) = delete;
KeepAlive(KeepAlive &&) = delete;
@ -36,6 +52,13 @@ namespace etcd
*/
void Cancel();
/**
* Check if the keep alive is still valid (invalid when there's an async exception).
*
* Nothing will happen if valid and an exception will be rethrowed if invalid.
*/
void Check();
~KeepAlive();
protected:
@ -49,6 +72,10 @@ namespace etcd
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
private:
// error handling
std::exception_ptr eptr_;
std::function<void (std::exception_ptr)> handler_;
int ttl;
int64_t lease_id;
bool continue_next;

View File

@ -33,9 +33,13 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() {
// start refresh
this->refresh();
context.run();
try {
// start refresh
this->refresh();
context.run();
} catch (...) {
eptr_ = std::current_exception();
}
context.stop(); // clean up
});
}
@ -50,6 +54,48 @@ etcd::KeepAlive::KeepAlive(std::string const & address,
KeepAlive(Client(address, username, password), ttl, lease_id) {
}
etcd::KeepAlive::KeepAlive(Client const &client,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id):
handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) {
stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel);
etcdv3::ActionParameters params;
params.auth_token.assign(client.auth_token);
params.lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get();
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() {
try {
// start refresh
this->refresh();
context.run();
} catch (...) {
if (handler_) {
handler_(std::current_exception());
} else {
eptr_ = std::current_exception();
}
}
context.stop(); // clean up
});
}
etcd::KeepAlive::KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id):
KeepAlive(Client(address), handler, ttl, lease_id) {
}
etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id):
KeepAlive(Client(address, username, password), handler, ttl, lease_id) {
}
etcd::KeepAlive::~KeepAlive()
{
this->Cancel();
@ -64,7 +110,8 @@ void etcd::KeepAlive::Cancel()
#ifndef NDEBUG
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Cancel keepalive for " << std::hex << lease_id << std::endl;
std::cout << "Cancel keepalive for " << lease_id
<< "(" << std::hex << lease_id << ")" << std::endl;
std::cout.flags(os_flags);
}
#endif
@ -75,6 +122,12 @@ void etcd::KeepAlive::Cancel()
currentTask.wait();
}
void etcd::KeepAlive::Check() {
if (eptr_) {
std::rethrow_exception(eptr_);
}
}
void etcd::KeepAlive::refresh()
{
if (!continue_next) {
@ -86,7 +139,8 @@ void etcd::KeepAlive::refresh()
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl
<< " for " << std::hex << lease_id << std::endl;
<< " for " << lease_id
<< "(" << std::hex << lease_id << ")" << std::endl;
std::cout.flags(os_flags);
}
#endif

View File

@ -7,6 +7,7 @@
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
TEST_CASE("lock and unlock")
@ -79,3 +80,87 @@ TEST_CASE("double lock will fail")
REQUIRE(resp5.is_ok());
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("lock using lease")
{
etcd::Client etcd("http://127.0.0.1:2379");
bool failed = false;
std::function<void (std::exception_ptr)> handler = [&failed](std::exception_ptr eptr) {
try {
if (eptr) {
std::rethrow_exception(eptr);
}
} catch(const std::exception& e) {
std::cerr << "Caught exception \"" << e.what() << "\"\n";
failed = true;
}
};
// with handler
{
// grant lease and keep it alive
int64_t lease_id = etcd.leasegrant(5).get().value().lease();
etcd::KeepAlive keepalive(etcd, handler, 3, lease_id);
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// lock
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
std::this_thread::sleep_for(std::chrono::seconds(20));
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
}
// without handler
{
// grant lease and keep it alive
int64_t lease_id = etcd.leasegrant(5).get().value().lease();
etcd::KeepAlive keepalive(etcd, 3, lease_id);
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// lock
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
std::this_thread::sleep_for(std::chrono::seconds(20));
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
}
}