From de5358708465fb2c9b706fcc92c392c33df142e8 Mon Sep 17 00:00:00 2001 From: "cheng.li" Date: Tue, 31 May 2022 10:38:22 +0800 Subject: [PATCH] add retries for keepalive add retries for keepalive add retries for keepalive --- etcd/KeepAlive.hpp | 20 +++++++---- etcd/v3/AsyncLeaseAction.hpp | 5 ++- src/KeepAlive.cpp | 65 ++++++++++++++++++++---------------- src/v3/AsyncLeaseAction.cpp | 59 ++++++++++++++++++++++++++------ 4 files changed, 103 insertions(+), 46 deletions(-) diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 79fe637..849f433 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -18,6 +18,10 @@ #endif #include +namespace etcdv3 { + class ActionParameters; +} + namespace etcd { /** @@ -27,25 +31,25 @@ namespace etcd { public: KeepAlive(Client const &client, - int ttl, int64_t lease_id = 0); + int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3); KeepAlive(std::string const & address, - int ttl, int64_t lease_id = 0); + int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3); KeepAlive(std::string const & address, std::string const & username, std::string const & password, int ttl, int64_t lease_id = 0, - int const auth_token_ttl = 300); + int const auth_token_ttl = 300, int _max_retry_attempts = 3); KeepAlive(Client const &client, std::function const &handler, - int ttl, int64_t lease_id = 0); + int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3); KeepAlive(std::string const & address, std::function const &handler, - int ttl, int64_t lease_id = 0); + int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3); KeepAlive(std::string const & address, std::string const & username, std::string const & password, std::function const &handler, int ttl, int64_t lease_id = 0, - int const auth_token_ttl = 300); + int const auth_token_ttl = 300, int _max_retry_attempts = 3); KeepAlive(KeepAlive const &) = delete; KeepAlive(KeepAlive &&) = delete; @@ -87,6 +91,10 @@ namespace etcd int ttl; int64_t lease_id; std::atomic_bool continue_next; + int max_retry_attempts; + int retry_attempts; + int timer_interval; + std::unique_ptr params; #if BOOST_VERSION >= 106600 boost::asio::io_context context; #else diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp index 2ae8466..4e2d489 100644 --- a/etcd/v3/AsyncLeaseAction.hpp +++ b/etcd/v3/AsyncLeaseAction.hpp @@ -43,7 +43,9 @@ namespace etcdv3 class AsyncLeaseKeepAliveAction: public etcdv3::Action { public: - AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const ¶m); + AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const ¶m, + std::chrono::milliseconds _retryConnWait = std::chrono::milliseconds(500)); + ~AsyncLeaseKeepAliveAction(); AsyncLeaseKeepAliveResponse ParseResponse(); etcd::Response Refresh(); @@ -57,6 +59,7 @@ namespace etcdv3 LeaseKeepAliveRequest req; bool isCancelled; std::mutex protect_is_cancelled; + std::chrono::milliseconds retryConnWait; }; class AsyncLeaseTimeToLiveAction: public etcdv3::Action { diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index c298913..583002e 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -21,19 +21,20 @@ void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdSe } } -etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): - ttl(ttl), lease_id(lease_id), continue_next(true) { +etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id, int _max_retry_attempts): + ttl(ttl), lease_id(lease_id), continue_next(true), max_retry_attempts(_max_retry_attempts), retry_attempts(0) { + timer_interval = std::max(ttl/max_retry_attempts, 1); stubs.reset(new EtcdServerStubs{}); stubs->leaseServiceStub = Lease::NewStub(client.channel); - etcdv3::ActionParameters params; - params.auth_token.assign(client.current_auth_token()); - params.lease_id = this->lease_id; - params.lease_stub = stubs->leaseServiceStub.get(); + params.reset(new etcdv3::ActionParameters()); + params->auth_token.assign(client.current_auth_token()); + params->lease_id = this->lease_id; + params->lease_stub = stubs->leaseServiceStub.get(); continue_next.store(true); - stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); + stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params)); task_ = std::thread([this]() { try { // start refresh @@ -46,29 +47,30 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): }); } -etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id): - KeepAlive(Client(address), ttl, lease_id) { +etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id, int _max_retry_attempts): + KeepAlive(Client(address), ttl, lease_id, _max_retry_attempts) { } etcd::KeepAlive::KeepAlive(std::string const & address, std::string const & username, std::string const & password, - int ttl, int64_t lease_id, int const auth_token_ttl): - KeepAlive(Client(address, username, password, auth_token_ttl), ttl, lease_id) { + int ttl, int64_t lease_id, int const auth_token_ttl, int _max_retry_attempts): + KeepAlive(Client(address, username, password, auth_token_ttl), ttl, lease_id, _max_retry_attempts) { } etcd::KeepAlive::KeepAlive(Client const &client, std::function const &handler, - int ttl, int64_t lease_id): - handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) { + int ttl, int64_t lease_id, int _max_retry_attempts): + handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true), max_retry_attempts(_max_retry_attempts), retry_attempts(0) { + timer_interval = std::max(ttl/max_retry_attempts, 1); stubs.reset(new EtcdServerStubs{}); stubs->leaseServiceStub = Lease::NewStub(client.channel); - etcdv3::ActionParameters params; - params.auth_token.assign(client.current_auth_token()); - params.lease_id = this->lease_id; - params.lease_stub = stubs->leaseServiceStub.get(); + params.reset(new etcdv3::ActionParameters()); + params->auth_token.assign(client.current_auth_token()); + params->lease_id = this->lease_id; + params->lease_stub = stubs->leaseServiceStub.get(); - stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); + stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params)); task_ = std::thread([this]() { try { // start refresh @@ -87,15 +89,15 @@ etcd::KeepAlive::KeepAlive(Client const &client, etcd::KeepAlive::KeepAlive(std::string const & address, std::function const &handler, - int ttl, int64_t lease_id): - KeepAlive(Client(address), handler, ttl, lease_id) { + int ttl, int64_t lease_id, int _max_retry_attempts): + KeepAlive(Client(address), handler, ttl, lease_id, _max_retry_attempts) { } etcd::KeepAlive::KeepAlive(std::string const & address, std::string const & username, std::string const & password, std::function const &handler, - int ttl, int64_t lease_id, const int auth_token_ttl): - KeepAlive(Client(address, username, password, auth_token_ttl), handler, ttl, lease_id) { + int ttl, int64_t lease_id, const int auth_token_ttl, int _max_retry_attempts): + KeepAlive(Client(address, username, password, auth_token_ttl), handler, ttl, lease_id, _max_retry_attempts) { } etcd::KeepAlive::~KeepAlive() @@ -131,9 +133,8 @@ void etcd::KeepAlive::refresh() return; } // minimal resolution: 1 second - int keepalive_ttl = std::max(ttl - 1, 1); keepalive_timer_.reset(new boost::asio::steady_timer( - context, std::chrono::seconds(keepalive_ttl))); + context, std::chrono::seconds(timer_interval))); keepalive_timer_->async_wait([this](const boost::system::error_code& error) { if (error) { #ifndef NDEBUG @@ -143,11 +144,19 @@ void etcd::KeepAlive::refresh() if (this->continue_next.load()) { auto resp = this->stubs->call->Refresh(); if (!resp.is_ok()) { - throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + - ", message: " + resp.error_message()); - } - if (resp.value().ttl() == 0) { + ++retry_attempts; + if (retry_attempts >= max_retry_attempts) { + throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + + ", message: " + resp.error_message()); + } + // going to reset KeepAlive stream" + this->continue_next.store(true); + this->stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params)); + } else if (resp.value().ttl() == 0) { throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0."); + } else { + // going to reset retry_attempts + retry_attempts = 0; } // trigger the next round; this->refresh(); diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index 5c9925f..4778a81 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -62,8 +62,8 @@ etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse() } etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction( - etcdv3::ActionParameters const ¶m) - : etcdv3::Action(param) + etcdv3::ActionParameters const ¶m, std::chrono::milliseconds _retryConnWait) + : etcdv3::Action(param), retryConnWait(_retryConnWait) { isCancelled = false; stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE); @@ -109,19 +109,52 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() void *got_tag = nullptr; bool ok = false; + auto deadline = std::chrono::system_clock::now() + retryConnWait; + stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); // wait write finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { - stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); - // wait read finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { - auto resp = ParseResponse(); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); - return etcd::Response(resp, duration); + switch (cq_.AsyncNext(&got_tag, &ok, deadline)) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) { + status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag"); + } } } - return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection"); + + if (!status.ok()) { + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); + } + + stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); + // wait read finish + switch (cq_.AsyncNext(&got_tag, &ok, deadline)) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_READ) { + status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); + } + break; + } + } + + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); } void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() @@ -158,6 +191,10 @@ bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const return isCancelled; } +etcdv3::AsyncLeaseKeepAliveAction::~AsyncLeaseKeepAliveAction() { + //CancelKeepAlive(); +} + etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( etcdv3::ActionParameters const ¶m) : etcdv3::Action(param)