#if defined(_WIN32) // see also: https://stackoverflow.com/questions/2561368/illegal-token-on-right-side-of #define NOMINMAX #endif #include #include #include #include #include "etcd/SyncClient.hpp" #include "etcd/Value.hpp" #if defined(_WIN32) #include #include #else #include #include #endif #include #include #include #include #include #include #include #include #include #include #include #include "proto/rpc.grpc.pb.h" #include "proto/v3lock.grpc.pb.h" #include "proto/v3election.grpc.pb.h" #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" #include "etcd/Watcher.hpp" #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" #include "etcd/v3/AsyncGRPC.hpp" #include "etcd/v3/Transaction.hpp" etcd::Client::Client(etcd::SyncClient *client): client(client) { this->own_client = false; } etcd::Client* WithClient(etcd::SyncClient *client) { return new etcd::Client(client); } etcd::Client::Client(std::string const & address, std::string const & load_balancer) { this->own_client = true; this->client = new SyncClient(address, load_balancer); } etcd::Client::Client(std::string const & address, grpc::ChannelArguments const & arguments) { this->own_client = true; this->client = new SyncClient(address, arguments); } etcd::Client *etcd::Client::WithUrl(std::string const & etcd_url, std::string const & load_balancer) { return new etcd::Client(etcd_url, load_balancer); } etcd::Client *etcd::Client::WithUrl(std::string const & etcd_url, grpc::ChannelArguments const & arguments) { return new etcd::Client(etcd_url, arguments); } etcd::Client::Client(std::string const & address, std::string const & username, std::string const & password, int const auth_token_ttl, std::string const & load_balancer) { this->own_client = true; this->client = new SyncClient(address, username, password, auth_token_ttl, load_balancer); } etcd::Client::Client(std::string const & address, std::string const & username, std::string const & password, int const auth_token_ttl, grpc::ChannelArguments const & arguments) { this->own_client = true; this->client = new SyncClient(address, username, password, auth_token_ttl, arguments); } etcd::Client *etcd::Client::WithUser(std::string const & etcd_url, std::string const & username, std::string const & password, int const auth_token_ttl, std::string const & load_balancer) { return new etcd::Client(etcd_url, username, password, auth_token_ttl, load_balancer); } etcd::Client *etcd::Client::WithUser(std::string const & etcd_url, std::string const & username, std::string const & password, int const auth_token_ttl, grpc::ChannelArguments const & arguments) { return new etcd::Client(etcd_url, username, password, auth_token_ttl, arguments); } etcd::Client::Client(std::string const & address, std::string const & ca, std::string const & cert, std::string const & privkey, std::string const & target_name_override, std::string const & load_balancer) { this->own_client = true; this->client = new SyncClient(address, ca, cert, privkey, target_name_override, load_balancer); } etcd::Client::Client(std::string const & address, std::string const & ca, std::string const & cert, std::string const & privkey, std::string const & target_name_override, grpc::ChannelArguments const & arguments) { this->own_client = true; this->client = new SyncClient(address, ca, cert, privkey, target_name_override, arguments); } etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url, std::string const & ca, std::string const & cert, std::string const & privkey, std::string const & target_name_override, std::string const & load_balancer) { return new etcd::Client(etcd_url, ca, cert, privkey, target_name_override, load_balancer); } etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url, grpc::ChannelArguments const & arguments, std::string const & ca, std::string const & cert, std::string const & privkey, std::string const & target_name_override) { return new etcd::Client(etcd_url, ca, cert, privkey, target_name_override, arguments); } etcd::Client::~Client() { if (this->own_client) { delete this->client; } } namespace etcd { namespace detail { // Pass rvalue reference to lambda, // // Inspired by: https://stackoverflow.com/a/20669290/5080177 template class capture_impl { mutable A a; mutable F f; public: capture_impl(A && a, F && f) : a{std::forward(a)}, f{std::forward(f)} {} template auto operator()( Ts&&...args ) -> decltype(std::forward(f)(std::forward(a), std::forward(args)... )) { return std::forward(f)(std::forward(a), std::forward(args)... ); } template auto operator()( Ts&&...args ) const -> decltype(std::forward(f)(std::forward(a), std::forward(args)... )) { return std::forward(f)(std::forward(a), std::forward(args)... ); } }; template static capture_impl capture(A && a, F && f) { return capture_impl(std::forward(a), std::forward(f)); } template static auto asyncify(F && fn, Params && params, Ts... args) -> pplx::task(params), std::bind(std::forward(fn), std::placeholders::_1, std::forward(args)...))())> { return pplx::task(params), std::bind(std::forward(fn), std::placeholders::_1, std::forward(args)...))())>(capture(std::forward(params), std::bind(std::forward(fn), std::placeholders::_1, std::forward(args)...))); } } // namespace detail template using responser_t = etcd::Response (*)(std::shared_ptr); } // namespace etcd pplx::task etcd::Client::head() { return etcd::detail::asyncify( static_cast>(Response::create), this->client->head_internal()); } pplx::task etcd::Client::get(std::string const & key) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->get_internal(key)); } pplx::task etcd::Client::get(std::string const & key, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->get_internal(key, revision)); } pplx::task etcd::Client::set(std::string const & key, std::string const & value, int ttl) { if (ttl > 0) { return this->leasegrant(ttl).then([this, key, value](pplx::task const &task) { auto resp = task.get(); if (resp.error_code() == 0) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->set_internal(key, value, resp.value().lease())); } else { return pplx::task_from_result(resp); } }); } else { return etcd::detail::asyncify( static_cast>(Response::create), this->client->set_internal(key, value, 0)); } } pplx::task etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->set_internal(key, value, leaseid)); } pplx::task etcd::Client::add(std::string const & key, std::string const & value, int ttl) { if (ttl > 0) { return this->leasegrant(ttl).then([this, key, value](pplx::task const &task) { auto resp = task.get(); if (resp.error_code() == 0) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->add_internal(key, value, resp.value().lease())); } else { return pplx::task_from_result(resp); } }); } else { return etcd::detail::asyncify( static_cast>(Response::create), this->client->add_internal(key, value, 0)); } } pplx::task etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->add_internal(key, value, leaseid)); } pplx::task etcd::Client::put(std::string const & key, std::string const & value) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->put_internal(key, value)); } pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) { if (ttl > 0) { return this->leasegrant(ttl).then([this, key, value](pplx::task const &task) { auto resp = task.get(); if (resp.error_code() == 0) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_internal(key, value, resp.value().lease())); } else { return pplx::task_from_result(resp); } }); } else { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_internal(key, value, 0)); } } pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_internal(key, value, leaseid)); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) { if (ttl > 0) { return this->leasegrant(ttl).then([this, key, value, old_value](pplx::task const &task) { auto resp = task.get(); if (resp.error_code() == 0) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, 0, old_value, resp.value().lease(), etcdv3::AtomicityType::PREV_VALUE)); } else { return pplx::task_from_result(resp); } }); } else { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, 0, old_value, 0, etcdv3::AtomicityType::PREV_VALUE)); } } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, 0, old_value, leaseid, etcdv3::AtomicityType::PREV_VALUE)); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int64_t old_index, int ttl) { if (ttl > 0) { return this->leasegrant(ttl).then([this, key, value, old_index](pplx::task const &task) { auto resp = task.get(); if (resp.error_code() == 0) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, old_index, "", resp.value().lease(), etcdv3::AtomicityType::PREV_INDEX)); } else { return pplx::task_from_result(resp); } }); } else { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, old_index, "", 0, etcdv3::AtomicityType::PREV_INDEX)); } } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int64_t old_index, int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_if_internal(key, value, old_index, "", leaseid, etcdv3::AtomicityType::PREV_INDEX)); } pplx::task etcd::Client::rm(std::string const & key) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->rm_internal(key)); } pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->rm_if_internal(key, 0, old_value, etcdv3::AtomicityType::PREV_VALUE)); } pplx::task etcd::Client::rm_if(std::string const & key, int64_t old_index) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->rm_if_internal(key, old_index, "", etcdv3::AtomicityType::PREV_INDEX)); } pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->rmdir_internal(key, recursive)); } pplx::task etcd::Client::rmdir(std::string const & key, const char *range_end) { return this->rmdir(key, std::string(range_end)); } pplx::task etcd::Client::rmdir(std::string const & key, std::string const &range_end) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->rmdir_internal(key, range_end)); } pplx::task etcd::Client::ls(std::string const & key) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, 0)); } pplx::task etcd::Client::ls(std::string const & key, size_t const limit) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, limit)); } pplx::task etcd::Client::ls(std::string const & key, size_t const limit, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, limit, false, revision)); } pplx::task etcd::Client::ls(std::string const & key, std::string const &range_end) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, 0)); } pplx::task etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, limit)); } pplx::task etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, limit, false, revision)); } pplx::task etcd::Client::keys(std::string const & key) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, 0, true)); } pplx::task etcd::Client::keys(std::string const & key, size_t const limit) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, limit, true)); } pplx::task etcd::Client::keys(std::string const & key, size_t const limit, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, limit, true, revision)); } pplx::task etcd::Client::keys(std::string const & key, std::string const &range_end) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, 0, true)); } pplx::task etcd::Client::keys(std::string const & key, std::string const &range_end, size_t const limit) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, limit, true)); } pplx::task etcd::Client::keys(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->ls_internal(key, range_end, limit, true, revision)); } pplx::task etcd::Client::watch(std::string const & key, bool recursive) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->watch_internal(key, 0, recursive)); } pplx::task etcd::Client::watch(std::string const & key, int64_t fromIndex, bool recursive) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->watch_internal(key, fromIndex, recursive)); } pplx::task etcd::Client::watch(std::string const & key, const char *range_end) { return this->watch(key, std::string(range_end)); } pplx::task etcd::Client::watch(std::string const & key, std::string const & range_end) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->watch_internal(key, range_end, 0)); } pplx::task etcd::Client::watch(std::string const & key, std::string const & range_end, int64_t fromIndex) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->watch_internal(key, range_end, fromIndex)); } pplx::task etcd::Client::leasegrant(int ttl) { // See Note [lease with TTL and issue the actual request] return pplx::task([this, ttl]() { return this->client->leasegrant(ttl); }); } pplx::task> etcd::Client::leasekeepalive(int ttl) { // See Note [lease with TTL and issue the actual request] return pplx::task>([this, ttl]() { return this->client->leasekeepalive(ttl); }); } pplx::task etcd::Client::leaserevoke(int64_t lease_id) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->leaserevoke_internal(lease_id)); } pplx::task etcd::Client::leasetimetolive(int64_t lease_id) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->leasetimetolive_internal(lease_id)); } pplx::task etcd::Client::leases() { return etcd::detail::asyncify( static_cast>(Response::create), this->client->leases_internal()); } pplx::task etcd::Client::lock(std::string const &key) { static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; // see also etcd::SyncClient::lock return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK); } pplx::task etcd::Client::lock(std::string const &key, const bool sync) { static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; // see also etcd::SyncClient::lock return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK, sync); } pplx::task etcd::Client::lock(std::string const &key, int lease_ttl) { // See Note [lease with TTL and issue the actual request] // See also SyncClient::lock // // We don't separate the keepalive step and lock step to avoid leaving many // busy-waiting keepalive tasks when waiting for the locks due to the contention // of async threadpool. return pplx::task([this, key, lease_ttl]() { return this->client->lock(key, lease_ttl); }); } pplx::task etcd::Client::lock(std::string const &key, int lease_ttl, const bool sync) { if (sync) { pplx::task_completion_event event; event.set(this->client->lock(key, lease_ttl)); return pplx::task(event); } else { return this->lock(key, lease_ttl); } } pplx::task etcd::Client::lock_with_lease(std::string const &key, int64_t lease_id) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->lock_with_lease_internal(key, lease_id)); } pplx::task etcd::Client::lock_with_lease(std::string const &key, int64_t lease_id, const bool sync) { if (sync) { pplx::task_completion_event event; event.set(this->client->lock_with_lease(key, lease_id)); return pplx::task(event); } else { return this->lock_with_lease(key, lease_id); } } pplx::task etcd::Client::unlock(std::string const &lock_key) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->unlock_internal(lock_key)); } pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->txn_internal(txn)); } pplx::task etcd::Client::campaign( std::string const &name, int64_t lease_id, std::string const &value) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->campaign_internal(name, lease_id, value)); } pplx::task etcd::Client::proclaim( std::string const &name, int64_t lease_id, std::string const &key, int64_t revision, std::string const &value) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->proclaim_internal(name, lease_id, key, revision, value)); } pplx::task etcd::Client::leader(std::string const &name) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->leader_internal(name)); } std::unique_ptr etcd::Client::observe(std::string const &name) { return this->client->observe(name); } pplx::task etcd::Client::resign( std::string const &name, int64_t lease_id, std::string const &key, int64_t revision) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->resign_internal(name, lease_id, key, revision)); } const std::string &etcd::Client::current_auth_token() const { return this->client->current_auth_token(); } std::shared_ptr etcd::Client::grpc_channel() const { return this->client->grpc_channel(); } etcd::SyncClient* etcd::Client::sync_client() const { return this->client; } // watchers from the async client etcd::Watcher::Watcher(Client const &client, std::string const & key, std::function callback, std::function wait_callback, bool recursive): Watcher(*client.sync_client(), key, callback, wait_callback, recursive) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, std::string const &range_end, std::function callback, std::function wait_callback): Watcher(*client.sync_client(), key, range_end, callback, wait_callback) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, int64_t fromIndex, std::function callback, std::function wait_callback, bool recursive): Watcher(*client.sync_client(), key, fromIndex, callback, wait_callback, recursive) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, std::string const &range_end, int64_t fromIndex, std::function callback, std::function wait_callback): Watcher(*client.sync_client(), key, range_end, fromIndex, callback, wait_callback) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, std::function callback, bool recursive): Watcher(*client.sync_client(), key, callback, recursive) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, std::string const &range_end, std::function callback): Watcher(*client.sync_client(), key, range_end, callback) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, int64_t fromIndex, std::function callback, bool recursive): Watcher(*client.sync_client(), key, fromIndex, callback, recursive) { } etcd::Watcher::Watcher(Client const &client, std::string const & key, std::string const &range_end, int64_t fromIndex, std::function callback): Watcher(*client.sync_client(), key, range_end, fromIndex, callback) { } // keepalive from then async client etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): KeepAlive(*client.sync_client(), ttl, lease_id) { } etcd::KeepAlive::KeepAlive(Client const &client, std::function const &handler, int ttl, int64_t lease_id): KeepAlive(*client.sync_client(), handler, ttl, lease_id) { }