Timeout parameter is addded for each operation.

Timeout is added.
This commit is contained in:
samur 2021-01-29 07:54:59 +03:00
parent 4ecd0e095b
commit 735fd43b4a
6 changed files with 669 additions and 289 deletions

View File

@ -1,278 +1,616 @@
#ifndef __ETCD_CLIENT_HPP__ #include <stdio.h>
#define __ETCD_CLIENT_HPP__ #include <stdlib.h>
#include <sys/types.h>
#include "etcd/Response.hpp" #if defined(_WIN32)
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#endif
#include <map> #include <limits>
#include <mutex> #include <memory>
#include <string> #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/Transaction.hpp"
#include <iostream>
#include <grpc++/grpc++.h> #include "etcd/v3/AsyncSetAction.hpp"
#include "proto/rpc.grpc.pb.h" #include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "proto/v3lock.grpc.pb.h" #include "etcd/v3/AsyncCompareAndDeleteAction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp"
#include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp"
using etcdserverpb::Auth; #include <boost/algorithm/string.hpp>
using etcdserverpb::KV;
using etcdserverpb::Watch;
using etcdserverpb::Lease;
using v3lockpb::Lock;
namespace etcdv3 { using grpc::Channel;
class Transaction;
namespace etcd {
namespace detail {
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
struct addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
std::vector<std::string> target_parts;
boost::split(target_parts, target, boost::is_any_of(":"));
if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl;
return false;
} }
namespace etcd #if defined(_WIN32)
{ {
class KeepAlive; // Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h.
class Watcher; WORD wVersionRequested = MAKEWORD(2, 2);
WSADATA wsaData;
/** int err = WSAStartup(wVersionRequested, &wsaData);
* Client is responsible for maintaining a connection towards an etcd server. if (err != 0) {
* Etcd operations can be reached via the methods of the client. // Tell the user that we could not find a usable Winsock DLL. */
*/ std::cerr << "WSAStartup failed with error: %d" << err << std::endl;
class Client return false;
}
}
#endif
int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs);
if (r != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target
<< " with error: " << r << std::endl;
return false;
}
char host[16] = {'\0'};
for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) {
memset(host, '\0', sizeof(host));
getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST);
endpoints.emplace_back(std::string(host) + ":" + target_parts[1]);
}
freeaddrinfo(addrs);
return true;
}
const std::string strip_and_resolve_addresses(std::string const &address) {
std::vector<std::string> addresses;
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;"));
std::string stripped_address;
{ {
public: std::vector<std::string> stripped_addresses;
/** std::string substr("://");
* Constructs an etcd client object. for (auto const &addr: addresses) {
* std::string::size_type idx = addr.find(substr);
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
* or multiple url, seperated by ',' or ';'. etcd::detail::dns_resolve(target, stripped_addresses);
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. }
*/ stripped_address = boost::algorithm::join(stripped_addresses, ",");
Client(std::string const & etcd_url, }
std::string const & load_balancer = "round_robin"); return "ipv4:///" + stripped_address;
}
/** const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
Client(std::string const & etcd_url,
std::string const &username, std::string const &username,
std::string const &password, std::string const &password,
std::string const & load_balancer = "round_robin"); std::string &token_or_message) {
// run a round of auth
auto auth_stub = Auth::NewStub(channel);
ClientContext context;
etcdserverpb::AuthenticateRequest auth_request;
etcdserverpb::AuthenticateResponse auth_response;
auth_request.set_name(username);
auth_request.set_password(password);
auto status = auth_stub->Authenticate(&context, auth_request, &auth_response);
if (status.ok()) {
token_or_message = auth_response.token();
return true;
} else {
token_or_message = status.error_message();
return false;
}
}
/** }
* Sends a get request to the etcd server }
* @param key is the key to be read
*/
pplx::task<Response> get(std::string const & key);
/** etcd::Client::Client(std::string const & address,
* Sets the value of a key. The key will be modified if already exists or created const unsigned int& timeout,
* if it does not exists. std::string const & load_balancer)
* @param key is the key to be created or modified {
* @param value is the new value to be set // create channels
*/ std::string const addresses = etcd::detail::strip_and_resolve_addresses(address);
pplx::task<Response> set(std::string const & key, std::string const & value, int ttl = 0); grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
/** // create stubs
* Sets the value of a key. The key will be modified if already exists or created kvServiceStub = KV::NewStub(this->channel);
* if it does not exists. watchServiceStub= Watch::NewStub(this->channel);
* @param key is the key to be created or modified leaseServiceStub= Lease::NewStub(this->channel);
* @param value is the new value to be set lockServiceStub = Lock::NewStub(this->channel);
* @param leaseId is the lease attached to the key this->timeout = timeout;
*/ }
pplx::task<Response> set(std::string const & key, std::string const & value, int64_t leaseId);
etcd::Client::Client(std::string const & address,
std::string const & username,
std::string const & password,
const unsigned int& timeout,
std::string const & load_balancer)
{
// create channels
std::string const addresses = etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
// auth
std::string token_or_message;
if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) {
throw std::invalid_argument("Etcd authentication failed: " + token_or_message);
}
this->auth_token = token_or_message;
// setup stubs
kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel);
this->timeout = timeout;
}
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int ttl)
* Creates a new key and sets it's value. Fails if the key already exists. {
* @param key is the key to be created etcdv3::ActionParameters params;
* @param value is the value to be set params.auth_token.assign(this->auth_token);
*/ params.key.assign(key);
pplx::task<Response> add(std::string const & key, std::string const & value, int ttl = 0); params.value.assign(value);
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid)
* Creates a new key and sets it's value. Fails if the key already exists. {
* @param key is the key to be created etcdv3::ActionParameters params;
* @param value is the value to be set params.auth_token.assign(this->auth_token);
* @param leaseId is the lease attached to the key params.key.assign(key);
*/ params.value.assign(value);
pplx::task<Response> add(std::string const & key, std::string const & value, int64_t leaseId); params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
/** params.timeout = this->timeout;
* Modifies an existing key. Fails if the key does not exists. std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
* @param key is the key to be modified return Response::create(call);
* @param value is the new value to be set }
*/
pplx::task<Response> modify(std::string const & key, std::string const & value, int ttl = 0);
/**
* Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified
* @param value is the new value to be set
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists
* or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0);
/**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists
* or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId);
/**
* Modifies an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0);
/**
* Modifies an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId);
/**
* Removes a single key. The key has to point to a plain, non directory entry.
* @param key is the key to be deleted
*/
pplx::task<Response> rm(std::string const & key);
/**
* Removes a single key but only if it has a specific value. Fails if the key does not exists
* or the its value differs from the expected one.
* @param key is the key to be deleted
*/
pplx::task<Response> rm_if(std::string const & key, std::string const & old_value);
/**
* Removes an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of it differs from the expected one.
* @param key is the key to be deleted
* @param old_index is the expected index of the existing value
*/
pplx::task<Response> rm_if(std::string const & key, int old_index);
/**
* Gets a directory listing of the directory identified by the key.
* @param key is the key to be listed
*/
pplx::task<Response> ls(std::string const & key);
/** pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl)
* Gets a directory listing of the directory identified by the key. {
* @param key is the key to be listed etcdv3::ActionParameters params;
* @param limit is the size limit of results to be listed, we don't use default parameters params.auth_token.assign(this->auth_token);
* to ensure backwards binary compatibility. params.key.assign(key);
*/ params.value.assign(value);
pplx::task<Response> ls(std::string const & key, size_t const limit); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl)
* Removes a directory node. Fails if the parent directory dos not exists or not a directory. {
* @param key is the directory to be created to be listed etcdv3::ActionParameters params;
* @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory. params.auth_token.assign(this->auth_token);
*/ params.key.assign(key);
pplx::task<Response> rmdir(std::string const & key, bool recursive = false); params.value.assign(value);
params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid)
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and {
* a new key is created, like "/testdir/newkey" then no change happened in the value of etcdv3::ActionParameters params;
* "/testdir" so your watch will not detect this. If you want to detect addition and deletion of params.auth_token.assign(this->auth_token);
* directory entries then you have to do a recursive watch. params.key.assign(key);
* @param key is the value or directory to be watched params.value.assign(value);
* @param recursive if true watch a whole subtree params.old_value.assign(old_value);
*/ params.lease_id = leaseid;
pplx::task<Response> watch(std::string const & key, bool recursive = false); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl)
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". {
* @param key is the value or directory to be watched etcdv3::ActionParameters params;
* @param fromIndex the first index we are interested in params.auth_token.assign(this->auth_token);
* @param recursive if true watch a whole subtree params.key.assign(key);
*/ params.value.assign(value);
pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false); params.old_revision = old_index;
params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call);
}
/** pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid)
* Grants a lease. {
* @param ttl is the time to live of the lease etcdv3::ActionParameters params;
*/ params.auth_token.assign(this->auth_token);
pplx::task<Response> leasegrant(int ttl); params.key.assign(key);
params.value.assign(value);
/** params.lease_id = leaseid;
* Revoke a lease. params.old_revision = old_index;
* @param lease_id is the id the lease params.kv_stub = kvServiceStub .get();
*/ params.timeout = this->timeout;
pplx::task<Response> leaserevoke(int64_t lease_id); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call);
/** }
* Get time-to-live of a lease.
* @param lease_id is the id the lease
*/
pplx::task<Response> leasetimetolive(int64_t lease_id);
/**
* Gains a lock at a key, using a default created lease, using the default lease (60 seconds), with
* keeping alive has already been taken care of by the library.
* @param key is the key to be used to request the lock.
*/
pplx::task<Response> lock(std::string const &key);
/**
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
* of by the library.
* @param key is the key to be used to request the lock.
*/
pplx::task<Response> lock(std::string const &key, int64_t lease_id);
/**
* Releases a lock at a key.
* @param key is the lock key to release.
*/
pplx::task<Response> unlock(std::string const &lock_key);
/**
* Execute a etcd transaction.
* @param txn is the transaction object to be executed.
*/
pplx::task<Response> txn(etcdv3::Transaction const &txn);
private:
std::shared_ptr<grpc::Channel> channel;
std::string auth_token;
std::unique_ptr<KV::Stub> kvServiceStub;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub;
std::mutex mutex_for_keepalives;
std::map<std::string, int64_t> leases_for_locks;
std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks;
friend class KeepAlive;
friend class Watcher;
};
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.old_value.assign(old_value);
params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.old_revision = old_index;
params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call);
} }
#endif pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = recursive;
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = true;
params.limit = 0; // default no limit.
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t const limit)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = true;
params.limit = limit;
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = recursive;
params.revision = fromIndex;
params.watch_stub = watchServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.ttl = ttl;
params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(new etcdv3::AsyncLeaseRevokeAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasetimetolive(int64_t lease_id)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(new etcdv3::AsyncLeaseTimeToLiveAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.timeout = this->timeout;
static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10;
// routines in lock usually will be fast, less than 10 seconds.
//
// (base on our experiences in vineyard and GraphScope).
auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get();
int64_t lease_id = resp.value().lease();
{
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
this->keep_alive_for_locks[lease_id].reset(
new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id));
}
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call).then(
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
auto const& resp = resp_task.get();
{
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
if (resp.is_ok()) {
this->leases_for_locks[resp.lock_key()] = lease_id;
} else {
this->keep_alive_for_locks.erase(lease_id);
}
}
return resp;
}
);
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
int64_t lease_id) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
// cancel the KeepAlive first, it exists
{
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
auto p_leases = this->leases_for_locks.find(lock_key);
if (p_leases != this->leases_for_locks.end()) {
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
}
this->leases_for_locks.erase(p_leases);
}
}
// issue a "unlock" request
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = lock_key;
params.lock_stub = lockServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call);
}

