Implements the timeout feature to the etcd client.
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
8da8946409
commit
3cbfb51269
21
README.md
21
README.md
|
|
@ -300,7 +300,6 @@ And pass a `target_name_override` arguments to `WithSSL`,
|
||||||
etcd::Client *etcd = etcd::Client::WithSSL(
|
etcd::Client *etcd = etcd::Client::WithSSL(
|
||||||
"https://127.0.0.1:2379,https://127.0.0.2:2479",
|
"https://127.0.0.1:2379,https://127.0.0.2:2479",
|
||||||
"example.rootca.cert", "example.cert", "example.key", "etcd");
|
"example.rootca.cert", "example.cert", "example.key", "etcd");
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
For more discussion about this feature, see also [#87](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/87),
|
For more discussion about this feature, see also [#87](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/87),
|
||||||
|
|
@ -336,6 +335,26 @@ which can be used for fine-grained control the gRPC settings, e.g.,
|
||||||
|
|
||||||
For more motivation and discussion about the above design, please refer to [issue-103](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/103).
|
For more motivation and discussion about the above design, please refer to [issue-103](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/103).
|
||||||
|
|
||||||
|
### gRPC timeout when waiting for responses
|
||||||
|
|
||||||
|
gRPC Timeout is long-standing missing pieces in the etcd-cpp-apiv3 library. The timeout has been
|
||||||
|
supported via a `set_grpc_timeout` interfaces on the client,
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout)
|
||||||
|
```
|
||||||
|
|
||||||
|
Any `std::chrono::duration` value can be used to set the grpc timeout, e.g.,
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
etcd.set_grpc_timeout(std::chrono::seconds(5));
|
||||||
|
```
|
||||||
|
|
||||||
|
Note that the timeout value is the "timeout" when waiting for responses upon the gRPC channel, i.e., `CompletionQueue::AsyncNext`.
|
||||||
|
It doesn't means the timeout between issuing a `.set()` method getting the `etcd::Response`, as in the async mode the such a time
|
||||||
|
duration is unpredictable and the gRPC timeout should be enough to avoid deadly waiting (e.g., waiting for a `lock()`).
|
||||||
|
|
||||||
### Reading a value
|
### Reading a value
|
||||||
|
|
||||||
You can read a value with the `get()` method of the client instance. The only parameter is the
|
You can read a value with the `get()` method of the client instance. The only parameter is the
|
||||||
|
|
|
||||||
|
|
@ -614,6 +614,22 @@ namespace etcd
|
||||||
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
|
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
|
||||||
|
this->client->set_grpc_timeout(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
std::chrono::duration<Rep> get_grpc_timeout() const {
|
||||||
|
return this->client->get_grpc_timeout();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain the underlying synchronous client.
|
* Obtain the underlying synchronous client.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,11 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
int error_code() const;
|
int error_code() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether the response contains a grpc TIMEOUT error.
|
||||||
|
*/
|
||||||
|
bool is_grpc_timeout() const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the string representation of the error code
|
* Returns the string representation of the error code
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,11 @@
|
||||||
#ifndef __ETCD_SYNC_CLIENT_HPP__
|
#ifndef __ETCD_SYNC_CLIENT_HPP__
|
||||||
#define __ETCD_SYNC_CLIENT_HPP__
|
#define __ETCD_SYNC_CLIENT_HPP__
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <ratio>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "etcd/Response.hpp"
|
#include "etcd/Response.hpp"
|
||||||
|
|
@ -711,6 +713,22 @@ namespace etcd
|
||||||
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
|
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {
|
||||||
|
grpc_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current timeout value for grpc operations.
|
||||||
|
*/
|
||||||
|
template <typename Rep = std::micro>
|
||||||
|
std::chrono::duration<Rep> get_grpc_timeout() const {
|
||||||
|
return std::chrono::duration_cast<std::chrono::duration<Rep>>(grpc_timeout);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
#if defined(WITH_GRPC_CHANNEL_CLASS)
|
#if defined(WITH_GRPC_CHANNEL_CLASS)
|
||||||
std::shared_ptr<grpc::Channel> channel;
|
std::shared_ptr<grpc::Channel> channel;
|
||||||
|
|
@ -719,6 +737,7 @@ namespace etcd
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator;
|
mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator;
|
||||||
|
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
|
||||||
|
|
||||||
struct EtcdServerStubs;
|
struct EtcdServerStubs;
|
||||||
struct EtcdServerStubsDeleter {
|
struct EtcdServerStubsDeleter {
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,10 @@ using etcdserverpb::Lease;
|
||||||
using v3lockpb::Lock;
|
using v3lockpb::Lock;
|
||||||
using v3electionpb::Election;
|
using v3electionpb::Election;
|
||||||
|
|
||||||
|
namespace etcd {
|
||||||
|
class Response;
|
||||||
|
}
|
||||||
|
|
||||||
namespace etcdv3
|
namespace etcdv3
|
||||||
{
|
{
|
||||||
enum class AtomicityType
|
enum class AtomicityType
|
||||||
|
|
@ -42,12 +46,16 @@ namespace etcdv3
|
||||||
std::string value;
|
std::string value;
|
||||||
std::string old_value;
|
std::string old_value;
|
||||||
std::string auth_token;
|
std::string auth_token;
|
||||||
|
std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
|
||||||
KV::Stub* kv_stub;
|
KV::Stub* kv_stub;
|
||||||
Watch::Stub* watch_stub;
|
Watch::Stub* watch_stub;
|
||||||
Lease::Stub* lease_stub;
|
Lease::Stub* lease_stub;
|
||||||
Lock::Stub* lock_stub;
|
Lock::Stub* lock_stub;
|
||||||
Election::Stub* election_stub;
|
Election::Stub* election_stub;
|
||||||
|
|
||||||
|
bool has_grpc_timeout() const;
|
||||||
|
std::chrono::system_clock::time_point grpc_deadline() const;
|
||||||
|
|
||||||
void dump(std::ostream &os) const;
|
void dump(std::ostream &os) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -67,6 +75,8 @@ namespace etcdv3
|
||||||
private:
|
private:
|
||||||
// Init things like auth token, etc.
|
// Init things like auth token, etc.
|
||||||
void InitAction();
|
void InitAction();
|
||||||
|
|
||||||
|
friend class etcd::Response;
|
||||||
};
|
};
|
||||||
|
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
|
||||||
|
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(client.current_auth_token());
|
params.auth_token.assign(client.current_auth_token());
|
||||||
|
// n.b.: keepalive: no need for timeout
|
||||||
params.lease_id = this->lease_id;
|
params.lease_id = this->lease_id;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -84,6 +84,11 @@ std::string const & etcd::Response::error_message() const
|
||||||
return _error_message;
|
return _error_message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool etcd::Response::is_grpc_timeout() const
|
||||||
|
{
|
||||||
|
return _error_code == grpc::StatusCode::DEADLINE_EXCEEDED;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t etcd::Response::index() const
|
int64_t etcd::Response::index() const
|
||||||
{
|
{
|
||||||
return _index;
|
return _index;
|
||||||
|
|
|
||||||
|
|
@ -492,6 +492,7 @@ std::shared_ptr<etcdv3::AsyncHeadAction> etcd::SyncClient::head_internal()
|
||||||
{
|
{
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncHeadAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncHeadAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -506,6 +507,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(std::st
|
||||||
params.key.assign(key);
|
params.key.assign(key);
|
||||||
params.withPrefix = false;
|
params.withPrefix = false;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -540,6 +542,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::set_internal(std::stri
|
||||||
params.value.assign(value);
|
params.value.assign(value);
|
||||||
params.lease_id = leaseid;
|
params.lease_id = leaseid;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), false);
|
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), false);
|
||||||
}
|
}
|
||||||
|
|
@ -574,6 +577,7 @@ std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::add_internal(std::stri
|
||||||
params.value.assign(value);
|
params.value.assign(value);
|
||||||
params.lease_id = leaseid;
|
params.lease_id = leaseid;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), true);
|
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), true);
|
||||||
}
|
}
|
||||||
|
|
@ -587,6 +591,7 @@ std::shared_ptr<etcdv3::AsyncPutAction> etcd::SyncClient::put_internal(std::stri
|
||||||
params.key.assign(key);
|
params.key.assign(key);
|
||||||
params.value.assign(value);
|
params.value.assign(value);
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncPutAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncPutAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -621,6 +626,7 @@ std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal(std
|
||||||
params.value.assign(value);
|
params.value.assign(value);
|
||||||
params.lease_id = leaseid;
|
params.lease_id = leaseid;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncUpdateAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncUpdateAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -681,6 +687,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> etcd::SyncClient::modify_if_i
|
||||||
params.old_revision = old_index;
|
params.old_revision = old_index;
|
||||||
params.old_value = old_value;
|
params.old_value = old_value;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncCompareAndSwapAction>(std::move(params), atomicity_type);
|
return std::make_shared<etcdv3::AsyncCompareAndSwapAction>(std::move(params), atomicity_type);
|
||||||
}
|
}
|
||||||
|
|
@ -695,6 +702,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rm_internal(std::st
|
||||||
params.key.assign(key);
|
params.key.assign(key);
|
||||||
params.withPrefix = false;
|
params.withPrefix = false;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -716,6 +724,7 @@ std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> etcd::SyncClient::rm_if_int
|
||||||
params.old_revision = old_index;
|
params.old_revision = old_index;
|
||||||
params.old_value = old_value;
|
params.old_value = old_value;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncCompareAndDeleteAction>(std::move(params), atomicity_type);
|
return std::make_shared<etcdv3::AsyncCompareAndDeleteAction>(std::move(params), atomicity_type);
|
||||||
}
|
}
|
||||||
|
|
@ -730,6 +739,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
|
||||||
params.key.assign(key);
|
params.key.assign(key);
|
||||||
params.withPrefix = recursive;
|
params.withPrefix = recursive;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -750,6 +760,7 @@ std::shared_ptr<etcdv3::AsyncDeleteAction> etcd::SyncClient::rmdir_internal(std:
|
||||||
params.range_end.assign(range_end);
|
params.range_end.assign(range_end);
|
||||||
params.withPrefix = false;
|
params.withPrefix = false;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncDeleteAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -770,6 +781,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
|
||||||
params.withPrefix = true;
|
params.withPrefix = true;
|
||||||
params.limit = limit;
|
params.limit = limit;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -791,6 +803,7 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::str
|
||||||
params.withPrefix = false;
|
params.withPrefix = false;
|
||||||
params.limit = limit;
|
params.limit = limit;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -811,6 +824,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
|
||||||
params.withPrefix = recursive;
|
params.withPrefix = recursive;
|
||||||
params.revision = fromIndex;
|
params.revision = fromIndex;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.watch_stub = stubs->watchServiceStub.get();
|
params.watch_stub = stubs->watchServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -837,6 +851,7 @@ std::shared_ptr<etcdv3::AsyncWatchAction> etcd::SyncClient::watch_internal(std::
|
||||||
params.withPrefix = false;
|
params.withPrefix = false;
|
||||||
params.revision = fromIndex;
|
params.revision = fromIndex;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.watch_stub = stubs->watchServiceStub.get();
|
params.watch_stub = stubs->watchServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncWatchAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -850,6 +865,7 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
|
||||||
return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
|
return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
params.ttl = ttl;
|
params.ttl = ttl;
|
||||||
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(std::move(params));
|
||||||
|
|
@ -860,6 +876,7 @@ std::shared_ptr<etcd::KeepAlive> etcd::SyncClient::leasekeepalive(int ttl) {
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.ttl = ttl;
|
params.ttl = ttl;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
// keep alive is synchronous in two folds:
|
// keep alive is synchronous in two folds:
|
||||||
|
|
@ -881,6 +898,7 @@ std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> etcd::SyncClient::leaserevoke_in
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.lease_id = lease_id;
|
params.lease_id = lease_id;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncLeaseRevokeAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncLeaseRevokeAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -894,6 +912,7 @@ std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> etcd::SyncClient::leasetimet
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.lease_id = lease_id;
|
params.lease_id = lease_id;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncLeaseTimeToLiveAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncLeaseTimeToLiveAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -916,6 +935,7 @@ etcd::Response etcd::SyncClient::lock_internal(std::string const &key, std::shar
|
||||||
params.key = key;
|
params.key = key;
|
||||||
params.lease_id = keepalive->Lease();
|
params.lease_id = keepalive->Lease();
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lock_stub = stubs->lockServiceStub.get();
|
params.lock_stub = stubs->lockServiceStub.get();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
@ -948,6 +968,7 @@ std::shared_ptr<etcdv3::AsyncLockAction> etcd::SyncClient::lock_with_lease_inter
|
||||||
params.key = key;
|
params.key = key;
|
||||||
params.lease_id = lease_id;
|
params.lease_id = lease_id;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lock_stub = stubs->lockServiceStub.get();
|
params.lock_stub = stubs->lockServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncLockAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncLockAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -960,6 +981,7 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(std
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.key = lock_key;
|
params.key = lock_key;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.lock_stub = stubs->lockServiceStub.get();
|
params.lock_stub = stubs->lockServiceStub.get();
|
||||||
|
|
||||||
// issue a "unlock" request
|
// issue a "unlock" request
|
||||||
|
|
@ -1002,6 +1024,7 @@ etcd::Response etcd::SyncClient::txn(etcdv3::Transaction const &txn) {
|
||||||
std::shared_ptr<etcdv3::AsyncTxnAction> etcd::SyncClient::txn_internal(etcdv3::Transaction const &txn) {
|
std::shared_ptr<etcdv3::AsyncTxnAction> etcd::SyncClient::txn_internal(etcdv3::Transaction const &txn) {
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.kv_stub = stubs->kvServiceStub.get();
|
params.kv_stub = stubs->kvServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncTxnAction>(std::move(params), txn);
|
return std::make_shared<etcdv3::AsyncTxnAction>(std::move(params), txn);
|
||||||
}
|
}
|
||||||
|
|
@ -1018,6 +1041,7 @@ std::shared_ptr<etcdv3::AsyncCampaignAction> etcd::SyncClient::campaign_internal
|
||||||
params.lease_id = lease_id;
|
params.lease_id = lease_id;
|
||||||
params.value = value;
|
params.value = value;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.election_stub = stubs->electionServiceStub.get();
|
params.election_stub = stubs->electionServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncCampaignAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncCampaignAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -1038,6 +1062,7 @@ std::shared_ptr<etcdv3::AsyncProclaimAction> etcd::SyncClient::proclaim_internal
|
||||||
params.revision = revision;
|
params.revision = revision;
|
||||||
params.value = value;
|
params.value = value;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.election_stub = stubs->electionServiceStub.get();
|
params.election_stub = stubs->electionServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncProclaimAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncProclaimAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -1050,6 +1075,7 @@ std::shared_ptr<etcdv3::AsyncLeaderAction> etcd::SyncClient::leader_internal(std
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.name = name;
|
params.name = name;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.election_stub = stubs->electionServiceStub.get();
|
params.election_stub = stubs->electionServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncLeaderAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncLeaderAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
@ -1059,6 +1085,7 @@ std::unique_ptr<etcd::SyncClient::Observer> etcd::SyncClient::observe(
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.name.assign(name);
|
params.name.assign(name);
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.election_stub = stubs->electionServiceStub.get();
|
params.election_stub = stubs->electionServiceStub.get();
|
||||||
std::unique_ptr<etcd::SyncClient::Observer> observer(new Observer());
|
std::unique_ptr<etcd::SyncClient::Observer> observer(new Observer());
|
||||||
observer->action = std::make_shared<etcdv3::AsyncObserveAction>(std::move(params));
|
observer->action = std::make_shared<etcdv3::AsyncObserveAction>(std::move(params));
|
||||||
|
|
@ -1078,6 +1105,7 @@ std::shared_ptr<etcdv3::AsyncResignAction> etcd::SyncClient::resign_internal(std
|
||||||
params.key = key;
|
params.key = key;
|
||||||
params.revision = revision;
|
params.revision = revision;
|
||||||
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
params.auth_token.assign(this->token_authenticator->renew_if_expired());
|
||||||
|
params.grpc_timeout = this->grpc_timeout;
|
||||||
params.election_stub = stubs->electionServiceStub.get();
|
params.election_stub = stubs->electionServiceStub.get();
|
||||||
return std::make_shared<etcdv3::AsyncResignAction>(std::move(params));
|
return std::make_shared<etcdv3::AsyncResignAction>(std::move(params));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -136,6 +136,7 @@ void etcd::Watcher::doWatch(std::string const & key,
|
||||||
{
|
{
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(auth_token);
|
params.auth_token.assign(auth_token);
|
||||||
|
// n.b.: watch: no need for timeout
|
||||||
params.key.assign(key);
|
params.key.assign(key);
|
||||||
params.range_end.assign(range_end);
|
params.range_end.assign(range_end);
|
||||||
if (fromIndex >= 0) {
|
if (fromIndex >= 0) {
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
#include <grpc/support/log.h>
|
#include <grpc/support/log.h>
|
||||||
|
#include <grpcpp/support/status.h>
|
||||||
#include "etcd/v3/action_constants.hpp"
|
#include "etcd/v3/action_constants.hpp"
|
||||||
#include "etcd/v3/Action.hpp"
|
#include "etcd/v3/Action.hpp"
|
||||||
|
|
||||||
|
|
@ -36,6 +37,14 @@ etcdv3::ActionParameters::ActionParameters()
|
||||||
lease_stub = NULL;
|
lease_stub = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool etcdv3::ActionParameters::has_grpc_timeout() const {
|
||||||
|
return this->grpc_timeout != std::chrono::microseconds::zero();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::system_clock::time_point etcdv3::ActionParameters::grpc_deadline() const {
|
||||||
|
return std::chrono::system_clock::now() + this->grpc_timeout;
|
||||||
|
}
|
||||||
|
|
||||||
void etcdv3::ActionParameters::dump(std::ostream &os) const {
|
void etcdv3::ActionParameters::dump(std::ostream &os) const {
|
||||||
os << "ActionParameters:" << std::endl;
|
os << "ActionParameters:" << std::endl;
|
||||||
os << " withPrefix: " << withPrefix << std::endl;
|
os << " withPrefix: " << withPrefix << std::endl;
|
||||||
|
|
@ -50,6 +59,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const {
|
||||||
os << " value: " << value << std::endl;
|
os << " value: " << value << std::endl;
|
||||||
os << " old_value: " << old_value << std::endl;
|
os << " old_value: " << old_value << std::endl;
|
||||||
os << " auth_token: " << auth_token << std::endl;
|
os << " auth_token: " << auth_token << std::endl;
|
||||||
|
os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcdv3::Action::waitForResponse()
|
void etcdv3::Action::waitForResponse()
|
||||||
|
|
@ -57,8 +67,24 @@ void etcdv3::Action::waitForResponse()
|
||||||
void* got_tag;
|
void* got_tag;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
|
||||||
|
if (parameters.has_grpc_timeout()) {
|
||||||
|
switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) {
|
||||||
|
case CompletionQueue::NextStatus::TIMEOUT: {
|
||||||
|
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::SHUTDOWN: {
|
||||||
|
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CompletionQueue::NextStatus::GOT_EVENT: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
cq_.Next(&got_tag, &ok);
|
cq_.Next(&got_tag, &ok);
|
||||||
GPR_ASSERT(got_tag == (void*)this);
|
GPR_ASSERT(got_tag == (void*)this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
|
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
|
||||||
|
|
|
||||||
|
|
@ -83,6 +83,35 @@ TEST_CASE("double lock will fail")
|
||||||
REQUIRE(0 == resp5.error_code());
|
REQUIRE(0 == resp5.error_code());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("lock could be timeout")
|
||||||
|
{
|
||||||
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
|
|
||||||
|
// setup the timeout
|
||||||
|
etcd.set_grpc_timeout(std::chrono::seconds(5));
|
||||||
|
|
||||||
|
// lock
|
||||||
|
etcd::Response resp1 = etcd.lock("/test/abcd").get();
|
||||||
|
CHECK("lock" == resp1.action());
|
||||||
|
REQUIRE(resp1.is_ok());
|
||||||
|
REQUIRE(0 == resp1.error_code());
|
||||||
|
|
||||||
|
auto lock_in_another_thread = std::thread([&](){
|
||||||
|
// lock again
|
||||||
|
etcd::Response resp2 = etcd.lock("/test/abcd").get();
|
||||||
|
CHECK("lock" == resp2.action());
|
||||||
|
REQUIRE(resp2.is_grpc_timeout());
|
||||||
|
});
|
||||||
|
|
||||||
|
lock_in_another_thread.join();
|
||||||
|
|
||||||
|
// cleanup: unlock the second lock
|
||||||
|
etcd::Response resp5 = etcd.unlock(resp1.lock_key()).get();
|
||||||
|
CHECK("unlock" == resp5.action());
|
||||||
|
REQUIRE(resp5.is_ok());
|
||||||
|
REQUIRE(0 == resp5.error_code());
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("lock using lease")
|
TEST_CASE("lock using lease")
|
||||||
{
|
{
|
||||||
etcd::Client etcd("http://127.0.0.1:2379");
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue