From 62a846d79865f22a61e520023de9bbd1f1b48b9a Mon Sep 17 00:00:00 2001 From: Tao He Date: Fri, 2 Apr 2021 17:15:27 +0800 Subject: [PATCH] Handling (or checking) possible failure in lease's KeepAlive. (#53) * Handling (or checking) possible failure in lease's KeepAlive. * Add documentation. * Enhance documentations. Signed-off-by: Tao He --- README.md | 23 +++++++++++++ etcd/KeepAlive.hpp | 31 +++++++++++++++-- src/KeepAlive.cpp | 64 +++++++++++++++++++++++++++++++--- tst/LockTest.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e2e61dc..3ea730e 100644 --- a/README.md +++ b/README.md @@ -527,6 +527,29 @@ Keep alive for leases is implemented using a seperate class `KeepAlive`, which c It will perform a periodly keep-alive action before it is cancelled explicitly, or destructed implicitly. +`KeepAlive` may fails (e.g., when the etcd server stopped unexpectedly), the constructor of `KeepAlive` +could accept a handler of type `std::function` and the handler will be invoked +when exception occurs during keeping it alive. + +Note that the handler will invoked in a separated thread, not the thread where the `KeepAlive` object +is constructed. + +```c++ + std::function handler = [](std::exception_ptr eptr) { + try { + if (eptr) { + std::rethrow_exception(eptr); + } + } catch(const std::exception& e) { + std::cerr << "Caught exception \"" << e.what() << "\"\n"; + } + }; + etcd::KeepAlive keepalive(etcd, handler, ttl, lease_id); +``` + +Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow +the async exception when there are errors during keeping the lease alive. + ### TODO 1. Cancellation of asynchronous calls(except for watch) diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 2165d7f..9e1ca11 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -1,6 +1,8 @@ #ifndef __ETCD_KEEPALIVE_HPP__ #define __ETCD_KEEPALIVE_HPP__ +#include +#include #include #include "etcd/Client.hpp" @@ -22,12 +24,26 @@ namespace etcd class KeepAlive { public: - KeepAlive(Client const &client, int ttl, int64_t lease_id=0); - KeepAlive(std::string const & address, int ttl, int64_t lease_id=0); + KeepAlive(Client const &client, + int ttl, int64_t lease_id=0); + KeepAlive(std::string const & address, + int ttl, int64_t lease_id=0); KeepAlive(std::string const & address, std::string const & username, std::string const & password, int ttl, int64_t lease_id=0); + KeepAlive(Client const &client, + std::function const &handler, + int ttl, int64_t lease_id=0); + KeepAlive(std::string const & address, + std::function const &handler, + int ttl, int64_t lease_id=0); + KeepAlive(std::string const & address, + std::string const & username, std::string const & password, + std::function const &handler, + int ttl, int64_t lease_id=0); + + KeepAlive(KeepAlive const &) = delete; KeepAlive(KeepAlive &&) = delete; @@ -36,6 +52,13 @@ namespace etcd */ void Cancel(); + /** + * Check if the keep alive is still valid (invalid when there's an async exception). + * + * Nothing will happen if valid and an exception will be rethrowed if invalid. + */ + void Check(); + ~KeepAlive(); protected: @@ -49,6 +72,10 @@ namespace etcd std::unique_ptr stubs; private: + // error handling + std::exception_ptr eptr_; + std::function handler_; + int ttl; int64_t lease_id; bool continue_next; diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index c0d9c70..14be257 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -33,9 +33,13 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); currentTask = pplx::task([this]() { - // start refresh - this->refresh(); - context.run(); + try { + // start refresh + this->refresh(); + context.run(); + } catch (...) { + eptr_ = std::current_exception(); + } context.stop(); // clean up }); } @@ -50,6 +54,48 @@ etcd::KeepAlive::KeepAlive(std::string const & address, KeepAlive(Client(address, username, password), ttl, lease_id) { } +etcd::KeepAlive::KeepAlive(Client const &client, + std::function const &handler, + int ttl, int64_t lease_id): + handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) { + stubs.reset(new EtcdServerStubs{}); + stubs->leaseServiceStub = Lease::NewStub(client.channel); + + etcdv3::ActionParameters params; + params.auth_token.assign(client.auth_token); + params.lease_id = this->lease_id; + params.lease_stub = stubs->leaseServiceStub.get(); + + stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); + currentTask = pplx::task([this]() { + try { + // start refresh + this->refresh(); + context.run(); + } catch (...) { + if (handler_) { + handler_(std::current_exception()); + } else { + eptr_ = std::current_exception(); + } + } + context.stop(); // clean up + }); +} + +etcd::KeepAlive::KeepAlive(std::string const & address, + std::function const &handler, + int ttl, int64_t lease_id): + KeepAlive(Client(address), handler, ttl, lease_id) { +} + +etcd::KeepAlive::KeepAlive(std::string const & address, + std::string const & username, std::string const & password, + std::function const &handler, + int ttl, int64_t lease_id): + KeepAlive(Client(address, username, password), handler, ttl, lease_id) { +} + etcd::KeepAlive::~KeepAlive() { this->Cancel(); @@ -64,7 +110,8 @@ void etcd::KeepAlive::Cancel() #ifndef NDEBUG { std::ios::fmtflags os_flags (std::cout.flags()); - std::cout << "Cancel keepalive for " << std::hex << lease_id << std::endl; + std::cout << "Cancel keepalive for " << lease_id + << "(" << std::hex << lease_id << ")" << std::endl; std::cout.flags(os_flags); } #endif @@ -75,6 +122,12 @@ void etcd::KeepAlive::Cancel() currentTask.wait(); } +void etcd::KeepAlive::Check() { + if (eptr_) { + std::rethrow_exception(eptr_); + } +} + void etcd::KeepAlive::refresh() { if (!continue_next) { @@ -86,7 +139,8 @@ void etcd::KeepAlive::refresh() { std::ios::fmtflags os_flags (std::cout.flags()); std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl - << " for " << std::hex << lease_id << std::endl; + << " for " << lease_id + << "(" << std::hex << lease_id << ")" << std::endl; std::cout.flags(os_flags); } #endif diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp index 6060517..78549ac 100644 --- a/tst/LockTest.cpp +++ b/tst/LockTest.cpp @@ -7,6 +7,7 @@ #include #include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" TEST_CASE("lock and unlock") @@ -79,3 +80,87 @@ TEST_CASE("double lock will fail") REQUIRE(resp5.is_ok()); REQUIRE(0 == resp5.error_code()); } + +TEST_CASE("lock using lease") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + bool failed = false; + + std::function handler = [&failed](std::exception_ptr eptr) { + try { + if (eptr) { + std::rethrow_exception(eptr); + } + } catch(const std::exception& e) { + std::cerr << "Caught exception \"" << e.what() << "\"\n"; + failed = true; + } + }; + + // with handler + { + // grant lease and keep it alive + int64_t lease_id = etcd.leasegrant(5).get().value().lease(); + etcd::KeepAlive keepalive(etcd, handler, 3, lease_id); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + // lock + etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); + CHECK("lock" == resp1.action()); + REQUIRE(resp1.is_ok()); + REQUIRE(0 == resp1.error_code()); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + std::this_thread::sleep_for(std::chrono::seconds(20)); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + // unlock + etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get(); + CHECK("unlock" == resp2.action()); + REQUIRE(resp2.is_ok()); + REQUIRE(0 == resp2.error_code()); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + } + + // without handler + { + // grant lease and keep it alive + int64_t lease_id = etcd.leasegrant(5).get().value().lease(); + etcd::KeepAlive keepalive(etcd, 3, lease_id); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + // lock + etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); + CHECK("lock" == resp1.action()); + REQUIRE(resp1.is_ok()); + REQUIRE(0 == resp1.error_code()); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + std::this_thread::sleep_for(std::chrono::seconds(20)); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + + // unlock + etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get(); + CHECK("unlock" == resp2.action()); + REQUIRE(resp2.is_ok()); + REQUIRE(0 == resp2.error_code()); + + REQUIRE(!failed); + keepalive.Check(); // shouldn't throw + } +}