View File

@ -21,9 +21,11 @@ namespace etcd
* *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param timeout timeout duration for each operation in terms of milliseconds
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
SyncClient(std::string const & etcd_url, SyncClient(std::string const & etcd_url,
const unsigned int& timeout = std::numeric_limits<unsigned int>::max(),
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
/** /**
@ -33,11 +35,13 @@ namespace etcd
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth * @param username username of etcd auth
* @param password password of etcd auth * @param password password of etcd auth
* @param timeout timeout duration for each operation in terms of milliseconds
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
SyncClient(std::string const & etcd_url, SyncClient(std::string const & etcd_url,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
const unsigned int& timeout = std::numeric_limits<unsigned int>::max(),
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
Response get(std::string const & key); Response get(std::string const & key);

View File

@ -2,6 +2,7 @@
#define __V3_ACTION_HPP__ #define __V3_ACTION_HPP__
#include <chrono> #include <chrono>
#include <limits>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
@ -41,6 +42,7 @@ namespace etcdv3
Watch::Stub* watch_stub; Watch::Stub* watch_stub;
Lease::Stub* lease_stub; Lease::Stub* lease_stub;
Lock::Stub* lock_stub; Lock::Stub* lock_stub;
unsigned int timeout;
}; };
class Action class Action

View File

@ -129,6 +129,7 @@ const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
} }
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const & address,
const unsigned int& timeout,
std::string const & load_balancer) std::string const & load_balancer)
{ {
// create channels // create channels
@ -145,11 +146,13 @@ etcd::Client::Client(std::string const & address,
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
this->timeout = timeout;
} }
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const & address,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
const unsigned int& timeout,
std::string const & load_balancer) std::string const & load_balancer)
{ {
// create channels // create channels
@ -173,6 +176,7 @@ etcd::Client::Client(std::string const & address,
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
this->timeout = timeout;
} }
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
@ -182,6 +186,7 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -193,7 +198,7 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -222,6 +227,7 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params)); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -234,7 +240,7 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -262,6 +268,7 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true)); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call); return Response::create(call);
} }
@ -274,7 +281,7 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -302,6 +309,7 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params)); std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call); return Response::create(call);
} }
@ -315,7 +323,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -344,6 +352,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
@ -356,6 +365,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.value.assign(value); params.value.assign(value);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub.get(); params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -384,6 +394,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.lease_id = leaseid; params.lease_id = leaseid;
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call); return Response::create(call);
} }
@ -396,6 +407,7 @@ pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = kvServiceStub.get(); params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
@ -408,6 +420,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
params.key.assign(key); params.key.assign(key);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = kvServiceStub.get(); params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
@ -419,6 +432,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_
params.key.assign(key); params.key.assign(key);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub.get(); params.kv_stub = kvServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call); return Response::create(call);
@ -431,6 +445,7 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
@ -443,6 +458,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
params.withPrefix = true; params.withPrefix = true;
params.limit = 0; // default no limit. params.limit = 0; // default no limit.
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -455,6 +471,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
params.withPrefix = true; params.withPrefix = true;
params.limit = limit; params.limit = limit;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -466,6 +483,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool rec
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params)); std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call); return Response::create(call);
} }
@ -478,6 +496,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
params.withPrefix = recursive; params.withPrefix = recursive;
params.revision = fromIndex; params.revision = fromIndex;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params)); std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call); return Response::create(call);
} }
@ -488,6 +507,7 @@ pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.ttl = ttl; params.ttl = ttl;
params.lease_stub = leaseServiceStub.get(); params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params)); std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
return Response::create(call); return Response::create(call);
} }
@ -498,6 +518,7 @@ pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.lease_id = lease_id; params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get(); params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(new etcdv3::AsyncLeaseRevokeAction(params)); std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(new etcdv3::AsyncLeaseRevokeAction(params));
return Response::create(call); return Response::create(call);
} }
@ -508,6 +529,7 @@ pplx::task<etcd::Response> etcd::Client::leasetimetolive(int64_t lease_id)
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.lease_id = lease_id; params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get(); params.lease_stub = leaseServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(new etcdv3::AsyncLeaseTimeToLiveAction(params));
return Response::create(call); return Response::create(call);
} }
@ -515,7 +537,7 @@ pplx::task<etcd::Response> etcd::Client::leasetimetolive(int64_t lease_id)
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) { pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.timeout = this->timeout;
static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10;
// routines in lock usually will be fast, less than 10 seconds. // routines in lock usually will be fast, less than 10 seconds.
@ -555,6 +577,7 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
params.key = key; params.key = key;
params.lease_id = lease_id; params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params)); std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call); return Response::create(call);
} }
@ -578,6 +601,7 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key = lock_key; params.key = lock_key;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params)); std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call); return Response::create(call);
} }
@ -586,6 +610,7 @@ pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub .get();
params.timeout = this->timeout;
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn)); std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call); return Response::create(call);
} }

View File

@ -10,16 +10,17 @@
return etcd::Response(500, ex.what()); \ return etcd::Response(500, ex.what()); \
} }
etcd::SyncClient::SyncClient(std::string const & address, std::string const & load_balancer) etcd::SyncClient::SyncClient(std::string const & address, const unsigned int& timeout, std::string const & load_balancer)
: client(address, load_balancer) : client(address, timeout, load_balancer)
{ {
} }
etcd::SyncClient::SyncClient(std::string const & address, etcd::SyncClient::SyncClient(std::string const & address,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
const unsigned int& timeout,
std::string const & load_balancer) std::string const & load_balancer)
: client(address, username, password, load_balancer) : client(address, username, password, timeout, load_balancer)
{ {
} }

View File

@ -23,6 +23,7 @@ etcdv3::ActionParameters::ActionParameters()
kv_stub = NULL; kv_stub = NULL;
watch_stub = NULL; watch_stub = NULL;
lease_stub = NULL; lease_stub = NULL;
timeout = std::numeric_limits<unsigned int>::max();
} }
void etcdv3::Action::waitForResponse() void etcdv3::Action::waitForResponse()
@ -30,8 +31,17 @@ void etcdv3::Action::waitForResponse()
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
cq_.Next(&got_tag, &ok); auto status = cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::milliseconds(parameters.timeout));
GPR_ASSERT(got_tag == (void*)this); if( status == grpc::CompletionQueue::NextStatus::TIMEOUT )
{
ok = false;
this->status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Timeout");
}
else
{
GRP_ASSERT(got_tag == (void*)this);
}
} }
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() { const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {