Enable grpc timeout support in keepalive's refresh.
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
2437a08e72
commit
7c9b9e5699
|
|
@ -1,6 +1,7 @@
|
||||||
#ifndef __ETCD_CLIENT_HPP__
|
#ifndef __ETCD_CLIENT_HPP__
|
||||||
#define __ETCD_CLIENT_HPP__
|
#define __ETCD_CLIENT_HPP__
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
@ -625,8 +626,7 @@ namespace etcd
|
||||||
/**
|
/**
|
||||||
* Get the current timeout value for grpc operations.
|
* Get the current timeout value for grpc operations.
|
||||||
*/
|
*/
|
||||||
template <typename Rep = std::micro>
|
std::chrono::microseconds get_grpc_timeout() const {
|
||||||
std::chrono::duration<Rep> get_grpc_timeout() const {
|
|
||||||
return this->client->get_grpc_timeout();
|
return this->client->get_grpc_timeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
#define __ETCD_KEEPALIVE_HPP__
|
#define __ETCD_KEEPALIVE_HPP__
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
@ -72,6 +73,21 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
void Check();
|
void Check();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
|
||||||
|
this->grpc_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
std::chrono::microseconds get_grpc_timeout() const {
|
||||||
|
return this->grpc_timeout;
|
||||||
|
}
|
||||||
|
|
||||||
~KeepAlive();
|
~KeepAlive();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
@ -95,6 +111,10 @@ namespace etcd
|
||||||
int ttl;
|
int ttl;
|
||||||
int64_t lease_id;
|
int64_t lease_id;
|
||||||
std::atomic_bool continue_next;
|
std::atomic_bool continue_next;
|
||||||
|
|
||||||
|
// grpc timeout in `refresh()`
|
||||||
|
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
|
||||||
|
|
||||||
#if BOOST_VERSION >= 106600
|
#if BOOST_VERSION >= 106600
|
||||||
boost::asio::io_context context;
|
boost::asio::io_context context;
|
||||||
#else
|
#else
|
||||||
|
|
|
||||||
|
|
@ -724,9 +724,8 @@ namespace etcd
|
||||||
/**
|
/**
|
||||||
* Get the current timeout value for grpc operations.
|
* Get the current timeout value for grpc operations.
|
||||||
*/
|
*/
|
||||||
template <typename Rep = std::micro>
|
std::chrono::microseconds get_grpc_timeout() const {
|
||||||
std::chrono::duration<Rep> get_grpc_timeout() const {
|
return this->grpc_timeout;
|
||||||
return std::chrono::duration_cast<std::chrono::duration<Rep>>(grpc_timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,10 @@ using etcdserverpb::LeaseTimeToLiveResponse;
|
||||||
using etcdserverpb::LeaseStatus;
|
using etcdserverpb::LeaseStatus;
|
||||||
using etcdserverpb::LeaseLeasesResponse;
|
using etcdserverpb::LeaseLeasesResponse;
|
||||||
|
|
||||||
|
namespace etcd {
|
||||||
|
class KeepAlive;
|
||||||
|
}
|
||||||
|
|
||||||
namespace etcdv3
|
namespace etcdv3
|
||||||
{
|
{
|
||||||
class AsyncLeaseGrantAction : public etcdv3::Action {
|
class AsyncLeaseGrantAction : public etcdv3::Action {
|
||||||
|
|
@ -51,12 +55,16 @@ namespace etcdv3
|
||||||
bool Cancelled() const;
|
bool Cancelled() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
etcdv3::ActionParameters& mutable_parameters();
|
||||||
|
|
||||||
LeaseKeepAliveResponse reply;
|
LeaseKeepAliveResponse reply;
|
||||||
std::unique_ptr<ClientAsyncReaderWriter<LeaseKeepAliveRequest, LeaseKeepAliveResponse>> stream;
|
std::unique_ptr<ClientAsyncReaderWriter<LeaseKeepAliveRequest, LeaseKeepAliveResponse>> stream;
|
||||||
|
|
||||||
LeaseKeepAliveRequest req;
|
LeaseKeepAliveRequest req;
|
||||||
bool isCancelled;
|
bool isCancelled;
|
||||||
std::mutex protect_is_cancelled;
|
std::mutex protect_is_cancelled;
|
||||||
|
|
||||||
|
friend class etcd::KeepAlive;
|
||||||
};
|
};
|
||||||
|
|
||||||
class AsyncLeaseTimeToLiveAction: public etcdv3::Action {
|
class AsyncLeaseTimeToLiveAction: public etcdv3::Action {
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <ratio>
|
||||||
|
|
||||||
#include "etcd/KeepAlive.hpp"
|
#include "etcd/KeepAlive.hpp"
|
||||||
#include "etcd/v3/AsyncLeaseAction.hpp"
|
#include "etcd/v3/AsyncLeaseAction.hpp"
|
||||||
|
|
@ -22,12 +23,14 @@ void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdSe
|
||||||
}
|
}
|
||||||
|
|
||||||
etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id):
|
etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id):
|
||||||
ttl(ttl), lease_id(lease_id), continue_next(true) {
|
ttl(ttl), lease_id(lease_id), continue_next(true),
|
||||||
|
grpc_timeout(client.get_grpc_timeout()) {
|
||||||
stubs.reset(new EtcdServerStubs{});
|
stubs.reset(new EtcdServerStubs{});
|
||||||
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
|
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
|
||||||
|
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(client.current_auth_token());
|
params.auth_token.assign(client.current_auth_token());
|
||||||
|
params.grpc_timeout = grpc_timeout;
|
||||||
params.lease_id = this->lease_id;
|
params.lease_id = this->lease_id;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
|
|
@ -59,7 +62,8 @@ etcd::KeepAlive::KeepAlive(std::string const & address,
|
||||||
etcd::KeepAlive::KeepAlive(SyncClient const &client,
|
etcd::KeepAlive::KeepAlive(SyncClient const &client,
|
||||||
std::function<void (std::exception_ptr)> const &handler,
|
std::function<void (std::exception_ptr)> const &handler,
|
||||||
int ttl, int64_t lease_id):
|
int ttl, int64_t lease_id):
|
||||||
handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) {
|
handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true),
|
||||||
|
grpc_timeout(client.get_grpc_timeout()) {
|
||||||
stubs.reset(new EtcdServerStubs{});
|
stubs.reset(new EtcdServerStubs{});
|
||||||
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
|
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
|
||||||
|
|
||||||
|
|
@ -133,8 +137,7 @@ void etcd::KeepAlive::refresh()
|
||||||
}
|
}
|
||||||
// minimal resolution: 1 second
|
// minimal resolution: 1 second
|
||||||
int keepalive_ttl = std::max(ttl - 1, 1);
|
int keepalive_ttl = std::max(ttl - 1, 1);
|
||||||
keepalive_timer_.reset(new boost::asio::steady_timer(
|
keepalive_timer_.reset(new boost::asio::steady_timer(context, std::chrono::seconds(keepalive_ttl)));
|
||||||
context, std::chrono::seconds(keepalive_ttl)));
|
|
||||||
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
|
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
|
||||||
if (error) {
|
if (error) {
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
|
@ -142,6 +145,7 @@ void etcd::KeepAlive::refresh()
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
if (this->continue_next.load()) {
|
if (this->continue_next.load()) {
|
||||||
|
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
|
||||||
auto resp = this->stubs->call->Refresh();
|
auto resp = this->stubs->call->Refresh();
|
||||||
if (!resp.is_ok()) {
|
if (!resp.is_ok()) {
|
||||||
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,10 @@
|
||||||
#include "etcd/v3/AsyncLeaseAction.hpp"
|
#include "etcd/v3/AsyncLeaseAction.hpp"
|
||||||
|
|
||||||
#include "etcd/v3/action_constants.hpp"
|
#include "etcd/v3/action_constants.hpp"
|
||||||
#include "etcd/v3/Transaction.hpp"
|
#include "etcd/v3/Transaction.hpp"
|
||||||
|
|
||||||
|
#include <grpcpp/support/status.h>
|
||||||
|
|
||||||
using etcdserverpb::LeaseGrantRequest;
|
using etcdserverpb::LeaseGrantRequest;
|
||||||
using etcdserverpb::LeaseRevokeRequest;
|
using etcdserverpb::LeaseRevokeRequest;
|
||||||
using etcdserverpb::LeaseCheckpointRequest;
|
using etcdserverpb::LeaseCheckpointRequest;
|
||||||
|
|
@ -97,10 +100,9 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh()
|
||||||
|
|
||||||
auto start_timepoint = std::chrono::high_resolution_clock::now();
|
auto start_timepoint = std::chrono::high_resolution_clock::now();
|
||||||
if (isCancelled) {
|
if (isCancelled) {
|
||||||
auto resp = ParseResponse();
|
status = grpc::Status::CANCELLED;
|
||||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
std::chrono::high_resolution_clock::now() - start_timepoint);
|
std::chrono::high_resolution_clock::now() - start_timepoint));
|
||||||
return etcd::Response(resp, duration);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LeaseKeepAliveRequest leasekeepalive_request;
|
LeaseKeepAliveRequest leasekeepalive_request;
|
||||||
|
|
@ -109,19 +111,62 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh()
|
||||||
void *got_tag = nullptr;
|
void *got_tag = nullptr;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
|
||||||
stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE);
|
if (parameters.has_grpc_timeout()) {
|
||||||
// wait write finish
|
stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE);
|
||||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) {
|
// wait write finish
|
||||||
|
switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) {
|
||||||
|
case CompletionQueue::NextStatus::TIMEOUT: {
|
||||||
|
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN: {
|
||||||
|
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT: {
|
||||||
|
if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) {
|
||||||
|
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!status.ok()) {
|
||||||
|
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - start_timepoint));
|
||||||
|
}
|
||||||
|
|
||||||
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ);
|
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ);
|
||||||
// wait read finish
|
// wait read finish
|
||||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) {
|
switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) {
|
||||||
auto resp = ParseResponse();
|
case CompletionQueue::NextStatus::TIMEOUT: {
|
||||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read");
|
||||||
std::chrono::high_resolution_clock::now() - start_timepoint);
|
break;
|
||||||
return etcd::Response(resp, duration);
|
}
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN: {
|
||||||
|
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT: {
|
||||||
|
if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_READ) {
|
||||||
|
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - start_timepoint));
|
||||||
|
} else {
|
||||||
|
stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE);
|
||||||
|
// wait write finish
|
||||||
|
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) {
|
||||||
|
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ);
|
||||||
|
// wait read finish
|
||||||
|
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) {
|
||||||
|
return etcd::Response(ParseResponse(), std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - start_timepoint));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection");
|
||||||
}
|
}
|
||||||
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
|
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
|
||||||
|
|
@ -158,6 +203,10 @@ bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const
|
||||||
return isCancelled;
|
return isCancelled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
etcdv3::ActionParameters& etcdv3::AsyncLeaseKeepAliveAction::mutable_parameters() {
|
||||||
|
return this->parameters;
|
||||||
|
}
|
||||||
|
|
||||||
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(
|
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(
|
||||||
etcdv3::ActionParameters && params)
|
etcdv3::ActionParameters && params)
|
||||||
: etcdv3::Action(std::move(params))
|
: etcdv3::Action(std::move(params))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue