add retries for keepalive

add retries for keepalive

add retries for keepalive
This commit is contained in:
cheng.li 2022-05-31 10:38:22 +08:00
parent f21c45b362
commit de53587084
4 changed files with 103 additions and 46 deletions

View File

@ -18,6 +18,10 @@
#endif #endif
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
namespace etcdv3 {
class ActionParameters;
}
namespace etcd namespace etcd
{ {
/** /**
@ -27,25 +31,25 @@ namespace etcd
{ {
public: public:
KeepAlive(Client const &client, KeepAlive(Client const &client,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
int ttl, int64_t lease_id = 0, int ttl, int64_t lease_id = 0,
int const auth_token_ttl = 300); int const auth_token_ttl = 300, int _max_retry_attempts = 3);
KeepAlive(Client const &client, KeepAlive(Client const &client,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0, int _max_retry_attempts = 3);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0, int ttl, int64_t lease_id = 0,
int const auth_token_ttl = 300); int const auth_token_ttl = 300, int _max_retry_attempts = 3);
KeepAlive(KeepAlive const &) = delete; KeepAlive(KeepAlive const &) = delete;
KeepAlive(KeepAlive &&) = delete; KeepAlive(KeepAlive &&) = delete;
@ -87,6 +91,10 @@ namespace etcd
int ttl; int ttl;
int64_t lease_id; int64_t lease_id;
std::atomic_bool continue_next; std::atomic_bool continue_next;
int max_retry_attempts;
int retry_attempts;
int timer_interval;
std::unique_ptr<etcdv3::ActionParameters> params;
#if BOOST_VERSION >= 106600 #if BOOST_VERSION >= 106600
boost::asio::io_context context; boost::asio::io_context context;
#else #else

View File

@ -43,7 +43,9 @@ namespace etcdv3
class AsyncLeaseKeepAliveAction: public etcdv3::Action { class AsyncLeaseKeepAliveAction: public etcdv3::Action {
public: public:
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const &param); AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const &param,
std::chrono::milliseconds _retryConnWait = std::chrono::milliseconds(500));
~AsyncLeaseKeepAliveAction();
AsyncLeaseKeepAliveResponse ParseResponse(); AsyncLeaseKeepAliveResponse ParseResponse();
etcd::Response Refresh(); etcd::Response Refresh();
@ -57,6 +59,7 @@ namespace etcdv3
LeaseKeepAliveRequest req; LeaseKeepAliveRequest req;
bool isCancelled; bool isCancelled;
std::mutex protect_is_cancelled; std::mutex protect_is_cancelled;
std::chrono::milliseconds retryConnWait;
}; };
class AsyncLeaseTimeToLiveAction: public etcdv3::Action { class AsyncLeaseTimeToLiveAction: public etcdv3::Action {

View File

@ -21,19 +21,20 @@ void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdSe
} }
} }
etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id, int _max_retry_attempts):
ttl(ttl), lease_id(lease_id), continue_next(true) { ttl(ttl), lease_id(lease_id), continue_next(true), max_retry_attempts(_max_retry_attempts), retry_attempts(0) {
timer_interval = std::max(ttl/max_retry_attempts, 1);
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel); stubs->leaseServiceStub = Lease::NewStub(client.channel);
etcdv3::ActionParameters params; params.reset(new etcdv3::ActionParameters());
params.auth_token.assign(client.current_auth_token()); params->auth_token.assign(client.current_auth_token());
params.lease_id = this->lease_id; params->lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get(); params->lease_stub = stubs->leaseServiceStub.get();
continue_next.store(true); continue_next.store(true);
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params));
task_ = std::thread([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
@ -46,29 +47,30 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
}); });
} }
etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id): etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id, int _max_retry_attempts):
KeepAlive(Client(address), ttl, lease_id) { KeepAlive(Client(address), ttl, lease_id, _max_retry_attempts) {
} }
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
int ttl, int64_t lease_id, int const auth_token_ttl): int ttl, int64_t lease_id, int const auth_token_ttl, int _max_retry_attempts):
KeepAlive(Client(address, username, password, auth_token_ttl), ttl, lease_id) { KeepAlive(Client(address, username, password, auth_token_ttl), ttl, lease_id, _max_retry_attempts) {
} }
etcd::KeepAlive::KeepAlive(Client const &client, etcd::KeepAlive::KeepAlive(Client const &client,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id): int ttl, int64_t lease_id, int _max_retry_attempts):
handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) { handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true), max_retry_attempts(_max_retry_attempts), retry_attempts(0) {
timer_interval = std::max(ttl/max_retry_attempts, 1);
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel); stubs->leaseServiceStub = Lease::NewStub(client.channel);
etcdv3::ActionParameters params; params.reset(new etcdv3::ActionParameters());
params.auth_token.assign(client.current_auth_token()); params->auth_token.assign(client.current_auth_token());
params.lease_id = this->lease_id; params->lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get(); params->lease_stub = stubs->leaseServiceStub.get();
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params));
task_ = std::thread([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
@ -87,15 +89,15 @@ etcd::KeepAlive::KeepAlive(Client const &client,
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id): int ttl, int64_t lease_id, int _max_retry_attempts):
KeepAlive(Client(address), handler, ttl, lease_id) { KeepAlive(Client(address), handler, ttl, lease_id, _max_retry_attempts) {
} }
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id, const int auth_token_ttl): int ttl, int64_t lease_id, const int auth_token_ttl, int _max_retry_attempts):
KeepAlive(Client(address, username, password, auth_token_ttl), handler, ttl, lease_id) { KeepAlive(Client(address, username, password, auth_token_ttl), handler, ttl, lease_id, _max_retry_attempts) {
} }
etcd::KeepAlive::~KeepAlive() etcd::KeepAlive::~KeepAlive()
@ -131,9 +133,8 @@ void etcd::KeepAlive::refresh()
return; return;
} }
// minimal resolution: 1 second // minimal resolution: 1 second
int keepalive_ttl = std::max(ttl - 1, 1);
keepalive_timer_.reset(new boost::asio::steady_timer( keepalive_timer_.reset(new boost::asio::steady_timer(
context, std::chrono::seconds(keepalive_ttl))); context, std::chrono::seconds(timer_interval)));
keepalive_timer_->async_wait([this](const boost::system::error_code& error) { keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
if (error) { if (error) {
#ifndef NDEBUG #ifndef NDEBUG
@ -143,11 +144,19 @@ void etcd::KeepAlive::refresh()
if (this->continue_next.load()) { if (this->continue_next.load()) {
auto resp = this->stubs->call->Refresh(); auto resp = this->stubs->call->Refresh();
if (!resp.is_ok()) { if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + ++retry_attempts;
", message: " + resp.error_message()); if (retry_attempts >= max_retry_attempts) {
} throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
if (resp.value().ttl() == 0) { ", message: " + resp.error_message());
}
// going to reset KeepAlive stream"
this->continue_next.store(true);
this->stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(*params));
} else if (resp.value().ttl() == 0) {
throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0."); throw std::out_of_range("Failed to refresh lease due to expiration: the new TTL is 0.");
} else {
// going to reset retry_attempts
retry_attempts = 0;
} }
// trigger the next round; // trigger the next round;
this->refresh(); this->refresh();

View File

@ -62,8 +62,8 @@ etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse()
} }
etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction( etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters const &param, std::chrono::milliseconds _retryConnWait)
: etcdv3::Action(param) : etcdv3::Action(param), retryConnWait(_retryConnWait)
{ {
isCancelled = false; isCancelled = false;
stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE); stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE);
@ -109,19 +109,52 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh()
void *got_tag = nullptr; void *got_tag = nullptr;
bool ok = false; bool ok = false;
auto deadline = std::chrono::system_clock::now() + retryConnWait;
stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE);
// wait write finish // wait write finish
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); case CompletionQueue::NextStatus::TIMEOUT: {
// wait read finish status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write");
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { break;
auto resp = ParseResponse(); }
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( case CompletionQueue::NextStatus::SHUTDOWN: {
std::chrono::high_resolution_clock::now() - start_timepoint); status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write");
return etcd::Response(resp, duration); break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) {
status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag");
}
} }
} }
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection");
if (!status.ok()) {
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint));
}
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ);
// wait read finish
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT: {
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read");
break;
}
case CompletionQueue::NextStatus::SHUTDOWN: {
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read");
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_READ) {
status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag");
}
break;
}
}
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint));
} }
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
@ -158,6 +191,10 @@ bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const
return isCancelled; return isCancelled;
} }
etcdv3::AsyncLeaseKeepAliveAction::~AsyncLeaseKeepAliveAction() {
//CancelKeepAlive();
}
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters const &param)
: etcdv3::Action(param) : etcdv3::Action(param)