diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 11e7ceb..c1dddca 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -1,278 +1,616 @@ -#ifndef __ETCD_CLIENT_HPP__ -#define __ETCD_CLIENT_HPP__ - -#include "etcd/Response.hpp" - -#include -#include -#include - -#include -#include "proto/rpc.grpc.pb.h" -#include "proto/v3lock.grpc.pb.h" - -using etcdserverpb::Auth; -using etcdserverpb::KV; -using etcdserverpb::Watch; -using etcdserverpb::Lease; -using v3lockpb::Lock; - -namespace etcdv3 { - class Transaction; -} - -namespace etcd -{ - class KeepAlive; - class Watcher; - - /** - * Client is responsible for maintaining a connection towards an etcd server. - * Etcd operations can be reached via the methods of the client. - */ - class Client - { - public: - /** - * Constructs an etcd client object. - * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", - * or multiple url, seperated by ',' or ';'. - * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. - */ - Client(std::string const & etcd_url, - std::string const & load_balancer = "round_robin"); - - /** - * Constructs an etcd client object. - * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", - * or multiple url, seperated by ',' or ';'. - * @param username username of etcd auth - * @param password password of etcd auth - * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. - */ - Client(std::string const & etcd_url, - std::string const & username, - std::string const & password, - std::string const & load_balancer = "round_robin"); - - /** - * Sends a get request to the etcd server - * @param key is the key to be read - */ - pplx::task get(std::string const & key); - - /** - * Sets the value of a key. The key will be modified if already exists or created - * if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - */ - pplx::task set(std::string const & key, std::string const & value, int ttl = 0); - - /** - * Sets the value of a key. The key will be modified if already exists or created - * if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task set(std::string const & key, std::string const & value, int64_t leaseId); - - - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - */ - pplx::task add(std::string const & key, std::string const & value, int ttl = 0); - - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task add(std::string const & key, std::string const & value, int64_t leaseId); - - /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set - */ - pplx::task modify(std::string const & key, std::string const & value, int ttl = 0); - - /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task modify(std::string const & key, std::string const & value, int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific value. Fails if the key does not exists - * or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - */ - pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); - - /** - * Modifies an existing key only if it has a specific value. Fails if the key does not exists - * or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - * @param leaseId is the lease attached to the key - */ - pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of the previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - */ - pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0); - - /** - * Modifies an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of the previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - * @param leaseId is the lease attached to the key - */ - pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId); - - /** - * Removes a single key. The key has to point to a plain, non directory entry. - * @param key is the key to be deleted - */ - pplx::task rm(std::string const & key); - - /** - * Removes a single key but only if it has a specific value. Fails if the key does not exists - * or the its value differs from the expected one. - * @param key is the key to be deleted - */ - pplx::task rm_if(std::string const & key, std::string const & old_value); - - /** - * Removes an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of it differs from the expected one. - * @param key is the key to be deleted - * @param old_index is the expected index of the existing value - */ - pplx::task rm_if(std::string const & key, int old_index); - - /** - * Gets a directory listing of the directory identified by the key. - * @param key is the key to be listed - */ - pplx::task ls(std::string const & key); - - - /** - * Gets a directory listing of the directory identified by the key. - * @param key is the key to be listed - * @param limit is the size limit of results to be listed, we don't use default parameters - * to ensure backwards binary compatibility. - */ - pplx::task ls(std::string const & key, size_t const limit); - - - /** - * Removes a directory node. Fails if the parent directory dos not exists or not a directory. - * @param key is the directory to be created to be listed - * @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory. - */ - pplx::task rmdir(std::string const & key, bool recursive = false); - - /** - * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and - * a new key is created, like "/testdir/newkey" then no change happened in the value of - * "/testdir" so your watch will not detect this. If you want to detect addition and deletion of - * directory entries then you have to do a recursive watch. - * @param key is the value or directory to be watched - * @param recursive if true watch a whole subtree - */ - pplx::task watch(std::string const & key, bool recursive = false); - - /** - * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". - * @param key is the value or directory to be watched - * @param fromIndex the first index we are interested in - * @param recursive if true watch a whole subtree - */ - pplx::task watch(std::string const & key, int fromIndex, bool recursive = false); - - /** - * Grants a lease. - * @param ttl is the time to live of the lease - */ - pplx::task leasegrant(int ttl); - - /** - * Revoke a lease. - * @param lease_id is the id the lease - */ - pplx::task leaserevoke(int64_t lease_id); - - /** - * Get time-to-live of a lease. - * @param lease_id is the id the lease - */ - pplx::task leasetimetolive(int64_t lease_id); - - /** - * Gains a lock at a key, using a default created lease, using the default lease (60 seconds), with - * keeping alive has already been taken care of by the library. - * @param key is the key to be used to request the lock. - */ - pplx::task lock(std::string const &key); - - /** - * Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care - * of by the library. - * @param key is the key to be used to request the lock. - */ - pplx::task lock(std::string const &key, int64_t lease_id); - - /** - * Releases a lock at a key. - * @param key is the lock key to release. - */ - pplx::task unlock(std::string const &lock_key); - - /** - * Execute a etcd transaction. - * @param txn is the transaction object to be executed. - */ - pplx::task txn(etcdv3::Transaction const &txn); - - private: - std::shared_ptr channel; - std::string auth_token; - std::unique_ptr kvServiceStub; - std::unique_ptr watchServiceStub; - std::unique_ptr leaseServiceStub; - std::unique_ptr lockServiceStub; - - std::mutex mutex_for_keepalives; - std::map leases_for_locks; - std::map> keep_alive_for_locks; - - friend class KeepAlive; - friend class Watcher; -}; - - - -} +#include +#include +#include +#if defined(_WIN32) +#include +#include +#else +#include +#include #endif + +#include +#include +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" +#include "etcd/v3/action_constants.hpp" +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncTxnResponse.hpp" +#include "etcd/v3/AsyncRangeResponse.hpp" +#include "etcd/v3/AsyncWatchResponse.hpp" +#include "etcd/v3/AsyncDeleteRangeResponse.hpp" +#include "etcd/v3/AsyncLockResponse.hpp" +#include "etcd/v3/Transaction.hpp" +#include + +#include "etcd/v3/AsyncSetAction.hpp" +#include "etcd/v3/AsyncCompareAndSwapAction.hpp" +#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" +#include "etcd/v3/AsyncUpdateAction.hpp" +#include "etcd/v3/AsyncGetAction.hpp" +#include "etcd/v3/AsyncDeleteAction.hpp" +#include "etcd/v3/AsyncWatchAction.hpp" +#include "etcd/v3/AsyncLeaseAction.hpp" +#include "etcd/v3/AsyncLockAction.hpp" +#include "etcd/v3/AsyncTxnAction.hpp" + +#include + +using grpc::Channel; + +namespace etcd { +namespace detail { + +static bool dns_resolve(std::string const &target, std::vector &endpoints) { + struct addrinfo hints = {}, *addrs; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + std::vector target_parts; + boost::split(target_parts, target, boost::is_any_of(":")); + if (target_parts.size() != 2) { + std::cerr << "warn: invalid URL: " << target << std::endl; + return false; + } + +#if defined(_WIN32) + { + // Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h. + WORD wVersionRequested = MAKEWORD(2, 2); + WSADATA wsaData; + + int err = WSAStartup(wVersionRequested, &wsaData); + if (err != 0) { + // Tell the user that we could not find a usable Winsock DLL. */ + std::cerr << "WSAStartup failed with error: %d" << err << std::endl; + return false; + } + } +#endif + + int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs); + if (r != 0) { + std::cerr << "warn: getaddrinfo() failed for endpoint " << target + << " with error: " << r << std::endl; + return false; + } + + char host[16] = {'\0'}; + for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) { + memset(host, '\0', sizeof(host)); + getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST); + endpoints.emplace_back(std::string(host) + ":" + target_parts[1]); + } + freeaddrinfo(addrs); + return true; +} + +const std::string strip_and_resolve_addresses(std::string const &address) { + std::vector addresses; + boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); + std::string stripped_address; + { + std::vector stripped_addresses; + std::string substr("://"); + for (auto const &addr: addresses) { + std::string::size_type idx = addr.find(substr); + std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); + etcd::detail::dns_resolve(target, stripped_addresses); + } + stripped_address = boost::algorithm::join(stripped_addresses, ","); + } + return "ipv4:///" + stripped_address; +} + +const bool authenticate(std::shared_ptr const &channel, + std::string const &username, + std::string const &password, + std::string &token_or_message) { + // run a round of auth + auto auth_stub = Auth::NewStub(channel); + ClientContext context; + etcdserverpb::AuthenticateRequest auth_request; + etcdserverpb::AuthenticateResponse auth_response; + auth_request.set_name(username); + auth_request.set_password(password); + auto status = auth_stub->Authenticate(&context, auth_request, &auth_response); + if (status.ok()) { + token_or_message = auth_response.token(); + return true; + } else { + token_or_message = status.error_message(); + return false; + } +} + +} +} + +etcd::Client::Client(std::string const & address, + const unsigned int& timeout, + std::string const & load_balancer) +{ + // create channels + std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); + grpc::ChannelArguments grpc_args; + grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); + grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); + std::shared_ptr creds = grpc::InsecureChannelCredentials(); + grpc_args.SetLoadBalancingPolicyName(load_balancer); + this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + + // create stubs + kvServiceStub = KV::NewStub(this->channel); + watchServiceStub= Watch::NewStub(this->channel); + leaseServiceStub= Lease::NewStub(this->channel); + lockServiceStub = Lock::NewStub(this->channel); + this->timeout = timeout; +} + +etcd::Client::Client(std::string const & address, + std::string const & username, + std::string const & password, + const unsigned int& timeout, + std::string const & load_balancer) +{ + // create channels + std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); + grpc::ChannelArguments grpc_args; + grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); + grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); + std::shared_ptr creds = grpc::InsecureChannelCredentials(); + grpc_args.SetLoadBalancingPolicyName(load_balancer); + this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + + // auth + std::string token_or_message; + if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { + throw std::invalid_argument("Etcd authentication failed: " + token_or_message); + } + this->auth_token = token_or_message; + + // setup stubs + kvServiceStub = KV::NewStub(this->channel); + watchServiceStub= Watch::NewStub(this->channel); + leaseServiceStub= Lease::NewStub(this->channel); + lockServiceStub = Lock::NewStub(this->channel); + this->timeout = timeout; +} + +pplx::task etcd::Client::get(std::string const & key) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = false; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::set(std::string const & key, std::string const & value, int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + if(ttl > 0) + { + auto res = leasegrant(ttl).get(); + if(!res.is_ok()) + { + return pplx::task([res]() + { + return etcd::Response(res.error_code(), res.error_message().c_str()); + }); + } + else + { + params.lease_id = res.value().lease(); + } + } + + std::shared_ptr call(new etcdv3::AsyncSetAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.lease_id = leaseid; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncSetAction(params)); + return Response::create(call); +} + + +pplx::task etcd::Client::add(std::string const & key, std::string const & value, int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + if(ttl > 0) + { + auto res = leasegrant(ttl).get(); + if(!res.is_ok()) + { + return pplx::task([res]() + { + return etcd::Response(res.error_code(), res.error_message().c_str()); + }); + } + else + { + params.lease_id = res.value().lease(); + } + } + std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); + return Response::create(call); +} + +pplx::task etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.lease_id = leaseid; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); + return Response::create(call); +} + + +pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + if(ttl > 0) + { + auto res = leasegrant(ttl).get(); + if(!res.is_ok()) + { + return pplx::task([res]() + { + return etcd::Response(res.error_code(), res.error_message().c_str()); + }); + } + else + { + params.lease_id = res.value().lease(); + } + } + std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.lease_id = leaseid; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); + return Response::create(call); +} + + +pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.old_value.assign(old_value); + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + if(ttl > 0) + { + auto res = leasegrant(ttl).get(); + if(!res.is_ok()) + { + return pplx::task([res]() + { + return etcd::Response(res.error_code(), res.error_message().c_str()); + }); + } + else + { + params.lease_id = res.value().lease(); + } + } + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + return Response::create(call); +} + +pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.old_value.assign(old_value); + params.lease_id = leaseid; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + return Response::create(call); +} + +pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.old_revision = old_index; + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; + if(ttl > 0) + { + auto res = leasegrant(ttl).get(); + if(!res.is_ok()) + { + return pplx::task([res]() + { + return etcd::Response(res.error_code(), res.error_message().c_str()); + }); + } + else + { + params.lease_id = res.value().lease(); + } + } + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); + return Response::create(call); +} + +pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.value.assign(value); + params.lease_id = leaseid; + params.old_revision = old_index; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); + return Response::create(call); +} + + +pplx::task etcd::Client::rm(std::string const & key) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = false; + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); + return Response::create(call); +} + + +pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.old_value.assign(old_value); + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + return Response::create(call); +} + +pplx::task etcd::Client::rm_if(std::string const & key, int old_index) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.old_revision = old_index; + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; + return Response::create(call); + +} + +pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = recursive; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::ls(std::string const & key) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = true; + params.limit = 0; // default no limit. + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::ls(std::string const & key, size_t const limit) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = true; + params.limit = limit; + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::watch(std::string const & key, bool recursive) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = recursive; + params.watch_stub = watchServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.withPrefix = recursive; + params.revision = fromIndex; + params.watch_stub = watchServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leasegrant(int ttl) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.ttl = ttl; + params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leaserevoke(int64_t lease_id) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncLeaseRevokeAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leasetimetolive(int64_t lease_id) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::lock(std::string const &key) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.timeout = this->timeout; + static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; + + // routines in lock usually will be fast, less than 10 seconds. + // + // (base on our experiences in vineyard and GraphScope). + auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get(); + int64_t lease_id = resp.value().lease(); + { + std::lock_guard lexical_scope_lock(mutex_for_keepalives); + this->keep_alive_for_locks[lease_id].reset( + new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id)); + } + params.key = key; + params.lease_id = lease_id; + params.lock_stub = lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + return Response::create(call).then( + [this, lease_id](pplx::task const &resp_task) -> etcd::Response { + auto const& resp = resp_task.get(); + { + std::lock_guard lexical_scope_lock(mutex_for_keepalives); + if (resp.is_ok()) { + this->leases_for_locks[resp.lock_key()] = lease_id; + } else { + this->keep_alive_for_locks.erase(lease_id); + } + } + return resp; + } + ); +} + +pplx::task etcd::Client::lock(std::string const &key, + int64_t lease_id) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key = key; + params.lease_id = lease_id; + params.lock_stub = lockServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::unlock(std::string const &lock_key) { + // cancel the KeepAlive first, it exists + { + std::lock_guard lexical_scope_lock(mutex_for_keepalives); + auto p_leases = this->leases_for_locks.find(lock_key); + if (p_leases != this->leases_for_locks.end()) { + auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); + if (p_keeps_alive != this->keep_alive_for_locks.end()) { + this->keep_alive_for_locks.erase(p_keeps_alive); + } + this->leases_for_locks.erase(p_leases); + } + } + + // issue a "unlock" request + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key = lock_key; + params.lock_stub = lockServiceStub.get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; + std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); + return Response::create(call); +} diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index a90dc2e..b50bf39 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -21,9 +21,11 @@ namespace etcd * * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * or multiple url, seperated by ',' or ';'. + * @param timeout timeout duration for each operation in terms of milliseconds * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. */ SyncClient(std::string const & etcd_url, + const unsigned int& timeout = std::numeric_limits::max(), std::string const & load_balancer = "round_robin"); /** @@ -33,11 +35,13 @@ namespace etcd * or multiple url, seperated by ',' or ';'. * @param username username of etcd auth * @param password password of etcd auth + * @param timeout timeout duration for each operation in terms of milliseconds * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. */ SyncClient(std::string const & etcd_url, std::string const & username, std::string const & password, + const unsigned int& timeout = std::numeric_limits::max(), std::string const & load_balancer = "round_robin"); Response get(std::string const & key); diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 3c8f8f2..961dee8 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -2,6 +2,7 @@ #define __V3_ACTION_HPP__ #include +#include #include #include "proto/rpc.grpc.pb.h" @@ -41,6 +42,7 @@ namespace etcdv3 Watch::Stub* watch_stub; Lease::Stub* lease_stub; Lock::Stub* lock_stub; + unsigned int timeout; }; class Action diff --git a/src/Client.cpp b/src/Client.cpp index 70fcd52..c1dddca 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -129,6 +129,7 @@ const bool authenticate(std::shared_ptr const &channel, } etcd::Client::Client(std::string const & address, + const unsigned int& timeout, std::string const & load_balancer) { // create channels @@ -145,11 +146,13 @@ etcd::Client::Client(std::string const & address, watchServiceStub= Watch::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); + this->timeout = timeout; } etcd::Client::Client(std::string const & address, std::string const & username, std::string const & password, + const unsigned int& timeout, std::string const & load_balancer) { // create channels @@ -173,6 +176,7 @@ etcd::Client::Client(std::string const & address, watchServiceStub= Watch::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); + this->timeout = timeout; } pplx::task etcd::Client::get(std::string const & key) @@ -182,6 +186,7 @@ pplx::task etcd::Client::get(std::string const & key) params.key.assign(key); params.withPrefix = false; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -193,7 +198,7 @@ pplx::task etcd::Client::set(std::string const & key, std::strin params.key.assign(key); params.value.assign(value); params.kv_stub = kvServiceStub .get(); - + params.timeout = this->timeout; if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -222,6 +227,7 @@ pplx::task etcd::Client::set(std::string const & key, std::strin params.value.assign(value); params.lease_id = leaseid; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncSetAction(params)); return Response::create(call); } @@ -234,7 +240,7 @@ pplx::task etcd::Client::add(std::string const & key, std::strin params.key.assign(key); params.value.assign(value); params.kv_stub = kvServiceStub .get(); - + params.timeout = this->timeout; if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -262,6 +268,7 @@ pplx::task etcd::Client::add(std::string const & key, std::strin params.value.assign(value); params.lease_id = leaseid; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); return Response::create(call); } @@ -274,7 +281,7 @@ pplx::task etcd::Client::modify(std::string const & key, std::st params.key.assign(key); params.value.assign(value); params.kv_stub = kvServiceStub .get(); - + params.timeout = this->timeout; if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -302,6 +309,7 @@ pplx::task etcd::Client::modify(std::string const & key, std::st params.value.assign(value); params.lease_id = leaseid; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } @@ -315,7 +323,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.value.assign(value); params.old_value.assign(old_value); params.kv_stub = kvServiceStub .get(); - + params.timeout = this->timeout; if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -344,6 +352,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.old_value.assign(old_value); params.lease_id = leaseid; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } @@ -356,6 +365,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.value.assign(value); params.old_revision = old_index; params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -384,6 +394,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.lease_id = leaseid; params.old_revision = old_index; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); return Response::create(call); } @@ -395,7 +406,8 @@ pplx::task etcd::Client::rm(std::string const & key) params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -407,7 +419,8 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } @@ -418,7 +431,8 @@ pplx::task etcd::Client::rm_if(std::string const & key, int old_ params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; return Response::create(call); @@ -431,6 +445,7 @@ pplx::task etcd::Client::rmdir(std::string const & key, bool rec params.key.assign(key); params.withPrefix = recursive; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -443,6 +458,7 @@ pplx::task etcd::Client::ls(std::string const & key) params.withPrefix = true; params.limit = 0; // default no limit. params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -455,6 +471,7 @@ pplx::task etcd::Client::ls(std::string const & key, size_t cons params.withPrefix = true; params.limit = limit; params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -466,6 +483,7 @@ pplx::task etcd::Client::watch(std::string const & key, bool rec params.key.assign(key); params.withPrefix = recursive; params.watch_stub = watchServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } @@ -478,6 +496,7 @@ pplx::task etcd::Client::watch(std::string const & key, int from params.withPrefix = recursive; params.revision = fromIndex; params.watch_stub = watchServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } @@ -488,6 +507,7 @@ pplx::task etcd::Client::leasegrant(int ttl) params.auth_token.assign(this->auth_token); params.ttl = ttl; params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); return Response::create(call); } @@ -498,6 +518,7 @@ pplx::task etcd::Client::leaserevoke(int64_t lease_id) params.auth_token.assign(this->auth_token); params.lease_id = lease_id; params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncLeaseRevokeAction(params)); return Response::create(call); } @@ -508,6 +529,7 @@ pplx::task etcd::Client::leasetimetolive(int64_t lease_id) params.auth_token.assign(this->auth_token); params.lease_id = lease_id; params.lease_stub = leaseServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); return Response::create(call); } @@ -515,7 +537,7 @@ pplx::task etcd::Client::leasetimetolive(int64_t lease_id) pplx::task etcd::Client::lock(std::string const &key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); - + params.timeout = this->timeout; static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; // routines in lock usually will be fast, less than 10 seconds. @@ -555,6 +577,7 @@ pplx::task etcd::Client::lock(std::string const &key, params.key = key; params.lease_id = lease_id; params.lock_stub = lockServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncLockAction(params)); return Response::create(call); } @@ -578,6 +601,7 @@ pplx::task etcd::Client::unlock(std::string const &lock_key) { params.auth_token.assign(this->auth_token); params.key = lock_key; params.lock_stub = lockServiceStub.get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); return Response::create(call); } @@ -586,6 +610,7 @@ pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.kv_stub = kvServiceStub .get(); + params.timeout = this->timeout; std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); return Response::create(call); } diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 4dc267f..7f9329b 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -10,16 +10,17 @@ return etcd::Response(500, ex.what()); \ } -etcd::SyncClient::SyncClient(std::string const & address, std::string const & load_balancer) - : client(address, load_balancer) +etcd::SyncClient::SyncClient(std::string const & address, const unsigned int& timeout, std::string const & load_balancer) + : client(address, timeout, load_balancer) { } etcd::SyncClient::SyncClient(std::string const & address, std::string const & username, std::string const & password, + const unsigned int& timeout, std::string const & load_balancer) - : client(address, username, password, load_balancer) + : client(address, username, password, timeout, load_balancer) { } diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index 0f4c81b..e9c4ce1 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -23,6 +23,7 @@ etcdv3::ActionParameters::ActionParameters() kv_stub = NULL; watch_stub = NULL; lease_stub = NULL; + timeout = std::numeric_limits::max(); } void etcdv3::Action::waitForResponse() @@ -30,8 +31,17 @@ void etcdv3::Action::waitForResponse() void* got_tag; bool ok = false; - cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)this); + auto status = cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::milliseconds(parameters.timeout)); + if( status == grpc::CompletionQueue::NextStatus::TIMEOUT ) + { + ok = false; + this->status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Timeout"); + } + else + { + GRP_ASSERT(got_tag == (void*)this); + } + } const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {