diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 0edcc10..dc8a841 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -1,6 +1,7 @@ #ifndef __ETCD_CLIENT_HPP__ #define __ETCD_CLIENT_HPP__ +#include #include #include #include @@ -625,8 +626,7 @@ namespace etcd /** * Get the current timeout value for grpc operations. */ - template - std::chrono::duration get_grpc_timeout() const { + std::chrono::microseconds get_grpc_timeout() const { return this->client->get_grpc_timeout(); } diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 758c277..a65ecdb 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -2,6 +2,7 @@ #define __ETCD_KEEPALIVE_HPP__ #include +#include #include #include #include @@ -72,6 +73,21 @@ namespace etcd */ void Check(); + /** + * Set a timeout value for grpc operations. + */ + template + void set_grpc_timeout(std::chrono::duration const &timeout) { + this->grpc_timeout = std::chrono::duration_cast(timeout); + } + + /** + * Get the current timeout value for grpc operations. + */ + std::chrono::microseconds get_grpc_timeout() const { + return this->grpc_timeout; + } + ~KeepAlive(); protected: @@ -95,6 +111,10 @@ namespace etcd int ttl; int64_t lease_id; std::atomic_bool continue_next; + + // grpc timeout in `refresh()` + mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero(); + #if BOOST_VERSION >= 106600 boost::asio::io_context context; #else diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index ccd3fc3..96770e7 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -724,9 +724,8 @@ namespace etcd /** * Get the current timeout value for grpc operations. */ - template - std::chrono::duration get_grpc_timeout() const { - return std::chrono::duration_cast>(grpc_timeout); + std::chrono::microseconds get_grpc_timeout() const { + return this->grpc_timeout; } private: diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp index 070fa03..b50b98a 100644 --- a/etcd/v3/AsyncLeaseAction.hpp +++ b/etcd/v3/AsyncLeaseAction.hpp @@ -21,6 +21,10 @@ using etcdserverpb::LeaseTimeToLiveResponse; using etcdserverpb::LeaseStatus; using etcdserverpb::LeaseLeasesResponse; +namespace etcd { + class KeepAlive; +} + namespace etcdv3 { class AsyncLeaseGrantAction : public etcdv3::Action { @@ -51,12 +55,16 @@ namespace etcdv3 bool Cancelled() const; private: + etcdv3::ActionParameters& mutable_parameters(); + LeaseKeepAliveResponse reply; std::unique_ptr> stream; LeaseKeepAliveRequest req; bool isCancelled; std::mutex protect_is_cancelled; + + friend class etcd::KeepAlive; }; class AsyncLeaseTimeToLiveAction: public etcdv3::Action { diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 45eddae..6de6679 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -1,4 +1,5 @@ #include +#include #include "etcd/KeepAlive.hpp" #include "etcd/v3/AsyncLeaseAction.hpp" @@ -22,12 +23,14 @@ void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdSe } etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id): - ttl(ttl), lease_id(lease_id), continue_next(true) { + ttl(ttl), lease_id(lease_id), continue_next(true), + grpc_timeout(client.get_grpc_timeout()) { stubs.reset(new EtcdServerStubs{}); stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel()); etcdv3::ActionParameters params; params.auth_token.assign(client.current_auth_token()); + params.grpc_timeout = grpc_timeout; params.lease_id = this->lease_id; params.lease_stub = stubs->leaseServiceStub.get(); @@ -59,7 +62,8 @@ etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(SyncClient const &client, std::function const &handler, int ttl, int64_t lease_id): - handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) { + handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true), + grpc_timeout(client.get_grpc_timeout()) { stubs.reset(new EtcdServerStubs{}); stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel()); @@ -133,8 +137,7 @@ void etcd::KeepAlive::refresh() } // minimal resolution: 1 second int keepalive_ttl = std::max(ttl - 1, 1); - keepalive_timer_.reset(new boost::asio::steady_timer( - context, std::chrono::seconds(keepalive_ttl))); + keepalive_timer_.reset(new boost::asio::steady_timer(context, std::chrono::seconds(keepalive_ttl))); keepalive_timer_->async_wait([this](const boost::system::error_code& error) { if (error) { #ifndef NDEBUG @@ -142,6 +145,7 @@ void etcd::KeepAlive::refresh() #endif } else { if (this->continue_next.load()) { + this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout; auto resp = this->stubs->call->Refresh(); if (!resp.is_ok()) { throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index 4b2f2d0..2e97d9b 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -1,7 +1,10 @@ #include "etcd/v3/AsyncLeaseAction.hpp" + #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Transaction.hpp" +#include + using etcdserverpb::LeaseGrantRequest; using etcdserverpb::LeaseRevokeRequest; using etcdserverpb::LeaseCheckpointRequest; @@ -97,10 +100,9 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() auto start_timepoint = std::chrono::high_resolution_clock::now(); if (isCancelled) { - auto resp = ParseResponse(); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); - return etcd::Response(resp, duration); + status = grpc::Status::CANCELLED; + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); } LeaseKeepAliveRequest leasekeepalive_request; @@ -109,19 +111,62 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() void *got_tag = nullptr; bool ok = false; - stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); - // wait write finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { + if (parameters.has_grpc_timeout()) { + stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); + // wait write finish + switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) { + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag"); + } + } + } + if (!status.ok()) { + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); + } + stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); // wait read finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { - auto resp = ParseResponse(); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); - return etcd::Response(resp, duration); + switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_READ) { + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); + } + break; + } } + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); + } else { + stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); + // wait write finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { + stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); + // wait read finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { + return etcd::Response(ParseResponse(), std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint)); + } + } + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection"); } - return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection"); } void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() @@ -158,6 +203,10 @@ bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const return isCancelled; } +etcdv3::ActionParameters& etcdv3::AsyncLeaseKeepAliveAction::mutable_parameters() { + return this->parameters; +} + etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( etcdv3::ActionParameters && params) : etcdv3::Action(std::move(params))