From ed199295f32125f793aa9ac7c493bbaf90c3e9c6 Mon Sep 17 00:00:00 2001 From: samur Date: Fri, 29 Jan 2021 08:58:02 +0300 Subject: [PATCH] Update Client.hpp --- etcd/Client.hpp | 877 +++++++++++++++--------------------------------- 1 file changed, 272 insertions(+), 605 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index b9e847c..2cb4a7f 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -1,616 +1,283 @@ -#include -#include -#include +#ifndef __ETCD_CLIENT_HPP__ +#define __ETCD_CLIENT_HPP__ -#if defined(_WIN32) -#include -#include -#else -#include -#include -#endif +#include "etcd/Response.hpp" +#include +#include +#include #include -#include -#include "etcd/Client.hpp" -#include "etcd/KeepAlive.hpp" -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/v3/AsyncDeleteRangeResponse.hpp" -#include "etcd/v3/AsyncLockResponse.hpp" -#include "etcd/v3/Transaction.hpp" -#include -#include "etcd/v3/AsyncSetAction.hpp" -#include "etcd/v3/AsyncCompareAndSwapAction.hpp" -#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" -#include "etcd/v3/AsyncUpdateAction.hpp" -#include "etcd/v3/AsyncGetAction.hpp" -#include "etcd/v3/AsyncDeleteAction.hpp" -#include "etcd/v3/AsyncWatchAction.hpp" -#include "etcd/v3/AsyncLeaseAction.hpp" -#include "etcd/v3/AsyncLockAction.hpp" -#include "etcd/v3/AsyncTxnAction.hpp" +#include +#include "proto/rpc.grpc.pb.h" +#include "proto/v3lock.grpc.pb.h" -#include +using etcdserverpb::Auth; +using etcdserverpb::KV; +using etcdserverpb::Watch; +using etcdserverpb::Lease; +using v3lockpb::Lock; -using grpc::Channel; +namespace etcdv3 { + class Transaction; +} -namespace etcd { -namespace detail { +namespace etcd +{ + class KeepAlive; + class Watcher; -static bool dns_resolve(std::string const &target, std::vector &endpoints) { - struct addrinfo hints = {}, *addrs; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - std::vector target_parts; - boost::split(target_parts, target, boost::is_any_of(":")); - if (target_parts.size() != 2) { - std::cerr << "warn: invalid URL: " << target << std::endl; - return false; - } - -#if defined(_WIN32) + /** + * Client is responsible for maintaining a connection towards an etcd server. + * Etcd operations can be reached via the methods of the client. + */ + class Client { - // Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h. - WORD wVersionRequested = MAKEWORD(2, 2); - WSADATA wsaData; + public: + /** + * Constructs an etcd client object. + * + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", + * or multiple url, seperated by ',' or ';'. + * @param timeout timeout duration for each operation in terms of milliseconds + * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. + */ + Client(std::string const & etcd_url, + const unsigned int& timeout = std::numeric_limits::max(), + std::string const & load_balancer = "round_robin"); + + /** + * Constructs an etcd client object. + * + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", + * or multiple url, seperated by ',' or ';'. + * @param username username of etcd auth + * @param password password of etcd auth + * @param timeout timeout duration for each operation in terms of milliseconds + * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. + */ + Client(std::string const & etcd_url, + std::string const & username, + std::string const & password, + const unsigned int& timeout = std::numeric_limits::max(), + std::string const & load_balancer = "round_robin"); + + /** + * Sends a get request to the etcd server + * @param key is the key to be read + */ + pplx::task get(std::string const & key); + + /** + * Sets the value of a key. The key will be modified if already exists or created + * if it does not exists. + * @param key is the key to be created or modified + * @param value is the new value to be set + */ + pplx::task set(std::string const & key, std::string const & value, int ttl = 0); + + /** + * Sets the value of a key. The key will be modified if already exists or created + * if it does not exists. + * @param key is the key to be created or modified + * @param value is the new value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task set(std::string const & key, std::string const & value, int64_t leaseId); + + + /** + * Creates a new key and sets it's value. Fails if the key already exists. + * @param key is the key to be created + * @param value is the value to be set + */ + pplx::task add(std::string const & key, std::string const & value, int ttl = 0); + + /** + * Creates a new key and sets it's value. Fails if the key already exists. + * @param key is the key to be created + * @param value is the value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task add(std::string const & key, std::string const & value, int64_t leaseId); + + /** + * Modifies an existing key. Fails if the key does not exists. + * @param key is the key to be modified + * @param value is the new value to be set + */ + pplx::task modify(std::string const & key, std::string const & value, int ttl = 0); + + /** + * Modifies an existing key. Fails if the key does not exists. + * @param key is the key to be modified + * @param value is the new value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task modify(std::string const & key, std::string const & value, int64_t leaseId); + + /** + * Modifies an existing key only if it has a specific value. Fails if the key does not exists + * or the original value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_value is the value to be replaced + */ + pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); + + /** + * Modifies an existing key only if it has a specific value. Fails if the key does not exists + * or the original value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_value is the value to be replaced + * @param leaseId is the lease attached to the key + */ + pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId); + + /** + * Modifies an existing key only if it has a specific modification index value. Fails if the key + * does not exists or the modification index of the previous value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_index is the expected index of the original value + */ + pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0); + + /** + * Modifies an existing key only if it has a specific modification index value. Fails if the key + * does not exists or the modification index of the previous value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_index is the expected index of the original value + * @param leaseId is the lease attached to the key + */ + pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId); + + /** + * Removes a single key. The key has to point to a plain, non directory entry. + * @param key is the key to be deleted + */ + pplx::task rm(std::string const & key); + + /** + * Removes a single key but only if it has a specific value. Fails if the key does not exists + * or the its value differs from the expected one. + * @param key is the key to be deleted + */ + pplx::task rm_if(std::string const & key, std::string const & old_value); + + /** + * Removes an existing key only if it has a specific modification index value. Fails if the key + * does not exists or the modification index of it differs from the expected one. + * @param key is the key to be deleted + * @param old_index is the expected index of the existing value + */ + pplx::task rm_if(std::string const & key, int old_index); + + /** + * Gets a directory listing of the directory identified by the key. + * @param key is the key to be listed + */ + pplx::task ls(std::string const & key); + + + /** + * Gets a directory listing of the directory identified by the key. + * @param key is the key to be listed + * @param limit is the size limit of results to be listed, we don't use default parameters + * to ensure backwards binary compatibility. + */ + pplx::task ls(std::string const & key, size_t const limit); + + + /** + * Removes a directory node. Fails if the parent directory dos not exists or not a directory. + * @param key is the directory to be created to be listed + * @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory. + */ + pplx::task rmdir(std::string const & key, bool recursive = false); + + /** + * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and + * a new key is created, like "/testdir/newkey" then no change happened in the value of + * "/testdir" so your watch will not detect this. If you want to detect addition and deletion of + * directory entries then you have to do a recursive watch. + * @param key is the value or directory to be watched + * @param recursive if true watch a whole subtree + */ + pplx::task watch(std::string const & key, bool recursive = false); + + /** + * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". + * @param key is the value or directory to be watched + * @param fromIndex the first index we are interested in + * @param recursive if true watch a whole subtree + */ + pplx::task watch(std::string const & key, int fromIndex, bool recursive = false); + + /** + * Grants a lease. + * @param ttl is the time to live of the lease + */ + pplx::task leasegrant(int ttl); + + /** + * Revoke a lease. + * @param lease_id is the id the lease + */ + pplx::task leaserevoke(int64_t lease_id); + + /** + * Get time-to-live of a lease. + * @param lease_id is the id the lease + */ + pplx::task leasetimetolive(int64_t lease_id); + + /** + * Gains a lock at a key, using a default created lease, using the default lease (60 seconds), with + * keeping alive has already been taken care of by the library. + * @param key is the key to be used to request the lock. + */ + pplx::task lock(std::string const &key); + + /** + * Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care + * of by the library. + * @param key is the key to be used to request the lock. + */ + pplx::task lock(std::string const &key, int64_t lease_id); + + /** + * Releases a lock at a key. + * @param key is the lock key to release. + */ + pplx::task unlock(std::string const &lock_key); + + /** + * Execute a etcd transaction. + * @param txn is the transaction object to be executed. + */ + pplx::task txn(etcdv3::Transaction const &txn); + + private: + std::shared_ptr channel; + std::string auth_token; + std::unique_ptr kvServiceStub; + std::unique_ptr watchServiceStub; + std::unique_ptr leaseServiceStub; + std::unique_ptr lockServiceStub; + + std::mutex mutex_for_keepalives; + std::map leases_for_locks; + std::map> keep_alive_for_locks; + + friend class KeepAlive; + friend class Watcher; +}; + + + +} - int err = WSAStartup(wVersionRequested, &wsaData); - if (err != 0) { - // Tell the user that we could not find a usable Winsock DLL. */ - std::cerr << "WSAStartup failed with error: %d" << err << std::endl; - return false; - } - } #endif - - int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs); - if (r != 0) { - std::cerr << "warn: getaddrinfo() failed for endpoint " << target - << " with error: " << r << std::endl; - return false; - } - - char host[16] = {'\0'}; - for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) { - memset(host, '\0', sizeof(host)); - getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST); - endpoints.emplace_back(std::string(host) + ":" + target_parts[1]); - } - freeaddrinfo(addrs); - return true; -} - -const std::string strip_and_resolve_addresses(std::string const &address) { - std::vector addresses; - boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); - std::string stripped_address; - { - std::vector stripped_addresses; - std::string substr("://"); - for (auto const &addr: addresses) { - std::string::size_type idx = addr.find(substr); - std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); - etcd::detail::dns_resolve(target, stripped_addresses); - } - stripped_address = boost::algorithm::join(stripped_addresses, ","); - } - return "ipv4:///" + stripped_address; -} - -const bool authenticate(std::shared_ptr const &channel, - std::string const &username, - std::string const &password, - std::string &token_or_message) { - // run a round of auth - auto auth_stub = Auth::NewStub(channel); - ClientContext context; - etcdserverpb::AuthenticateRequest auth_request; - etcdserverpb::AuthenticateResponse auth_response; - auth_request.set_name(username); - auth_request.set_password(password); - auto status = auth_stub->Authenticate(&context, auth_request, &auth_response); - if (status.ok()) { - token_or_message = auth_response.token(); - return true; - } else { - token_or_message = status.error_message(); - return false; - } -} - -} -} - -etcd::Client::Client(std::string const & address, - const long& timeout, - std::string const & load_balancer) -{ - // create channels - std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); - grpc::ChannelArguments grpc_args; - grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); - grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); - std::shared_ptr creds = grpc::InsecureChannelCredentials(); - grpc_args.SetLoadBalancingPolicyName(load_balancer); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); - - // create stubs - kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); - lockServiceStub = Lock::NewStub(this->channel); - this->timeout = timeout; -} - -etcd::Client::Client(std::string const & address, - std::string const & username, - std::string const & password, - const long& timeout, - std::string const & load_balancer) -{ - // create channels - std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); - grpc::ChannelArguments grpc_args; - grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); - grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); - std::shared_ptr creds = grpc::InsecureChannelCredentials(); - grpc_args.SetLoadBalancingPolicyName(load_balancer); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); - - // auth - std::string token_or_message; - if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { - throw std::invalid_argument("Etcd authentication failed: " + token_or_message); - } - this->auth_token = token_or_message; - - // setup stubs - kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); - lockServiceStub = Lock::NewStub(this->channel); - this->timeout = timeout; -} - -pplx::task etcd::Client::get(std::string const & key) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::set(std::string const & key, std::string const & value, int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - if(ttl > 0) - { - auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); - }); - } - else - { - params.lease_id = res.value().lease(); - } - } - - std::shared_ptr call(new etcdv3::AsyncSetAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncSetAction(params)); - return Response::create(call); -} - - -pplx::task etcd::Client::add(std::string const & key, std::string const & value, int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - if(ttl > 0) - { - auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); - }); - } - else - { - params.lease_id = res.value().lease(); - } - } - std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); - return Response::create(call); -} - -pplx::task etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); - return Response::create(call); -} - - -pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - if(ttl > 0) - { - auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); - }); - } - else - { - params.lease_id = res.value().lease(); - } - } - std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); - return Response::create(call); -} - - -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - if(ttl > 0) - { - auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); - }); - } - else - { - params.lease_id = res.value().lease(); - } - } - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); - return Response::create(call); -} - -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.old_value.assign(old_value); - params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); - return Response::create(call); -} - -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.old_revision = old_index; - params.kv_stub = kvServiceStub.get(); - params.timeout = this->timeout; - if(ttl > 0) - { - auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); - }); - } - else - { - params.lease_id = res.value().lease(); - } - } - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); - return Response::create(call); -} - -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.value.assign(value); - params.lease_id = leaseid; - params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); - return Response::create(call); -} - - -pplx::task etcd::Client::rm(std::string const & key) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = false; - params.kv_stub = kvServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); - return Response::create(call); -} - - -pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.old_value.assign(old_value); - params.kv_stub = kvServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); - return Response::create(call); -} - -pplx::task etcd::Client::rm_if(std::string const & key, int old_index) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.old_revision = old_index; - params.kv_stub = kvServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; - return Response::create(call); - -} - -pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = recursive; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::ls(std::string const & key) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = true; - params.limit = 0; // default no limit. - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::ls(std::string const & key, size_t const limit) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = true; - params.limit = limit; - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::watch(std::string const & key, bool recursive) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = recursive; - params.watch_stub = watchServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key.assign(key); - params.withPrefix = recursive; - params.revision = fromIndex; - params.watch_stub = watchServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::leasegrant(int ttl) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.ttl = ttl; - params.lease_stub = leaseServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::leaserevoke(int64_t lease_id) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.lease_id = lease_id; - params.lease_stub = leaseServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncLeaseRevokeAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::leasetimetolive(int64_t lease_id) -{ - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.lease_id = lease_id; - params.lease_stub = leaseServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::lock(std::string const &key) { - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.timeout = this->timeout; - static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; - - // routines in lock usually will be fast, less than 10 seconds. - // - // (base on our experiences in vineyard and GraphScope). - auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get(); - int64_t lease_id = resp.value().lease(); - { - std::lock_guard lexical_scope_lock(mutex_for_keepalives); - this->keep_alive_for_locks[lease_id].reset( - new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id)); - } - params.key = key; - params.lease_id = lease_id; - params.lock_stub = lockServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncLockAction(params)); - return Response::create(call).then( - [this, lease_id](pplx::task const &resp_task) -> etcd::Response { - auto const& resp = resp_task.get(); - { - std::lock_guard lexical_scope_lock(mutex_for_keepalives); - if (resp.is_ok()) { - this->leases_for_locks[resp.lock_key()] = lease_id; - } else { - this->keep_alive_for_locks.erase(lease_id); - } - } - return resp; - } - ); -} - -pplx::task etcd::Client::lock(std::string const &key, - int64_t lease_id) { - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key = key; - params.lease_id = lease_id; - params.lock_stub = lockServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncLockAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::unlock(std::string const &lock_key) { - // cancel the KeepAlive first, it exists - { - std::lock_guard lexical_scope_lock(mutex_for_keepalives); - auto p_leases = this->leases_for_locks.find(lock_key); - if (p_leases != this->leases_for_locks.end()) { - auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); - if (p_keeps_alive != this->keep_alive_for_locks.end()) { - this->keep_alive_for_locks.erase(p_keeps_alive); - } - this->leases_for_locks.erase(p_leases); - } - } - - // issue a "unlock" request - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key = lock_key; - params.lock_stub = lockServiceStub.get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); - return Response::create(call); -} - -pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.kv_stub = kvServiceStub .get(); - params.timeout = this->timeout; - std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); - return Response::create(call); -}