From 3cbfb51269f80f661dca4fd1a4db2583aa588dc4 Mon Sep 17 00:00:00 2001 From: Tao He Date: Sun, 22 May 2022 22:45:50 +0800 Subject: [PATCH] Implements the timeout feature to the etcd client. Signed-off-by: Tao He --- README.md | 21 ++++++++++++++++++++- etcd/Client.hpp | 16 ++++++++++++++++ etcd/Response.hpp | 5 +++++ etcd/SyncClient.hpp | 19 +++++++++++++++++++ etcd/v3/Action.hpp | 10 ++++++++++ src/KeepAlive.cpp | 1 + src/Response.cpp | 5 +++++ src/SyncClient.cpp | 28 ++++++++++++++++++++++++++++ src/Watcher.cpp | 1 + src/v3/Action.cpp | 30 ++++++++++++++++++++++++++++-- tst/LockTest.cpp | 29 +++++++++++++++++++++++++++++ 11 files changed, 162 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f1fbb17..ab59383 100644 --- a/README.md +++ b/README.md @@ -300,7 +300,6 @@ And pass a `target_name_override` arguments to `WithSSL`, etcd::Client *etcd = etcd::Client::WithSSL( "https://127.0.0.1:2379,https://127.0.0.2:2479", "example.rootca.cert", "example.cert", "example.key", "etcd"); - ``` For more discussion about this feature, see also [#87](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/87), @@ -336,6 +335,26 @@ which can be used for fine-grained control the gRPC settings, e.g., For more motivation and discussion about the above design, please refer to [issue-103](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/103). +### gRPC timeout when waiting for responses + +gRPC Timeout is long-standing missing pieces in the etcd-cpp-apiv3 library. The timeout has been +supported via a `set_grpc_timeout` interfaces on the client, + +```cpp + template + void set_grpc_timeout(std::chrono::duration const &timeout) +``` + +Any `std::chrono::duration` value can be used to set the grpc timeout, e.g., + +```cpp + etcd.set_grpc_timeout(std::chrono::seconds(5)); +``` + +Note that the timeout value is the "timeout" when waiting for responses upon the gRPC channel, i.e., `CompletionQueue::AsyncNext`. +It doesn't means the timeout between issuing a `.set()` method getting the `etcd::Response`, as in the async mode the such a time +duration is unpredictable and the gRPC timeout should be enough to avoid deadly waiting (e.g., waiting for a `lock()`). + ### Reading a value You can read a value with the `get()` method of the client instance. The only parameter is the diff --git a/etcd/Client.hpp b/etcd/Client.hpp index e07b260..0edcc10 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -614,6 +614,22 @@ namespace etcd std::shared_ptr grpc_channel() const; #endif + /** + * Set a timeout value for grpc operations. + */ + template + void set_grpc_timeout(std::chrono::duration const &timeout) { + this->client->set_grpc_timeout(timeout); + } + + /** + * Get the current timeout value for grpc operations. + */ + template + std::chrono::duration get_grpc_timeout() const { + return this->client->get_grpc_timeout(); + } + /** * Obtain the underlying synchronous client. */ diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 5e9dc26..56b9b26 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -107,6 +107,11 @@ namespace etcd */ int error_code() const; + /** + * Check whether the response contains a grpc TIMEOUT error. + */ + bool is_grpc_timeout() const; + /** * Returns the string representation of the error code */ diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 94f7bab..ccd3fc3 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -1,9 +1,11 @@ #ifndef __ETCD_SYNC_CLIENT_HPP__ #define __ETCD_SYNC_CLIENT_HPP__ +#include #include #include #include +#include #include #include "etcd/Response.hpp" @@ -711,6 +713,22 @@ namespace etcd std::shared_ptr grpc_channel() const; #endif + /** + * Set a timeout value for grpc operations. + */ + template + void set_grpc_timeout(std::chrono::duration const &timeout) { + grpc_timeout = std::chrono::duration_cast(timeout); + } + + /** + * Get the current timeout value for grpc operations. + */ + template + std::chrono::duration get_grpc_timeout() const { + return std::chrono::duration_cast>(grpc_timeout); + } + private: #if defined(WITH_GRPC_CHANNEL_CLASS) std::shared_ptr channel; @@ -719,6 +737,7 @@ namespace etcd #endif mutable std::unique_ptr token_authenticator; + mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero(); struct EtcdServerStubs; struct EtcdServerStubsDeleter { diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 168b9a2..aa7df04 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -19,6 +19,10 @@ using etcdserverpb::Lease; using v3lockpb::Lock; using v3electionpb::Election; +namespace etcd { + class Response; +} + namespace etcdv3 { enum class AtomicityType @@ -42,12 +46,16 @@ namespace etcdv3 std::string value; std::string old_value; std::string auth_token; + std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero(); KV::Stub* kv_stub; Watch::Stub* watch_stub; Lease::Stub* lease_stub; Lock::Stub* lock_stub; Election::Stub* election_stub; + bool has_grpc_timeout() const; + std::chrono::system_clock::time_point grpc_deadline() const; + void dump(std::ostream &os) const; }; @@ -67,6 +75,8 @@ namespace etcdv3 private: // Init things like auth token, etc. void InitAction(); + + friend class etcd::Response; }; namespace detail { diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 130fe42..45eddae 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -65,6 +65,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, etcdv3::ActionParameters params; params.auth_token.assign(client.current_auth_token()); + // n.b.: keepalive: no need for timeout params.lease_id = this->lease_id; params.lease_stub = stubs->leaseServiceStub.get(); diff --git a/src/Response.cpp b/src/Response.cpp index 66972f3..f485fbf 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -84,6 +84,11 @@ std::string const & etcd::Response::error_message() const return _error_message; } +bool etcd::Response::is_grpc_timeout() const +{ + return _error_code == grpc::StatusCode::DEADLINE_EXCEEDED; +} + int64_t etcd::Response::index() const { return _index; diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index ebda8a7..c842a51 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -492,6 +492,7 @@ std::shared_ptr etcd::SyncClient::head_internal() { etcdv3::ActionParameters params; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -506,6 +507,7 @@ std::shared_ptr etcd::SyncClient::get_internal(std::st params.key.assign(key); params.withPrefix = false; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -540,6 +542,7 @@ std::shared_ptr etcd::SyncClient::set_internal(std::stri params.value.assign(value); params.lease_id = leaseid; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params), false); } @@ -574,6 +577,7 @@ std::shared_ptr etcd::SyncClient::add_internal(std::stri params.value.assign(value); params.lease_id = leaseid; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params), true); } @@ -587,6 +591,7 @@ std::shared_ptr etcd::SyncClient::put_internal(std::stri params.key.assign(key); params.value.assign(value); params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -621,6 +626,7 @@ std::shared_ptr etcd::SyncClient::modify_internal(std params.value.assign(value); params.lease_id = leaseid; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -681,6 +687,7 @@ std::shared_ptr etcd::SyncClient::modify_if_i params.old_revision = old_index; params.old_value = old_value; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params), atomicity_type); } @@ -695,6 +702,7 @@ std::shared_ptr etcd::SyncClient::rm_internal(std::st params.key.assign(key); params.withPrefix = false; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -716,6 +724,7 @@ std::shared_ptr etcd::SyncClient::rm_if_int params.old_revision = old_index; params.old_value = old_value; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params), atomicity_type); } @@ -730,6 +739,7 @@ std::shared_ptr etcd::SyncClient::rmdir_internal(std: params.key.assign(key); params.withPrefix = recursive; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -750,6 +760,7 @@ std::shared_ptr etcd::SyncClient::rmdir_internal(std: params.range_end.assign(range_end); params.withPrefix = false; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -770,6 +781,7 @@ std::shared_ptr etcd::SyncClient::ls_internal(std::str params.withPrefix = true; params.limit = limit; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -791,6 +803,7 @@ std::shared_ptr etcd::SyncClient::ls_internal(std::str params.withPrefix = false; params.limit = limit; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } @@ -811,6 +824,7 @@ std::shared_ptr etcd::SyncClient::watch_internal(std:: params.withPrefix = recursive; params.revision = fromIndex; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.watch_stub = stubs->watchServiceStub.get(); return std::make_shared(std::move(params)); } @@ -837,6 +851,7 @@ std::shared_ptr etcd::SyncClient::watch_internal(std:: params.withPrefix = false; params.revision = fromIndex; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.watch_stub = stubs->watchServiceStub.get(); return std::make_shared(std::move(params)); } @@ -850,6 +865,7 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl) return Response::create([this, ttl]() { etcdv3::ActionParameters params; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lease_stub = stubs->leaseServiceStub.get(); params.ttl = ttl; return std::make_shared(std::move(params)); @@ -860,6 +876,7 @@ std::shared_ptr etcd::SyncClient::leasekeepalive(int ttl) { etcdv3::ActionParameters params; params.ttl = ttl; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lease_stub = stubs->leaseServiceStub.get(); // keep alive is synchronous in two folds: @@ -881,6 +898,7 @@ std::shared_ptr etcd::SyncClient::leaserevoke_in etcdv3::ActionParameters params; params.lease_id = lease_id; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lease_stub = stubs->leaseServiceStub.get(); return std::make_shared(std::move(params)); } @@ -894,6 +912,7 @@ std::shared_ptr etcd::SyncClient::leasetimet etcdv3::ActionParameters params; params.lease_id = lease_id; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lease_stub = stubs->leaseServiceStub.get(); return std::make_shared(std::move(params)); } @@ -916,6 +935,7 @@ etcd::Response etcd::SyncClient::lock_internal(std::string const &key, std::shar params.key = key; params.lease_id = keepalive->Lease(); params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lock_stub = stubs->lockServiceStub.get(); { @@ -948,6 +968,7 @@ std::shared_ptr etcd::SyncClient::lock_with_lease_inter params.key = key; params.lease_id = lease_id; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lock_stub = stubs->lockServiceStub.get(); return std::make_shared(std::move(params)); } @@ -960,6 +981,7 @@ std::shared_ptr etcd::SyncClient::unlock_internal(std etcdv3::ActionParameters params; params.key = lock_key; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.lock_stub = stubs->lockServiceStub.get(); // issue a "unlock" request @@ -1002,6 +1024,7 @@ etcd::Response etcd::SyncClient::txn(etcdv3::Transaction const &txn) { std::shared_ptr etcd::SyncClient::txn_internal(etcdv3::Transaction const &txn) { etcdv3::ActionParameters params; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params), txn); } @@ -1018,6 +1041,7 @@ std::shared_ptr etcd::SyncClient::campaign_internal params.lease_id = lease_id; params.value = value; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.election_stub = stubs->electionServiceStub.get(); return std::make_shared(std::move(params)); } @@ -1038,6 +1062,7 @@ std::shared_ptr etcd::SyncClient::proclaim_internal params.revision = revision; params.value = value; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.election_stub = stubs->electionServiceStub.get(); return std::make_shared(std::move(params)); } @@ -1050,6 +1075,7 @@ std::shared_ptr etcd::SyncClient::leader_internal(std etcdv3::ActionParameters params; params.name = name; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.election_stub = stubs->electionServiceStub.get(); return std::make_shared(std::move(params)); } @@ -1059,6 +1085,7 @@ std::unique_ptr etcd::SyncClient::observe( etcdv3::ActionParameters params; params.name.assign(name); params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.election_stub = stubs->electionServiceStub.get(); std::unique_ptr observer(new Observer()); observer->action = std::make_shared(std::move(params)); @@ -1078,6 +1105,7 @@ std::shared_ptr etcd::SyncClient::resign_internal(std params.key = key; params.revision = revision; params.auth_token.assign(this->token_authenticator->renew_if_expired()); + params.grpc_timeout = this->grpc_timeout; params.election_stub = stubs->electionServiceStub.get(); return std::make_shared(std::move(params)); } diff --git a/src/Watcher.cpp b/src/Watcher.cpp index eba8523..9d9f799 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -136,6 +136,7 @@ void etcd::Watcher::doWatch(std::string const & key, { etcdv3::ActionParameters params; params.auth_token.assign(auth_token); + // n.b.: watch: no need for timeout params.key.assign(key); params.range_end.assign(range_end); if (fromIndex >= 0) { diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index 1755a9f..dfe7e84 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -1,4 +1,5 @@ #include +#include #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" @@ -36,6 +37,14 @@ etcdv3::ActionParameters::ActionParameters() lease_stub = NULL; } +bool etcdv3::ActionParameters::has_grpc_timeout() const { + return this->grpc_timeout != std::chrono::microseconds::zero(); +} + +std::chrono::system_clock::time_point etcdv3::ActionParameters::grpc_deadline() const { + return std::chrono::system_clock::now() + this->grpc_timeout; +} + void etcdv3::ActionParameters::dump(std::ostream &os) const { os << "ActionParameters:" << std::endl; os << " withPrefix: " << withPrefix << std::endl; @@ -50,6 +59,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const { os << " value: " << value << std::endl; os << " old_value: " << old_value << std::endl; os << " auth_token: " << auth_token << std::endl; + os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl; } void etcdv3::Action::waitForResponse() @@ -57,8 +67,24 @@ void etcdv3::Action::waitForResponse() void* got_tag; bool ok = false; - cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)this); + if (parameters.has_grpc_timeout()) { + switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + break; + } + } + } else { + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); + } } const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() { diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp index f0a0fcf..2ed918a 100644 --- a/tst/LockTest.cpp +++ b/tst/LockTest.cpp @@ -83,6 +83,35 @@ TEST_CASE("double lock will fail") REQUIRE(0 == resp5.error_code()); } +TEST_CASE("lock could be timeout") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + // setup the timeout + etcd.set_grpc_timeout(std::chrono::seconds(5)); + + // lock + etcd::Response resp1 = etcd.lock("/test/abcd").get(); + CHECK("lock" == resp1.action()); + REQUIRE(resp1.is_ok()); + REQUIRE(0 == resp1.error_code()); + + auto lock_in_another_thread = std::thread([&](){ + // lock again + etcd::Response resp2 = etcd.lock("/test/abcd").get(); + CHECK("lock" == resp2.action()); + REQUIRE(resp2.is_grpc_timeout()); + }); + + lock_in_another_thread.join(); + + // cleanup: unlock the second lock + etcd::Response resp5 = etcd.unlock(resp1.lock_key()).get(); + CHECK("unlock" == resp5.action()); + REQUIRE(resp5.is_ok()); + REQUIRE(0 == resp5.error_code()); +} + TEST_CASE("lock using lease") { etcd::Client etcd("http://127.0.0.1:2379");