Implements the timeout feature to the etcd client.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2022-05-22 22:45:50 +08:00
parent 8da8946409
commit e5f1167c69
11 changed files with 162 additions and 3 deletions

View File

@ -300,7 +300,6 @@ And pass a `target_name_override` arguments to `WithSSL`,
etcd::Client *etcd = etcd::Client::WithSSL(
"https://127.0.0.1:2379,https://127.0.0.2:2479",
"example.rootca.cert", "example.cert", "example.key", "etcd");
```
For more discussion about this feature, see also [#87](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/87),
@ -336,6 +335,26 @@ which can be used for fine-grained control the gRPC settings, e.g.,
For more motivation and discussion about the above design, please refer to [issue-103](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/103).
### gRPC timeout when waiting for responses
gRPC Timeout is long-standing missing pieces in the etcd-cpp-apiv3 library. The timeout has been
supported via a `set_grpc_timeout` interfaces on the client,
```cpp
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout)
```
Any `std::chrono::duration` value can be used to set the grpc timeout, e.g.,
```cpp
etcd.set_grpc_timeout(std::chrono::seconds(5));
```
Note that the timeout value is the "timeout" when waiting for responses upon the gRPC channel, i.e., `CompletionQueue::AsyncNext`.
It doesn't means the timeout between issuing a `.set()` method getting the `etcd::Response`, as in the async mode the such a time
duration is unpredictable and the gRPC timeout should be enough to avoid deadly waiting (e.g., waiting for a `lock()`).
### Reading a value
You can read a value with the `get()` method of the client instance. The only parameter is the

View File

@ -614,6 +614,22 @@ namespace etcd
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif
/**
* Set a timeout value for grpc operations.
*/
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
this->client->set_grpc_timeout(timeout);
}
/**
* Get the current timeout value for grpc operations.
*/
template <typename Rep = std::micro>
std::chrono::duration<Rep> get_grpc_timeout() const {
return this->client->get_grpc_timeout();
}
/**
* Obtain the underlying synchronous client.
*/

View File

@ -107,6 +107,11 @@ namespace etcd
*/
int error_code() const;
/**
* Check whether the response contains a grpc TIMEOUT error.
*/
bool is_grpc_timeout() const;
/**
* Returns the string representation of the error code
*/

View File

@ -1,9 +1,11 @@
#ifndef __ETCD_SYNC_CLIENT_HPP__
#define __ETCD_SYNC_CLIENT_HPP__
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <ratio>
#include <string>
#include "etcd/Response.hpp"
@ -711,6 +713,22 @@ namespace etcd
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif
/**
* Set a timeout value for grpc operations.
*/
template <typename Rep = std::micro>
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
grpc_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(timeout);
}
/**
* Get the current timeout value for grpc operations.
*/
template <typename Rep = std::micro>
std::chrono::duration<Rep> get_grpc_timeout() const {
return std::chrono::duration_cast<std::chrono::duration<Rep>>(grpc_timeout);
}
private:
#if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> channel;
@ -719,6 +737,7 @@ namespace etcd
#endif
mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator;
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {

View File

@ -19,6 +19,10 @@ using etcdserverpb::Lease;
using v3lockpb::Lock;
using v3electionpb::Election;
namespace etcd {
class Response;
}
namespace etcdv3
{
enum class AtomicityType
@ -42,12 +46,16 @@ namespace etcdv3
std::string value;
std::string old_value;
std::string auth_token;
std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
KV::Stub* kv_stub;
Watch::Stub* watch_stub;
Lease::Stub* lease_stub;
Lock::Stub* lock_stub;
Election::Stub* election_stub;
bool has_grpc_timeout() const;
std::chrono::system_clock::time_point grpc_deadline() const;
void dump(std::ostream &os) const;
};
@ -67,6 +75,8 @@ namespace etcdv3
private:
// Init things like auth token, etc.
void InitAction();
friend class etcd::Response;
};
namespace detail {

View File

@ -65,6 +65,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
etcdv3::ActionParameters params;
params.auth_token.assign(client.current_auth_token());
// n.b.: keepalive: no need for timeout
params.lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get();

View File

@ -84,6 +84,11 @@ std::string const & etcd::Response::error_message() const
return _error_message;
}
bool etcd::Response::is_grpc_timeout() const
{
return _error_code == grpc::StatusCode::DEADLINE_EXCEEDED;
}
int64_t etcd::Response::index() const
{
return _index;

View File

@ -492,6 +492,7 @@ std::shared_ptr<etcdv3::AsyncHeadAction> etcd::SyncClient::head_internal()
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncHeadAction>(std::move(params));
}
@ -506,6 +507,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(std::st
params.key.assign(key);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
@ -540,6 +542,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::set_internal(std::stri
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), false);
}
@ -574,6 +577,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::add_internal(std::stri
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), true);
}
@ -587,6 +591,7 @@ std::shared_ptr<etcdv3::AsyncPutAction> etcd::SyncClient::put_internal(std::stri
params.key.assign(key);
params.value.assign(value);
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncPutAction>(std::move(params));
}
@ -621,6 +626,7 @@ std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal(std
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncUpdateAction>(std::move(params));
}
@ -681,6 +687,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> etcd::SyncClient::modify_if_i
params.old_revision = old_index;
params.old_value = old_value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncCompareAndSwapAction>(std::move(params), atomicity_type);
}
@ -695,6 +702,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rm_internal(std::st
params.key.assign(key);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
@ -716,6 +724,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> etcd::SyncClient::rm_if_int
params.old_revision = old_index;
params.old_value = old_value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncCompareAndDeleteAction>(std::move(params), atomicity_type);
}
@ -730,6 +739,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
params.key.assign(key);
params.withPrefix = recursive;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
@ -750,6 +760,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
params.range_end.assign(range_end);
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
}
@ -770,6 +781,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
params.withPrefix = true;
params.limit = limit;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
@ -791,6 +803,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
params.withPrefix = false;
params.limit = limit;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
@ -811,6 +824,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
params.withPrefix = recursive;
params.revision = fromIndex;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.watch_stub = stubs->watchServiceStub.get();
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
}
@ -837,6 +851,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
params.withPrefix = false;
params.revision = fromIndex;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.watch_stub = stubs->watchServiceStub.get();
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
}
@ -850,6 +865,7 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
params.ttl = ttl;
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(std::move(params));
@ -860,6 +876,7 @@ std::shared_ptr<etcd::KeepAlive> etcd::SyncClient::leasekeepalive(int ttl) {
etcdv3::ActionParameters params;
params.ttl = ttl;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
// keep alive is synchronous in two folds:
@ -881,6 +898,7 @@ std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> etcd::SyncClient::leaserevoke_in
etcdv3::ActionParameters params;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaseRevokeAction>(std::move(params));
}
@ -894,6 +912,7 @@ std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> etcd::SyncClient::leasetimet
etcdv3::ActionParameters params;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lease_stub = stubs->leaseServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaseTimeToLiveAction>(std::move(params));
}
@ -916,6 +935,7 @@ etcd::Response etcd::SyncClient::lock_internal(std::string const &key, std::shar
params.key = key;
params.lease_id = keepalive->Lease();
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();
{
@ -948,6 +968,7 @@ std::shared_ptr<etcdv3::AsyncLockAction> etcd::SyncClient::lock_with_lease_inter
params.key = key;
params.lease_id = lease_id;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();
return std::make_shared<etcdv3::AsyncLockAction>(std::move(params));
}
@ -960,6 +981,7 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(std
etcdv3::ActionParameters params;
params.key = lock_key;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.lock_stub = stubs->lockServiceStub.get();
// issue a "unlock" request
@ -1002,6 +1024,7 @@ etcd::Response etcd::SyncClient::txn(etcdv3::Transaction const &txn) {
std::shared_ptr<etcdv3::AsyncTxnAction> etcd::SyncClient::txn_internal(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncTxnAction>(std::move(params), txn);
}
@ -1018,6 +1041,7 @@ std::shared_ptr<etcdv3::AsyncCampaignAction> etcd::SyncClient::campaign_internal
params.lease_id = lease_id;
params.value = value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncCampaignAction>(std::move(params));
}
@ -1038,6 +1062,7 @@ std::shared_ptr<etcdv3::AsyncProclaimAction> etcd::SyncClient::proclaim_internal
params.revision = revision;
params.value = value;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncProclaimAction>(std::move(params));
}
@ -1050,6 +1075,7 @@ std::shared_ptr<etcdv3::AsyncLeaderAction> etcd::SyncClient::leader_internal(std
etcdv3::ActionParameters params;
params.name = name;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaderAction>(std::move(params));
}
@ -1059,6 +1085,7 @@ std::unique_ptr<etcd::SyncClient::Observer> etcd::SyncClient::observe(
etcdv3::ActionParameters params;
params.name.assign(name);
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
std::unique_ptr<etcd::SyncClient::Observer> observer(new Observer());
observer->action = std::make_shared<etcdv3::AsyncObserveAction>(std::move(params));
@ -1078,6 +1105,7 @@ std::shared_ptr<etcdv3::AsyncResignAction> etcd::SyncClient::resign_internal(std
params.key = key;
params.revision = revision;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.election_stub = stubs->electionServiceStub.get();
return std::make_shared<etcdv3::AsyncResignAction>(std::move(params));
}

View File

@ -136,6 +136,7 @@ void etcd::Watcher::doWatch(std::string const & key,
{
etcdv3::ActionParameters params;
params.auth_token.assign(auth_token);
// n.b.: watch: no need for timeout
params.key.assign(key);
params.range_end.assign(range_end);
if (fromIndex >= 0) {

View File

@ -1,4 +1,5 @@
#include <grpc/support/log.h>
#include <grpcpp/support/status.h>
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp"
@ -36,6 +37,14 @@ etcdv3::ActionParameters::ActionParameters()
lease_stub = NULL;
}
bool etcdv3::ActionParameters::has_grpc_timeout() const {
return this->grpc_timeout != std::chrono::microseconds::zero();
}
std::chrono::system_clock::time_point etcdv3::ActionParameters::grpc_deadline() const {
return std::chrono::system_clock::now() + this->grpc_timeout;
}
void etcdv3::ActionParameters::dump(std::ostream &os) const {
os << "ActionParameters:" << std::endl;
os << " withPrefix: " << withPrefix << std::endl;
@ -50,6 +59,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const {
os << " value: " << value << std::endl;
os << " old_value: " << old_value << std::endl;
os << " auth_token: " << auth_token << std::endl;
os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl;
}
void etcdv3::Action::waitForResponse()
@ -57,9 +67,25 @@ void etcdv3::Action::waitForResponse()
void* got_tag;
bool ok = false;
if (parameters.has_grpc_timeout()) {
switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) {
case CompletionQueue::NextStatus::TIMEOUT: {
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout");
break;
}
case CompletionQueue::NextStatus::SHUTDOWN: {
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown");
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
break;
}
}
} else {
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}
}
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
return this->start_timepoint;

View File

@ -83,6 +83,35 @@ TEST_CASE("double lock will fail")
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("lock could be timeout")
{
etcd::Client etcd("http://127.0.0.1:2379");
// setup the timeout
etcd.set_grpc_timeout(std::chrono::seconds(5));
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
auto lock_in_another_thread = std::thread([&](){
// lock again
etcd::Response resp2 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp2.action());
REQUIRE(resp2.is_grpc_timeout());
});
lock_in_another_thread.join();
// cleanup: unlock the second lock
etcd::Response resp5 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp5.action());
REQUIRE(resp5.is_ok());
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("lock using lease")
{
etcd::Client etcd("http://127.0.0.1:2379");