Refactor the implementation of sync-client and async-client.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2022-05-22 02:02:58 +08:00
parent f21c45b362
commit 67775e82fe
50 changed files with 2402 additions and 1236 deletions

View File

@ -159,11 +159,23 @@ jobs:
sleep 5 sleep 5
# tests without auth # tests without auth
echo "Run the etcd sync test ........................."
./build/bin/EtcdSyncTest ./build/bin/EtcdSyncTest
echo "Run the etcd test ........................."
./build/bin/EtcdTest ./build/bin/EtcdTest
echo "Run the etcd lock test ........................."
./build/bin/LockTest ./build/bin/LockTest
echo "Run the etcd memory leak test ........................."
./build/bin/MemLeakTest ./build/bin/MemLeakTest
echo "Run the etcd watcher test ........................."
./build/bin/WatcherTest ./build/bin/WatcherTest
echo "Run the etcd election test ........................."
./build/bin/ElectionTest ./build/bin/ElectionTest
killall -TERM etcd killall -TERM etcd

3
.gitignore vendored
View File

@ -3,3 +3,6 @@ compile_commands.json
proto/**/*.pb.cc proto/**/*.pb.cc
proto/**/*.pb.h proto/**/*.pb.h
default.etcd/ default.etcd/
# vscode-clangd
.cache/

View File

@ -26,8 +26,10 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
) )
endif() endif()
option(BUILD_SHARED_LIBS "Build shared libraries" ON) option(BUILD_SHARED_LIBS "Build etcd-cpp-apiv3 shared libraries" ON)
option(BUILD_ETCD_TESTS "Build test cases" OFF) option(BUILD_ETCD_TESTS "Build etcd-cpp-apiv3 test cases" OFF)
option(CMAKE_POSITION_INDEPENDENT_CODE "Build etcd-cpp-apiv3 with -fPIC" ON)
option(ETCD_W_STRICT "Build etcd-cpp-apiv3 with -Werror" ON)
# reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath # reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath
set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
@ -98,7 +100,10 @@ include_directories(SYSTEM ${Boost_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}) ${OPENSSL_INCLUDE_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}) include_directories(${CMAKE_CURRENT_SOURCE_DIR})
if(NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC") if(NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -Wno-string-compare") if(ETCD_W_STRICT)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Wno-string-compare")
endif() endif()
check_cxx_compiler_flag(-Wno-c++17-extensions W_NO_CPP17_EXTENSIONS) check_cxx_compiler_flag(-Wno-c++17-extensions W_NO_CPP17_EXTENSIONS)

View File

@ -79,6 +79,33 @@ dependencies have been successfully installed:
The _etcd-cpp-apiv3_ should work well with etcd > 3.0. Feel free to issue an issue to us on The _etcd-cpp-apiv3_ should work well with etcd > 3.0. Feel free to issue an issue to us on
Github when you encounter problems when working with etcd 3.x releases. Github when you encounter problems when working with etcd 3.x releases.
## Sync vs. Async runtime
There are various discussion about whether to support a user-transparent multi-thread executor in
the background, or, leaving the burden of thread management to the user (e.g., see
[issue#100](https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/100) for more discussion about
the implementation of underlying thread model).
The _etcd-cpp-apiv3_ library supports both synchronous and asynchronous runtime, in two separate
library as follows:
- etcd-cpp-api-core: the synchronous runtime, provides a blocking-style API and the users are responsible
for handling dispatch the request into separate thread to avoid been blocked by waiting for response.
- target found by cmake: `ETCD_CPP_CORE_LIBRARIES`
- etcd-cpp-api: the asynchronous runtime backed by the `pplx` library from
[cpprestsdk](https://github.com/microsoft/cpprestsdk), where a `boost::asio::io_context` and a pool
of threads is used to run the asynchronous operations in the background. By default the number of
threads in the thread pool equals to `std::thread::hardware_concurrency()`.
- target found by cmake: `ETCD_CPP_LIBRARIES`
We encourage the users to use the asynchronous runtime by default, as it provides more flexibility
and convenient APIs and less possibilities for errors that block the main thread. However, please note
that the asynchronous runtime will setup a thread pool in the background.
Note that `etcd-cpp-api-core` and `etcd-cpp-api` are two separate target and don't depends on each other.
You should depends on either of them in your program.
## Usage ## Usage
```c++ ```c++

View File

@ -14,6 +14,7 @@ if(NOT gRPC_FOUND)
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}) list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR})
find_dependency(GRPC) find_dependency(GRPC)
endif() endif()
find_dependency(cpprestsdk) find_dependency(cpprestsdk)
if(cpprestsdk_FOUND) if(cpprestsdk_FOUND)
set(CPPREST_LIB cpprestsdk::cpprest) set(CPPREST_LIB cpprestsdk::cpprest)
@ -24,11 +25,12 @@ include("${CMAKE_CURRENT_LIST_DIR}/etcd-targets.cmake")
set(etcd-cpp-api_FOUND TRUE) set(etcd-cpp-api_FOUND TRUE)
set(ETCD_CPP_LIBRARIES etcd-cpp-api) set(ETCD_CPP_LIBRARIES etcd-cpp-api)
set(ETCD_CPP_CORE_LIBRARIES etcd-cpp-api-core)
set(ETCD_CPP_INCLUDE_DIR "${ETCD_CPP_HOME}/include") set(ETCD_CPP_INCLUDE_DIR "${ETCD_CPP_HOME}/include")
set(ETCD_CPP_INCLUDE_DIRS "${ETCD_CPP_INCLUDE_DIR}") set(ETCD_CPP_INCLUDE_DIRS "${ETCD_CPP_INCLUDE_DIR}")
include(FindPackageMessage) include(FindPackageMessage)
find_package_message(etcd find_package_message(etcd
"Found etcd: ${CMAKE_CURRENT_LIST_FILE} (found version \"@etcd-cpp-api_VERSION@\")" "Found etcd: ${CMAKE_CURRENT_LIST_FILE} (found version \"@etcd-cpp-api_VERSION@\")"
"etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\netcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}, include directories: ${ETCD_CPP_INCLUDE_DIRS}" "etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\netcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}, \netcd-cpp-apiv3 core libraries: ${ETCD_CPP_CORE_LIBRARIES}\ninclude directories: ${ETCD_CPP_INCLUDE_DIRS}"
) )

View File

@ -5,56 +5,38 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <type_traits>
#include "pplx/pplxtasks.h" #include "pplx/pplxtasks.h"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include "etcd/SyncClient.hpp"
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
namespace etcdv3 {
class Transaction;
class AsyncObserveAction;
namespace detail {
std::string string_plus_one(std::string const &value);
}
}
#if defined(WITH_GRPC_CHANNEL_CLASS)
namespace grpc {
class Channel;
class ChannelArguments;
}
#else
namespace grpc_impl {
class Channel;
class ChannelArguments;
}
#endif
namespace etcd namespace etcd
{ {
using etcdv3::ERROR_KEY_NOT_FOUND; // FIXME
using etcdv3::ERROR_COMPARE_FAILED;
using etcdv3::ERROR_KEY_ALREADY_EXISTS;
class KeepAlive;
class Watcher;
/** /**
* Client is responsible for maintaining a connection towards an etcd server. * Client is responsible for maintaining a connection towards an etcd server.
* Etcd operations can be reached via the methods of the client. * Etcd operations can be reached via the methods of the client.
*/ */
class Client class Client
{ {
private:
class TokenAuthenticator;
class TokenAuthenticatorDeleter {
public: public:
void operator()(TokenAuthenticator *authenticator); /**
}; * Constructs an async etcd client object from an established synchronous client.
*
* @param sync_client The synchronous client to use for the async client.
*/
Client(SyncClient *client);
/**
* Constructs an async etcd client object from an established synchronous client.
*
* @param sync_client The synchronous client to use for the async client.
*/
static Client* WithClient(SyncClient *client);
public:
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
@ -87,7 +69,7 @@ namespace etcd
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
static etcd::Client *WithUrl(std::string const & etcd_url, static Client *WithUrl(std::string const & etcd_url,
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
/** /**
@ -97,7 +79,7 @@ namespace etcd
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param arguments user provided grpc channel arguments. * @param arguments user provided grpc channel arguments.
*/ */
static etcd::Client *WithUrl(std::string const & etcd_url, static Client *WithUrl(std::string const & etcd_url,
#if defined(WITH_GRPC_CHANNEL_CLASS) #if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments grpc::ChannelArguments const & arguments
#else #else
@ -143,7 +125,6 @@ namespace etcd
#endif #endif
); );
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
@ -154,7 +135,7 @@ namespace etcd
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
* @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd. * @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd.
*/ */
static etcd::Client *WithUser(std::string const & etcd_url, static Client *WithUser(std::string const & etcd_url,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
int const auth_token_ttl = 300, int const auth_token_ttl = 300,
@ -171,7 +152,7 @@ namespace etcd
* @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd. * @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd.
* Default value should be 300. * Default value should be 300.
*/ */
static etcd::Client *WithUser(std::string const & etcd_url, static Client *WithUser(std::string const & etcd_url,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
int const auth_token_ttl, int const auth_token_ttl,
@ -182,7 +163,6 @@ namespace etcd
#endif #endif
); );
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
@ -222,7 +202,6 @@ namespace etcd
#endif #endif
); );
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
@ -236,7 +215,7 @@ namespace etcd
* SANS of your SSL certificate. * SANS of your SSL certificate.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
static etcd::Client *WithSSL(std::string const & etcd_url, static Client *WithSSL(std::string const & etcd_url,
std::string const & ca, std::string const & ca,
std::string const & cert = "", std::string const & cert = "",
std::string const & key = "", std::string const & key = "",
@ -256,7 +235,7 @@ namespace etcd
* SANS of your SSL certificate. * SANS of your SSL certificate.
* @param arguments user provided grpc channel arguments. * @param arguments user provided grpc channel arguments.
*/ */
static etcd::Client *WithSSL(std::string const & etcd_url, static Client *WithSSL(std::string const & etcd_url,
#if defined(WITH_GRPC_CHANNEL_CLASS) #if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments, grpc::ChannelArguments const & arguments,
#else #else
@ -267,6 +246,8 @@ namespace etcd
std::string const & key = "", std::string const & key = "",
std::string const & target_name_override = ""); std::string const & target_name_override = "");
~Client();
/** /**
* Get the HEAD revision of the connected etcd server. * Get the HEAD revision of the connected etcd server.
*/ */
@ -398,6 +379,30 @@ namespace etcd
*/ */
pplx::task<Response> ls(std::string const & key); pplx::task<Response> ls(std::string const & key);
/**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory.
* @param key is the directory to be created to be listed
* @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory.
*/
pplx::task<Response> rmdir(std::string const & key, bool recursive = false);
/**
* Removes multiple keys between [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, const char *range_end);
/**
* Removes multiple keys between [key, range_end).
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, std::string const &range_end);
/** /**
* Gets a directory listing of the directory identified by the key. * Gets a directory listing of the directory identified by the key.
@ -428,31 +433,6 @@ namespace etcd
*/ */
pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit); pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit);
/**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory.
* @param key is the directory to be created to be listed
* @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory.
*/
pplx::task<Response> rmdir(std::string const & key, bool recursive = false);
/**
* Removes multiple keys between [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, const char *range_end);
/**
* Removes multiple keys between [key, range_end).
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, std::string const &range_end);
/** /**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of * a new key is created, like "/testdir/newkey" then no change happened in the value of
@ -597,19 +577,7 @@ namespace etcd
*/ */
pplx::task<Response> leader(std::string const &name); pplx::task<Response> leader(std::string const &name);
/** using Observer = SyncClient::Observer;
* An observer that will cancel the associated election::observe request
* when being destruct.
*/
class Observer {
public:
~Observer();
private:
std::shared_ptr<etcdv3::AsyncObserveAction> action = nullptr;
pplx::task<etcd::Response> resp;
friend class Client;
};
/** /**
* Observe the leader change. * Observe the leader change.
@ -618,19 +586,7 @@ namespace etcd
* *
* @returns an observer that holds that action and will cancel the request when being destructed. * @returns an observer that holds that action and will cancel the request when being destructed.
*/ */
std::unique_ptr<Observer> observe(std::string const &name, std::unique_ptr<Observer> observe(std::string const &name);
const bool once = false);
/**
* Observe the leader change.
*
* @param name is the names of election to watch.
*
* @returns an observer that holds that action and will cancel the request when being destructed.
*/
std::unique_ptr<Observer> observe(std::string const &name,
std::function<void(Response)> callback,
const bool once = false);
/** /**
* Updates the value of election with a new value, with leader key returns by * Updates the value of election with a new value, with leader key returns by
@ -649,27 +605,23 @@ namespace etcd
*/ */
const std::string &current_auth_token() const; const std::string &current_auth_token() const;
private: /**
* Obtain the underlying gRPC channel.
*/
#if defined(WITH_GRPC_CHANNEL_CLASS) #if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> channel; std::shared_ptr<grpc::Channel> grpc_channel() const;
#else #else
std::shared_ptr<grpc_impl::Channel> channel; std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif #endif
mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator; /**
* Obtain the underlying synchronous client.
*/
SyncClient* sync_client() const;
struct EtcdServerStubs; private:
struct EtcdServerStubsDeleter { bool own_client = true;
void operator()(EtcdServerStubs *stubs); SyncClient *client = nullptr;
};
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
std::mutex mutex_for_keepalives;
std::map<std::string, int64_t> leases_for_locks;
std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks;
friend class KeepAlive;
friend class Watcher;
}; };
} }

View File

@ -7,7 +7,7 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include "etcd/Client.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include <boost/config.hpp> #include <boost/config.hpp>
@ -20,6 +20,9 @@
namespace etcd namespace etcd
{ {
// forward declaration to avoid header/library dependency
class Client;
/** /**
* If ID is set to 0, the library will choose an ID, and can be accessed from ".Lease()". * If ID is set to 0, the library will choose an ID, and can be accessed from ".Lease()".
*/ */
@ -28,6 +31,8 @@ namespace etcd
public: public:
KeepAlive(Client const &client, KeepAlive(Client const &client,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0);
KeepAlive(SyncClient const &client,
int ttl, int64_t lease_id = 0);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
@ -38,6 +43,9 @@ namespace etcd
KeepAlive(Client const &client, KeepAlive(Client const &client,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0);
KeepAlive(SyncClient const &client,
std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0);
KeepAlive(std::string const & address, KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id = 0); int ttl, int64_t lease_id = 0);

View File

@ -1,12 +1,13 @@
#ifndef __ETCD_RESPONSE_HPP__ #ifndef __ETCD_RESPONSE_HPP__
#define __ETCD_RESPONSE_HPP__ #define __ETCD_RESPONSE_HPP__
#include <chrono>
#include <iostream> #include <iostream>
#include <functional>
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
#include "pplx/pplxtasks.h"
#include "etcd/Value.hpp" #include "etcd/Value.hpp"
namespace etcdv3 { namespace etcdv3 {
@ -28,9 +29,7 @@ namespace etcd
public: public:
template <typename T> template <typename T>
static pplx::task<etcd::Response> create(std::shared_ptr<T> call) static etcd::Response create(std::unique_ptr<T> call)
{
return pplx::task<etcd::Response>([call]()
{ {
call->waitForResponse(); call->waitForResponse();
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();
@ -38,14 +37,22 @@ namespace etcd
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint()); std::chrono::high_resolution_clock::now() - call->startTimepoint());
return etcd::Response(v3resp, duration); return etcd::Response(v3resp, duration);
});
} }
template <typename T> template <typename T>
static pplx::task<etcd::Response> create(std::shared_ptr<T> call, static etcd::Response create(std::shared_ptr<T> call)
std::function<void(Response)> callback)
{ {
return pplx::task<etcd::Response>([call, callback]() call->waitForResponse();
auto v3resp = call->ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint());
return etcd::Response(v3resp, duration);
}
template <typename T>
static etcd::Response create(std::unique_ptr<T> call,
std::function<void(Response)> callback)
{ {
call->waitForResponse(callback); call->waitForResponse(callback);
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();
@ -53,13 +60,10 @@ namespace etcd
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint()); std::chrono::high_resolution_clock::now() - call->startTimepoint());
return etcd::Response(v3resp, duration); return etcd::Response(v3resp, duration);
});
} }
template <typename T> template <typename T>
static pplx::task<etcd::Response> create(std::function<std::shared_ptr<T>()> callfn) static etcd::Response create(std::function<std::unique_ptr<T>()> callfn)
{
return pplx::task<etcd::Response>([callfn]()
{ {
auto call = callfn(); auto call = callfn();
@ -69,12 +73,13 @@ namespace etcd
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint()); std::chrono::high_resolution_clock::now() - call->startTimepoint());
return etcd::Response(v3resp, duration); return etcd::Response(v3resp, duration);
});
} }
template <typename T> template <typename T>
static etcd::Response create_sync(std::shared_ptr<T> call) static etcd::Response create(std::function<std::shared_ptr<T>()> callfn)
{ {
auto call = callfn();
call->waitForResponse(); call->waitForResponse();
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();

View File

@ -1,10 +1,76 @@
#ifndef __ETCD_SYNC_CLIENT_HPP__ #ifndef __ETCD_SYNC_CLIENT_HPP__
#define __ETCD_SYNC_CLIENT_HPP__ #define __ETCD_SYNC_CLIENT_HPP__
#include "etcd/Client.hpp" #include <map>
#include <memory>
#include <mutex>
#include <string>
#include "etcd/Response.hpp"
#include "etcd/v3/action_constants.hpp"
namespace etcdv3 {
struct ActionParameters;
class AsyncCompareAndDeleteAction;
class AsyncCompareAndSwapAction;
class AsyncDeleteAction;
class AsyncCampaignAction;
class AsyncProclaimAction;
class AsyncLeaderAction;
class AsyncObserveAction;
class AsyncResignAction;
class AsyncHeadAction;
class AsyncLeaseGrantAction;
class AsyncLeaseRevokeAction;
class AsyncLeaseKeepAliveAction;
class AsyncLeaseTimeToLiveAction;
class AsyncLeaseLeasesAction;
class AsyncLockAction;
class AsyncUnlockAction;
class AsyncPutAction;
class AsyncRangeAction;
class AsyncSetAction;
class AsyncTxnAction;
class AsyncUpdateAction;
class AsyncWatchAction;
enum class AtomicityType;
class Transaction;
namespace detail {
std::string string_plus_one(std::string const &value);
}
}
#if defined(WITH_GRPC_CHANNEL_CLASS)
namespace grpc {
class Channel;
class ChannelArguments;
}
#else
namespace grpc_impl {
class Channel;
class ChannelArguments;
}
#endif
namespace etcd namespace etcd
{ {
using etcdv3::ERROR_KEY_NOT_FOUND;
using etcdv3::ERROR_COMPARE_FAILED;
using etcdv3::ERROR_KEY_ALREADY_EXISTS;
using etcdv3::ERROR_ACTION_CANCELLED;
class KeepAlive;
class Watcher;
class Client;
// FIXME
/**
* Client is responsible for maintaining a connection towards an etcd server.
* Etcd operations can be reached via the methods of the client.
*/
/** /**
* SyncClient is a wrapper around etcd::Client and provides a simplified sync interface with blocking operations. * SyncClient is a wrapper around etcd::Client and provides a simplified sync interface with blocking operations.
* *
@ -15,9 +81,16 @@ namespace etcd
*/ */
class SyncClient class SyncClient
{ {
private:
class TokenAuthenticator;
class TokenAuthenticatorDeleter {
public:
void operator()(TokenAuthenticator *authenticator);
};
public: public:
/** /**
* Constructs an etcd sync client object. * Constructs an etcd client object.
* *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
@ -27,7 +100,47 @@ namespace etcd
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
/** /**
* Constructs an etcd sync client object. * Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param arguments user provided grpc channel arguments.
*/
SyncClient(std::string const & etcd_url,
#if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments
#else
grpc_impl::ChannelArguments const & arguments
#endif
);
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
static SyncClient *WithUrl(std::string const & etcd_url,
std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param arguments user provided grpc channel arguments.
*/
static SyncClient *WithUrl(std::string const & etcd_url,
#if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments
#else
grpc_impl::ChannelArguments const & arguments
#endif
);
/**
* Constructs an etcd client object.
* *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
@ -42,46 +155,336 @@ namespace etcd
int const auth_token_ttl = 300, int const auth_token_ttl = 300,
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
Response head(); /**
Response get(std::string const & key); * Constructs an etcd client object.
Response set(std::string const & key, std::string const & value, int ttl = 0); *
Response set(std::string const & key, std::string const & value, int64_t leaseId); * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
Response add(std::string const & key, std::string const & value, int ttl = 0); * or multiple url, seperated by ',' or ';'.
Response add(std::string const & key, std::string const & value, int64_t leaseId); * @param username username of etcd auth
Response put(std::string const & key, std::string const & value); * @param password password of etcd auth
Response modify(std::string const & key, std::string const & value, int ttl = 0); * @param arguments user provided grpc channel arguments.
Response modify(std::string const & key, std::string const & value, int64_t leaseId); * @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd.
Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); * Default value should be 300.
Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId); */
Response modify_if(std::string const & key, std::string const & value, int64_t old_index, int ttl = 0); SyncClient(std::string const & etcd_url,
Response modify_if(std::string const & key, std::string const & value, int64_t old_index, int64_t leaseId); std::string const & username,
Response rm(std::string const & key); std::string const & password,
Response rm_if(std::string const & key, std::string const & old_value); int const auth_token_ttl,
Response rm_if(std::string const & key, int64_t old_index); #if defined(WITH_GRPC_CHANNEL_CLASS)
Response ls(std::string const & key); grpc::ChannelArguments const & arguments
Response ls(std::string const & key, size_t const limit); #else
Response ls(std::string const & key, std::string const &range_end); grpc_impl::ChannelArguments const & arguments
Response ls(std::string const & key, std::string const &range_end, size_t const limit); #endif
Response mkdir(std::string const & key, int ttl = 0); );
Response rmdir(std::string const & key, bool recursive = false);
Response rmdir(std::string const & key, const char *range_end);
Response rmdir(std::string const & key, std::string const &range_end);
Response leasegrant(int ttl);
Response leaserevoke(int64_t lease_id);
Response leasetimetolive(int64_t lease_id);
Response campaign(std::string const &name, int64_t lease_id,
std::string const &value); /**
Response proclaim(std::string const &name, int64_t lease_id, * Constructs an etcd client object.
std::string const &key, int64_t revision, std::string const &value); *
Response leader(std::string const &name); * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
std::unique_ptr<Client::Observer> observe(std::string const &name, * or multiple url, seperated by ',' or ';'.
const bool once = false); * @param username username of etcd auth
std::unique_ptr<Client::Observer> observe(std::string const &name, * @param password password of etcd auth
std::function<void(Response)> callback, * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
const bool once = false); * @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd.
Response resign(std::string const &name, int64_t lease_id, */
std::string const &key, int64_t revision); static SyncClient *WithUser(std::string const & etcd_url,
std::string const & username,
std::string const & password,
int const auth_token_ttl = 300,
std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth
* @param arguments user provided grpc channel arguments.
* @param auth_token_ttl TTL seconds for auth token, see also `--auth-token-ttl` flags of etcd.
* Default value should be 300.
*/
static SyncClient *WithUser(std::string const & etcd_url,
std::string const & username,
std::string const & password,
int const auth_token_ttl,
#if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments
#else
grpc_impl::ChannelArguments const & arguments
#endif
);
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param ca root CA file for SSL/TLS connection.
* @param cert cert chain file for SSL/TLS authentication, could be empty string.
* @param key private key file for SSL/TLS authentication, could be empty string.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
SyncClient(std::string const & etcd_url,
std::string const & ca,
std::string const & cert,
std::string const & key,
std::string const & target_name_override,
std::string const & load_balancer);
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param ca root CA file for SSL/TLS connection.
* @param cert cert chain file for SSL/TLS authentication, could be empty string.
* @param key private key file for SSL/TLS authentication, could be empty string.
* @param arguments user provided grpc channel arguments.
*/
SyncClient(std::string const & etcd_url,
std::string const & ca,
std::string const & cert,
std::string const & key,
std::string const & target_name_override,
#if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments
#else
grpc_impl::ChannelArguments const & arguments
#endif
);
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param ca root CA file for SSL/TLS connection.
* @param cert cert chain file for SSL/TLS authentication, could be empty string.
* @param key private key file for SSL/TLS authentication, could be empty string.
* @param target_name_override Override the target host name if you want to pass multiple address
* for load balancing with SSL, and there's no DNS. The @target_name_override@ must exist in the
* SANS of your SSL certificate.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
static SyncClient *WithSSL(std::string const & etcd_url,
std::string const & ca,
std::string const & cert = "",
std::string const & key = "",
std::string const & target_name_override = "",
std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param ca root CA file for SSL/TLS connection.
* @param cert cert chain file for SSL/TLS authentication, could be empty string.
* @param key private key file for SSL/TLS authentication, could be empty string.
* @param target_name_override Override the target host name if you want to pass multiple address
* for load balancing with SSL, and there's no DNS. The @target_name_override@ must exist in the
* SANS of your SSL certificate.
* @param arguments user provided grpc channel arguments.
*/
static SyncClient *WithSSL(std::string const & etcd_url,
#if defined(WITH_GRPC_CHANNEL_CLASS)
grpc::ChannelArguments const & arguments,
#else
grpc_impl::ChannelArguments const & arguments,
#endif
std::string const & ca,
std::string const & cert = "",
std::string const & key = "",
std::string const & target_name_override = "");
~SyncClient();
/**
* Get the HEAD revision of the connected etcd server.
*/
Response head();
/**
* Get the value of specified key from the etcd server
* @param key is the key to be read
*/
Response get(std::string const & key);
/**
* Sets the value of a key. The key will be modified if already exists or created
* if it does not exists.
* @param key is the key to be created or modified
* @param value is the new value to be set
*/
Response set(std::string const & key, std::string const & value, int ttl = 0);
/**
* Sets the value of a key. The key will be modified if already exists or created
* if it does not exists.
* @param key is the key to be created or modified
* @param value is the new value to be set
* @param leaseId is the lease attached to the key
*/
Response set(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
*/
Response add(std::string const & key, std::string const & value, int ttl = 0);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
* @param leaseId is the lease attached to the key
*/
Response add(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Put a new key-value pair.
* @param key is the key to be put
* @param value is the value to be put
*/
Response put(std::string const & key, std::string const & value);
/**
* Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified
* @param value is the new value to be set
*/
Response modify(std::string const & key, std::string const & value, int ttl = 0);
/**
* Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified
* @param value is the new value to be set
* @param leaseId is the lease attached to the key
*/
Response modify(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists
* or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
*/
Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0);
/**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists
* or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
* @param leaseId is the lease attached to the key
*/
Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId);
/**
* Modifies an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
*/
Response modify_if(std::string const & key, std::string const & value, int64_t old_index, int ttl = 0);
/**
* Modifies an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
* @param leaseId is the lease attached to the key
*/
Response modify_if(std::string const & key, std::string const & value, int64_t old_index, int64_t leaseId);
/**
* Removes a single key. The key has to point to a plain, non directory entry.
* @param key is the key to be deleted
*/
Response rm(std::string const & key);
/**
* Removes a single key but only if it has a specific value. Fails if the key does not exists
* or the its value differs from the expected one.
* @param key is the key to be deleted
*/
Response rm_if(std::string const & key, std::string const & old_value);
/**
* Removes an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of it differs from the expected one.
* @param key is the key to be deleted
* @param old_index is the expected index of the existing value
*/
Response rm_if(std::string const & key, int64_t old_index);
/**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory.
* @param key is the directory to be created to be listed
* @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory.
*/
Response rmdir(std::string const & key, bool recursive = false);
/**
* Removes multiple keys between [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
Response rmdir(std::string const & key, const char *range_end);
/**
* Removes multiple keys between [key, range_end).
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
Response rmdir(std::string const & key, std::string const &range_end);
/**
* Gets a directory listing of the directory identified by the key.
* @param key is the key to be listed
*/
Response ls(std::string const & key);
/**
* Gets a directory listing of the directory identified by the key.
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
*/
Response ls(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
*/
Response ls(std::string const & key, std::string const &range_end);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
*/
Response ls(std::string const & key, std::string const &range_end, size_t const limit);
/** /**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
@ -92,8 +495,6 @@ namespace etcd
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
Response watch(std::string const & key, bool recursive = false); Response watch(std::string const & key, bool recursive = false);
Response watch(std::string const & key, const char *range_end);
Response watch(std::string const & key, std::string const &range_end);
/** /**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
@ -102,11 +503,238 @@ namespace etcd
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
Response watch(std::string const & key, int64_t fromIndex, bool recursive = false); Response watch(std::string const & key, int64_t fromIndex, bool recursive = false);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
Response watch(std::string const & key, const char *range_end);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
Response watch(std::string const & key, std::string const &range_end);
/**
* Watches for changes of a range of keys inside [key, range_end) from a specific index. The index value
* can be in the "past".
*
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
* @param fromIndex the first index we are interested in
*/
Response watch(std::string const & key, std::string const &range_end, int64_t fromIndex); Response watch(std::string const & key, std::string const &range_end, int64_t fromIndex);
protected: /**
Client client; * Grants a lease.
* @param ttl is the time to live of the lease
*/
Response leasegrant(int ttl);
/**
* Grants a lease.
* @param ttl is the time to live of the lease
*/
std::shared_ptr<KeepAlive> leasekeepalive(int ttl);
/**
* Revoke a lease.
* @param lease_id is the id the lease
*/
Response leaserevoke(int64_t lease_id);
/**
* Get time-to-live of a lease.
* @param lease_id is the id the lease
*/
Response leasetimetolive(int64_t lease_id);
/**
* Gains a lock at a key, using a default created lease, using the default lease (10 seconds), with
* keeping alive has already been taken care of by the library.
* @param key is the key to be used to request the lock.
*/
Response lock(std::string const &key);
/**
* Gains a lock at a key, using a default created lease, using the specified lease TTL (in seconds), with
* keeping alive has already been taken care of by the library.
* @param key is the key to be used to request the lock.
* @param lease_ttl is the TTL used to create a lease for the key.
*/
Response lock(std::string const &key, int lease_ttl);
/**
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
* of by the library.
* @param key is the key to be used to request the lock.
*/
Response lock_with_lease(std::string const &key, int64_t lease_id);
/**
* Releases a lock at a key.
* @param key is the lock key to release.
*/
Response unlock(std::string const &lock_key);
/**
* Execute a etcd transaction.
* @param txn is the transaction object to be executed.
*/
Response txn(etcdv3::Transaction const &txn);
/**
* Campaign for the election @name@.
*
* @param name is the name of election that will campaign for.
* @param lease_id is a user-managed (usually with a `KeepAlive`) lease id.
* @param value is the value for campaign.
*
* @returns a leader key if succeed, consist of
*
* - name: the name of the election
* - key: a generated election key
* - created rev: the revision of the generated key
* - lease: the lease id of the election leader
*/
Response campaign(std::string const &name, int64_t lease_id, std::string const &value);
/**
* Updates the value of election with a new value, with leader key returns by
* @campaign@.
*
* @param name is the name of election
* @param lease_id is the user-provided lease id for the proclamation
* @param key is the generated associated key returned by @campaign@
* @param revision is the created revision of key-value returned by @campaign@
* @param value is the new value to set.
*/
Response proclaim(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision, std::string const &value);
/**
* Get the current leader proclamation.
*
* @param name is the names of election.
*
* @returns current election key and value.
*/
Response leader(std::string const &name);
/**
* An observer that will cancel the associated election::observe request
* when being destruct.
*/
class Observer {
public:
~Observer();
// wait a at least one response from the observer.
Response WaitOnce();
private:
std::shared_ptr<etcdv3::AsyncObserveAction> action = nullptr;
friend class SyncClient;
}; };
/**
* Observe the leader change.
*
* @param name is the names of election to watch.
*
* @returns an observer that holds that action and will cancel the request when being destructed.
*/
std::unique_ptr<Observer> observe(std::string const &name);
/**
* Updates the value of election with a new value, with leader key returns by
* @campaign@.
*
* @param name is the name of election
* @param lease_id is the user-provided lease id for the proclamation
* @param key is the generated associated key returned by @campaign@
* @param revision is the created revision of key-value returned by @campaign@
*/
Response resign(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision);
private:
// TODO: use std::unique_ptr<>
std::shared_ptr<etcdv3::AsyncHeadAction> head_internal();
std::shared_ptr<etcdv3::AsyncRangeAction> get_internal(std::string const & key);
std::shared_ptr<etcdv3::AsyncSetAction> set_internal(std::string const & key, std::string const & value, int64_t leaseId);
std::shared_ptr<etcdv3::AsyncSetAction> add_internal(std::string const & key, std::string const & value, int64_t leaseId);
std::shared_ptr<etcdv3::AsyncPutAction> put_internal(std::string const & key, std::string const & value);
std::shared_ptr<etcdv3::AsyncUpdateAction> modify_internal(std::string const & key, std::string const & value, int64_t leaseId);
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> modify_if_internal(std::string const & key, std::string const & value, int64_t old_index, std::string const & old_value, int64_t leaseId, etcdv3::AtomicityType const & atomicity_type);
std::shared_ptr<etcdv3::AsyncDeleteAction> rm_internal(std::string const & key);
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> rm_if_internal(std::string const & key, int64_t old_index, const std::string & old_value, etcdv3::AtomicityType const & atomicity_type);
std::shared_ptr<etcdv3::AsyncDeleteAction> rmdir_internal(std::string const & key, bool recursive = false);
std::shared_ptr<etcdv3::AsyncDeleteAction> rmdir_internal(std::string const & key, std::string const &range_end);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, size_t const limit);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, std::string const &range_end, size_t const limit);
std::shared_ptr<etcdv3::AsyncWatchAction> watch_internal(std::string const & key, int64_t fromIndex, bool recursive = false);
std::shared_ptr<etcdv3::AsyncWatchAction> watch_internal(std::string const & key, std::string const &range_end, int64_t fromIndex);
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> leaserevoke_internal(int64_t lease_id);
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> leasetimetolive_internal(int64_t lease_id);
Response lock_internal(std::string const &key, std::shared_ptr<etcd::KeepAlive> const &keepalive);
std::shared_ptr<etcdv3::AsyncLockAction> lock_with_lease_internal(std::string const &key, int64_t lease_id);
std::shared_ptr<etcdv3::AsyncUnlockAction> unlock_internal(std::string const &lock_key);
std::shared_ptr<etcdv3::AsyncTxnAction> txn_internal(etcdv3::Transaction const &txn);
std::shared_ptr<etcdv3::AsyncCampaignAction> campaign_internal(std::string const &name, int64_t lease_id, std::string const &value);
std::shared_ptr<etcdv3::AsyncProclaimAction> proclaim_internal(
std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision, std::string const &value);
std::shared_ptr<etcdv3::AsyncLeaderAction> leader_internal(std::string const &name);
std::shared_ptr<etcdv3::AsyncResignAction> resign_internal(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision);
public:
/**
* Return current auth token.
*/
const std::string &current_auth_token() const;
/**
* Obtain the underlying gRPC channel.
*/
#if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> grpc_channel() const;
#else
std::shared_ptr<grpc_impl::Channel> grpc_channel() const;
#endif
private:
#if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> channel;
#else
std::shared_ptr<grpc_impl::Channel> channel;
#endif
mutable std::unique_ptr<TokenAuthenticator, TokenAuthenticatorDeleter> token_authenticator;
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {
void operator()(EtcdServerStubs *stubs);
};
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
std::mutex mutex_for_keepalives;
std::map<std::string, int64_t> leases_for_locks;
std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks;
friend class KeepAlive;
friend class Watcher;
friend class Client;
};
} }
#endif #endif

View File

@ -22,6 +22,8 @@ namespace etcd
class Value; class Value;
class Event; class Event;
class Response; class Response;
class Client;
class SyncClient;
/** /**
* Represents a value object received from the etcd server * Represents a value object received from the etcd server
@ -68,6 +70,8 @@ namespace etcd
int64_t lease() const; int64_t lease() const;
protected: protected:
friend class Client;
friend class SyncClient;
friend class Response; friend class Response;
friend class BaseResponse; //deliberately done since Value class will be removed during full V3 friend class BaseResponse; //deliberately done since Value class will be removed during full V3
friend class DeleteRpcResponse; friend class DeleteRpcResponse;

View File

@ -6,24 +6,37 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include "etcd/Client.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
namespace etcd namespace etcd
{ {
// forward declaration to avoid header/library dependency
class Client;
class Watcher class Watcher
{ {
public: public:
Watcher(Client const &client, std::string const & key, Watcher(Client const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(SyncClient const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive=false);
Watcher(Client const &client, std::string const & key, Watcher(Client const &client, std::string const & key,
std::string const &range_end, std::string const &range_end,
std::function<void(Response)> callback); std::function<void(Response)> callback);
Watcher(SyncClient const &client, std::string const & key,
std::string const &range_end,
std::function<void(Response)> callback);
Watcher(Client const &client, std::string const & key, int64_t fromIndex, Watcher(Client const &client, std::string const & key, int64_t fromIndex,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(SyncClient const &client, std::string const & key, int64_t fromIndex,
std::function<void(Response)> callback, bool recursive=false);
Watcher(Client const &client, std::string const & key, Watcher(Client const &client, std::string const & key,
std::string const &range_end, int64_t fromIndex, std::string const &range_end, int64_t fromIndex,
std::function<void(Response)> callback); std::function<void(Response)> callback);
Watcher(SyncClient const &client, std::string const & key,
std::string const &range_end, int64_t fromIndex,
std::function<void(Response)> callback);
Watcher(std::string const & address, std::string const & key, Watcher(std::string const & address, std::string const & key,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & address, std::string const & key, Watcher(std::string const & address, std::string const & key,

View File

@ -2,6 +2,7 @@
#define __V3_ACTION_HPP__ #define __V3_ACTION_HPP__
#include <chrono> #include <chrono>
#include <ostream>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
@ -20,7 +21,7 @@ using v3electionpb::Election;
namespace etcdv3 namespace etcdv3
{ {
enum AtomicityType enum class AtomicityType
{ {
PREV_INDEX = 0, PREV_INDEX = 0,
PREV_VALUE = 1 PREV_VALUE = 1
@ -46,12 +47,15 @@ namespace etcdv3
Lease::Stub* lease_stub; Lease::Stub* lease_stub;
Lock::Stub* lock_stub; Lock::Stub* lock_stub;
Election::Stub* election_stub; Election::Stub* election_stub;
void dump(std::ostream &os) const;
}; };
class Action class Action
{ {
public: public:
Action(etcdv3::ActionParameters const &params); Action(etcdv3::ActionParameters const &params);
Action(etcdv3::ActionParameters && params);
void waitForResponse(); void waitForResponse();
const std::chrono::high_resolution_clock::time_point startTimepoint(); const std::chrono::high_resolution_clock::time_point startTimepoint();
protected: protected:
@ -60,6 +64,9 @@ namespace etcdv3
CompletionQueue cq_; CompletionQueue cq_;
etcdv3::ActionParameters parameters; etcdv3::ActionParameters parameters;
std::chrono::high_resolution_clock::time_point start_timepoint; std::chrono::high_resolution_clock::time_point start_timepoint;
private:
// Init things like auth token, etc.
void InitAction();
}; };
namespace detail { namespace detail {

View File

@ -16,7 +16,7 @@ namespace etcdv3
class AsyncCompareAndDeleteAction : public etcdv3::Action class AsyncCompareAndDeleteAction : public etcdv3::Action
{ {
public: public:
AsyncCompareAndDeleteAction(etcdv3::ActionParameters const &param, etcdv3::AtomicityType type); AsyncCompareAndDeleteAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type);
AsyncTxnResponse ParseResponse(); AsyncTxnResponse ParseResponse();
private: private:
TxnResponse reply; TxnResponse reply;

View File

@ -16,7 +16,7 @@ namespace etcdv3
class AsyncCompareAndSwapAction : public etcdv3::Action class AsyncCompareAndSwapAction : public etcdv3::Action
{ {
public: public:
AsyncCompareAndSwapAction(etcdv3::ActionParameters const &param, etcdv3::AtomicityType type); AsyncCompareAndSwapAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type);
AsyncTxnResponse ParseResponse(); AsyncTxnResponse ParseResponse();
private: private:
TxnResponse reply; TxnResponse reply;

View File

@ -15,7 +15,7 @@ namespace etcdv3
class AsyncDeleteAction : public etcdv3::Action class AsyncDeleteAction : public etcdv3::Action
{ {
public: public:
AsyncDeleteAction(etcdv3::ActionParameters const &param); AsyncDeleteAction(etcdv3::ActionParameters && params);
AsyncDeleteResponse ParseResponse(); AsyncDeleteResponse ParseResponse();
private: private:
DeleteRangeResponse reply; DeleteRangeResponse reply;

View File

@ -24,7 +24,7 @@ namespace etcdv3
class AsyncCampaignAction : public etcdv3::Action class AsyncCampaignAction : public etcdv3::Action
{ {
public: public:
AsyncCampaignAction(etcdv3::ActionParameters const &param); AsyncCampaignAction(etcdv3::ActionParameters && params);
AsyncCampaignResponse ParseResponse(); AsyncCampaignResponse ParseResponse();
private: private:
CampaignResponse reply; CampaignResponse reply;
@ -34,7 +34,7 @@ namespace etcdv3
class AsyncProclaimAction : public etcdv3::Action class AsyncProclaimAction : public etcdv3::Action
{ {
public: public:
AsyncProclaimAction(etcdv3::ActionParameters const &param); AsyncProclaimAction(etcdv3::ActionParameters && params);
AsyncProclaimResponse ParseResponse(); AsyncProclaimResponse ParseResponse();
private: private:
ProclaimResponse reply; ProclaimResponse reply;
@ -44,7 +44,7 @@ namespace etcdv3
class AsyncLeaderAction : public etcdv3::Action class AsyncLeaderAction : public etcdv3::Action
{ {
public: public:
AsyncLeaderAction(etcdv3::ActionParameters const &param); AsyncLeaderAction(etcdv3::ActionParameters && params);
AsyncLeaderResponse ParseResponse(); AsyncLeaderResponse ParseResponse();
private: private:
LeaderResponse reply; LeaderResponse reply;
@ -54,14 +54,12 @@ namespace etcdv3
class AsyncObserveAction : public etcdv3::Action class AsyncObserveAction : public etcdv3::Action
{ {
public: public:
AsyncObserveAction(etcdv3::ActionParameters const &param, const bool once=false); AsyncObserveAction(etcdv3::ActionParameters && params);
AsyncObserveResponse ParseResponse(); AsyncObserveResponse ParseResponse();
void waitForResponse(); void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelObserve(); void CancelObserve();
bool Cancelled() const; bool Cancelled() const;
private: private:
bool once;
LeaderResponse reply; LeaderResponse reply;
std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader; std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader;
std::atomic_bool isCancelled; std::atomic_bool isCancelled;
@ -71,7 +69,7 @@ namespace etcdv3
class AsyncResignAction : public etcdv3::Action class AsyncResignAction : public etcdv3::Action
{ {
public: public:
AsyncResignAction(etcdv3::ActionParameters const &param); AsyncResignAction(etcdv3::ActionParameters && params);
AsyncResignResponse ParseResponse(); AsyncResignResponse ParseResponse();
private: private:
ResignResponse reply; ResignResponse reply;

View File

@ -15,7 +15,7 @@ namespace etcdv3
class AsyncHeadAction : public etcdv3::Action class AsyncHeadAction : public etcdv3::Action
{ {
public: public:
AsyncHeadAction(etcdv3::ActionParameters const &param); AsyncHeadAction(etcdv3::ActionParameters && params);
AsyncHeadResponse ParseResponse(); AsyncHeadResponse ParseResponse();
private: private:
RangeResponse reply; RangeResponse reply;

View File

@ -25,7 +25,7 @@ namespace etcdv3
{ {
class AsyncLeaseGrantAction : public etcdv3::Action { class AsyncLeaseGrantAction : public etcdv3::Action {
public: public:
AsyncLeaseGrantAction(etcdv3::ActionParameters const &param); AsyncLeaseGrantAction(etcdv3::ActionParameters && params);
AsyncLeaseGrantResponse ParseResponse(); AsyncLeaseGrantResponse ParseResponse();
private: private:
LeaseGrantResponse reply; LeaseGrantResponse reply;
@ -34,7 +34,7 @@ namespace etcdv3
class AsyncLeaseRevokeAction: public etcdv3::Action { class AsyncLeaseRevokeAction: public etcdv3::Action {
public: public:
AsyncLeaseRevokeAction(etcdv3::ActionParameters const &param); AsyncLeaseRevokeAction(etcdv3::ActionParameters && params);
AsyncLeaseRevokeResponse ParseResponse(); AsyncLeaseRevokeResponse ParseResponse();
private: private:
LeaseRevokeResponse reply; LeaseRevokeResponse reply;
@ -43,7 +43,7 @@ namespace etcdv3
class AsyncLeaseKeepAliveAction: public etcdv3::Action { class AsyncLeaseKeepAliveAction: public etcdv3::Action {
public: public:
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const &param); AsyncLeaseKeepAliveAction(etcdv3::ActionParameters && params);
AsyncLeaseKeepAliveResponse ParseResponse(); AsyncLeaseKeepAliveResponse ParseResponse();
etcd::Response Refresh(); etcd::Response Refresh();
@ -61,7 +61,7 @@ namespace etcdv3
class AsyncLeaseTimeToLiveAction: public etcdv3::Action { class AsyncLeaseTimeToLiveAction: public etcdv3::Action {
public: public:
AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters const &param); AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters && params);
AsyncLeaseTimeToLiveResponse ParseResponse(); AsyncLeaseTimeToLiveResponse ParseResponse();
private: private:
LeaseTimeToLiveResponse reply; LeaseTimeToLiveResponse reply;
@ -70,7 +70,7 @@ namespace etcdv3
class AsyncLeaseLeasesAction: public etcdv3::Action { class AsyncLeaseLeasesAction: public etcdv3::Action {
public: public:
AsyncLeaseLeasesAction(etcdv3::ActionParameters const &param); AsyncLeaseLeasesAction(etcdv3::ActionParameters && params);
AsyncLeaseLeasesResponse ParseResponse(); AsyncLeaseLeasesResponse ParseResponse();
private: private:
LeaseLeasesResponse reply; LeaseLeasesResponse reply;

View File

@ -19,7 +19,7 @@ namespace etcdv3
class AsyncLockAction : public etcdv3::Action class AsyncLockAction : public etcdv3::Action
{ {
public: public:
AsyncLockAction(etcdv3::ActionParameters const &param); AsyncLockAction(etcdv3::ActionParameters && params);
AsyncLockResponse ParseResponse(); AsyncLockResponse ParseResponse();
private: private:
LockResponse reply; LockResponse reply;
@ -29,7 +29,7 @@ namespace etcdv3
class AsyncUnlockAction : public etcdv3::Action class AsyncUnlockAction : public etcdv3::Action
{ {
public: public:
AsyncUnlockAction(etcdv3::ActionParameters const &param); AsyncUnlockAction(etcdv3::ActionParameters && params);
AsyncUnlockResponse ParseResponse(); AsyncUnlockResponse ParseResponse();
private: private:
UnlockResponse reply; UnlockResponse reply;

View File

@ -15,7 +15,7 @@ namespace etcdv3
class AsyncPutAction : public etcdv3::Action class AsyncPutAction : public etcdv3::Action
{ {
public: public:
AsyncPutAction(etcdv3::ActionParameters const &param); AsyncPutAction(etcdv3::ActionParameters && params);
AsyncPutResponse ParseResponse(); AsyncPutResponse ParseResponse();
private: private:
PutResponse reply; PutResponse reply;

View File

@ -15,7 +15,7 @@ namespace etcdv3
class AsyncRangeAction : public etcdv3::Action class AsyncRangeAction : public etcdv3::Action
{ {
public: public:
AsyncRangeAction(etcdv3::ActionParameters const &param); AsyncRangeAction(etcdv3::ActionParameters && params);
AsyncRangeResponse ParseResponse(); AsyncRangeResponse ParseResponse();
private: private:
RangeResponse reply; RangeResponse reply;

View File

@ -16,7 +16,7 @@ namespace etcdv3
class AsyncSetAction : public etcdv3::Action class AsyncSetAction : public etcdv3::Action
{ {
public: public:
AsyncSetAction(etcdv3::ActionParameters const &param, bool create=false); AsyncSetAction(etcdv3::ActionParameters && params, bool create=false);
AsyncTxnResponse ParseResponse(); AsyncTxnResponse ParseResponse();
private: private:
TxnResponse reply; TxnResponse reply;

View File

@ -18,7 +18,7 @@ namespace etcdv3
class AsyncTxnAction : public etcdv3::Action class AsyncTxnAction : public etcdv3::Action
{ {
public: public:
AsyncTxnAction(etcdv3::ActionParameters const &param, etcdv3::Transaction const &tx); AsyncTxnAction(etcdv3::ActionParameters && params, etcdv3::Transaction const &tx);
AsyncTxnResponse ParseResponse(); AsyncTxnResponse ParseResponse();
private: private:
TxnResponse reply; TxnResponse reply;

View File

@ -16,7 +16,7 @@ namespace etcdv3
class AsyncUpdateAction : public etcdv3::Action class AsyncUpdateAction : public etcdv3::Action
{ {
public: public:
AsyncUpdateAction(etcdv3::ActionParameters const &param); AsyncUpdateAction(etcdv3::ActionParameters && params);
AsyncTxnResponse ParseResponse(); AsyncTxnResponse ParseResponse();
private: private:
TxnResponse reply; TxnResponse reply;

View File

@ -19,7 +19,7 @@ namespace etcdv3
class AsyncWatchAction : public etcdv3::Action class AsyncWatchAction : public etcdv3::Action
{ {
public: public:
AsyncWatchAction(etcdv3::ActionParameters const &param); AsyncWatchAction(etcdv3::ActionParameters && params);
AsyncWatchResponse ParseResponse(); AsyncWatchResponse ParseResponse();
void waitForResponse(); void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback); void waitForResponse(std::function<void(etcd::Response)> callback);

View File

@ -50,7 +50,7 @@ public:
void setup_put(std::string const &key, std::string const &value); void setup_put(std::string const &key, std::string const &value);
void setup_delete(std::string const &key); void setup_delete(std::string const &key);
std::unique_ptr<etcdserverpb::TxnRequest> txn_request; std::shared_ptr<etcdserverpb::TxnRequest> txn_request;
private: private:
std::string key; std::string key;
}; };

View File

@ -44,6 +44,7 @@ namespace etcdv3
extern const int ERROR_KEY_NOT_FOUND; extern const int ERROR_KEY_NOT_FOUND;
extern const int ERROR_COMPARE_FAILED; extern const int ERROR_COMPARE_FAILED;
extern const int ERROR_KEY_ALREADY_EXISTS; extern const int ERROR_KEY_ALREADY_EXISTS;
extern const int ERROR_ACTION_CANCELLED;
} }
#endif #endif

View File

@ -1,30 +1,56 @@
file(GLOB_RECURSE CPP_CLIENT_SRC # grpc stuffs
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}/*.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp")
set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE) set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE)
add_library(etcd-cpp-api ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES})
add_dependencies(etcd-cpp-api protobuf_generates)
target_link_libraries(etcd-cpp-api PUBLIC macro(include_generated_protobuf_files target)
target_include_directories(${target} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)
target_include_directories(${target} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto)
endmacro()
# prepare common objects
file(GLOB_RECURSE CPP_CLIENT_CORE_SRC
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}/KeepAlive.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/Response.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/SyncClient.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/Value.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/Watcher.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp"
)
add_library(etcd-cpp-api-core-objects OBJECT ${CPP_CLIENT_CORE_SRC} ${PROTOBUF_GENERATES})
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
include_generated_protobuf_files(etcd-cpp-api-core-objects)
# add the core library, includes the sycnhronous client only
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
target_link_libraries(etcd-cpp-api-core PUBLIC
${Boost_LIBRARIES} ${Boost_LIBRARIES}
${CPPREST_LIB}
${PROTOBUF_LIBRARIES} ${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES} ${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}) ${GRPC_LIBRARIES}
)
include_generated_protobuf_files(etcd-cpp-api-core)
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen) # add the client with asynchronus client
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto) add_library(etcd-cpp-api $<TARGET_OBJECTS:etcd-cpp-api-core-objects>
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
target_link_libraries(etcd-cpp-api PUBLIC
${Boost_LIBRARIES}
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}
)
include_generated_protobuf_files(etcd-cpp-api)
if("${CMAKE_VERSION}" VERSION_LESS "3.14") if("${CMAKE_VERSION}" VERSION_LESS "3.14")
install(TARGETS etcd-cpp-api install(TARGETS etcd-cpp-api-core etcd-cpp-api
EXPORT etcd-targets EXPORT etcd-targets
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib) ARCHIVE DESTINATION lib)
else() else()
install(TARGETS etcd-cpp-api install(TARGETS etcd-cpp-api-core etcd-cpp-api
EXPORT etcd-targets) EXPORT etcd-targets)
endif() endif()

File diff suppressed because it is too large Load Diff

View File

@ -21,10 +21,10 @@ void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdSe
} }
} }
etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id):
ttl(ttl), lease_id(lease_id), continue_next(true) { ttl(ttl), lease_id(lease_id), continue_next(true) {
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel); stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(client.current_auth_token()); params.auth_token.assign(client.current_auth_token());
@ -33,7 +33,7 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
continue_next.store(true); continue_next.store(true);
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
task_ = std::thread([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
@ -47,28 +47,28 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
} }
etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id): etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id):
KeepAlive(Client(address), ttl, lease_id) { KeepAlive(SyncClient(address), ttl, lease_id) {
} }
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
int ttl, int64_t lease_id, int const auth_token_ttl): int ttl, int64_t lease_id, int const auth_token_ttl):
KeepAlive(Client(address, username, password, auth_token_ttl), ttl, lease_id) { KeepAlive(SyncClient(address, username, password, auth_token_ttl), ttl, lease_id) {
} }
etcd::KeepAlive::KeepAlive(Client const &client, etcd::KeepAlive::KeepAlive(SyncClient const &client,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id): int ttl, int64_t lease_id):
handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) { handler_(handler), ttl(ttl), lease_id(lease_id), continue_next(true) {
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel); stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(client.current_auth_token()); params.auth_token.assign(client.current_auth_token());
params.lease_id = this->lease_id; params.lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get(); params.lease_stub = stubs->leaseServiceStub.get();
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
task_ = std::thread([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
@ -88,14 +88,14 @@ etcd::KeepAlive::KeepAlive(Client const &client,
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id): int ttl, int64_t lease_id):
KeepAlive(Client(address), handler, ttl, lease_id) { KeepAlive(SyncClient(address), handler, ttl, lease_id) {
} }
etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password, std::string const & username, std::string const & password,
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id, const int auth_token_ttl): int ttl, int64_t lease_id, const int auth_token_ttl):
KeepAlive(Client(address, username, password, auth_token_ttl), handler, ttl, lease_id) { KeepAlive(SyncClient(address, username, password, auth_token_ttl), handler, ttl, lease_id) {
} }
etcd::KeepAlive::~KeepAlive() etcd::KeepAlive::~KeepAlive()

File diff suppressed because it is too large Load Diff

View File

@ -12,18 +12,18 @@ void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServer
} }
} }
etcd::Watcher::Watcher(Client const &client, std::string const & key, etcd::Watcher::Watcher(SyncClient const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
Watcher(client, key, -1, callback, recursive) { Watcher(client, key, -1, callback, recursive) {
} }
etcd::Watcher::Watcher(Client const &client, std::string const & key, etcd::Watcher::Watcher(SyncClient const &client, std::string const & key,
std::string const &range_end, std::string const &range_end,
std::function<void(Response)> callback): std::function<void(Response)> callback):
Watcher(client, key, range_end, -1, callback) { Watcher(client, key, range_end, -1, callback) {
} }
etcd::Watcher::Watcher(Client const &client, std::string const & key, int64_t fromIndex, etcd::Watcher::Watcher(SyncClient const &client, std::string const & key, int64_t fromIndex,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) { fromIndex(fromIndex), recursive(recursive) {
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
@ -31,7 +31,7 @@ etcd::Watcher::Watcher(Client const &client, std::string const & key, int64_t fr
doWatch(key, "", client.current_auth_token(), callback); doWatch(key, "", client.current_auth_token(), callback);
} }
etcd::Watcher::Watcher(Client const &client, std::string const & key, etcd::Watcher::Watcher(SyncClient const &client, std::string const & key,
std::string const &range_end, int64_t fromIndex, std::string const &range_end, int64_t fromIndex,
std::function<void(Response)> callback): std::function<void(Response)> callback):
fromIndex(fromIndex), recursive(false) { fromIndex(fromIndex), recursive(false) {
@ -53,13 +53,13 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key,
etcd::Watcher::Watcher(std::string const & address, std::string const & key, int64_t fromIndex, etcd::Watcher::Watcher(std::string const & address, std::string const & key, int64_t fromIndex,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
Watcher(Client(address), key, fromIndex, callback, recursive) { Watcher(SyncClient(address), key, fromIndex, callback, recursive) {
} }
etcd::Watcher::Watcher(std::string const & address, std::string const & key, etcd::Watcher::Watcher(std::string const & address, std::string const & key,
std::string const & range_end, int64_t fromIndex, std::string const & range_end, int64_t fromIndex,
std::function<void(Response)> callback): std::function<void(Response)> callback):
Watcher(Client(address), key, range_end, fromIndex, callback) { Watcher(SyncClient(address), key, range_end, fromIndex, callback) {
} }
etcd::Watcher::Watcher(std::string const & address, etcd::Watcher::Watcher(std::string const & address,
@ -83,7 +83,7 @@ etcd::Watcher::Watcher(std::string const & address,
std::string const & key, int64_t fromIndex, std::string const & key, int64_t fromIndex,
std::function<void(Response)> callback, bool recursive, std::function<void(Response)> callback, bool recursive,
int const auth_token_ttl): int const auth_token_ttl):
Watcher(Client(address, username, password, auth_token_ttl), key, fromIndex, callback, recursive) { Watcher(SyncClient(address, username, password, auth_token_ttl), key, fromIndex, callback, recursive) {
} }
etcd::Watcher::Watcher(std::string const & address, etcd::Watcher::Watcher(std::string const & address,
@ -91,7 +91,7 @@ etcd::Watcher::Watcher(std::string const & address,
std::string const & key, std::string const & range_end, int64_t fromIndex, std::string const & key, std::string const & range_end, int64_t fromIndex,
std::function<void(Response)> callback, std::function<void(Response)> callback,
int const auth_token_ttl): int const auth_token_ttl):
Watcher(Client(address, username, password, auth_token_ttl), key, range_end, fromIndex, callback) { Watcher(SyncClient(address, username, password, auth_token_ttl), key, range_end, fromIndex, callback) {
} }
etcd::Watcher::~Watcher() etcd::Watcher::~Watcher()
@ -144,15 +144,18 @@ void etcd::Watcher::doWatch(std::string const & key,
params.withPrefix = recursive; params.withPrefix = recursive;
params.watch_stub = stubs->watchServiceStub.get(); params.watch_stub = stubs->watchServiceStub.get();
stubs->call.reset(new etcdv3::AsyncWatchAction(params)); stubs->call.reset(new etcdv3::AsyncWatchAction(std::move(params)));
task_ = std::thread([this, callback]() { task_ = std::thread([this, callback]() {
stubs->call->waitForResponse(callback); stubs->call->waitForResponse(callback);
if (wait_callback != nullptr) { if (wait_callback != nullptr) {
// issue the callback in another thread to avoid deadlock, is ok to detach the pplx::task // issue the callback in another thread (detached) to avoid deadlock,
pplx::task<void>([this]() -> void { // it is ok to detach a pplx::task, but we cannot use the pplx::task
// in the core library
std::thread canceller([this]() {
wait_callback(stubs->call->Cancelled()); wait_callback(stubs->call->Cancelled());
}); });
canceller.detach();
} }
}); });
cancelled.store(false); cancelled.store(false);

View File

@ -5,6 +5,16 @@
etcdv3::Action::Action(etcdv3::ActionParameters const &params) etcdv3::Action::Action(etcdv3::ActionParameters const &params)
{ {
parameters = params; parameters = params;
this->InitAction();
}
etcdv3::Action::Action(etcdv3::ActionParameters && params)
{
parameters = std::move(params);
this->InitAction();
}
void etcdv3::Action::InitAction() {
if (!parameters.auth_token.empty()) { if (!parameters.auth_token.empty()) {
// use `token` as the key, see: // use `token` as the key, see:
// //
@ -26,6 +36,22 @@ etcdv3::ActionParameters::ActionParameters()
lease_stub = NULL; lease_stub = NULL;
} }
void etcdv3::ActionParameters::dump(std::ostream &os) const {
os << "ActionParameters:" << std::endl;
os << " withPrefix: " << withPrefix << std::endl;
os << " revision: " << revision << std::endl;
os << " old_revision: " << old_revision << std::endl;
os << " lease_id: " << lease_id << std::endl;
os << " ttl: " << ttl << std::endl;
os << " limit: " << limit << std::endl;
os << " name: " << name << std::endl;
os << " key: " << key << std::endl;
os << " range_end: " << range_end << std::endl;
os << " value: " << value << std::endl;
os << " old_value: " << old_value << std::endl;
os << " auth_token: " << auth_token << std::endl;
}
void etcdv3::Action::waitForResponse() void etcdv3::Action::waitForResponse()
{ {
void* got_tag; void* got_tag;

View File

@ -10,8 +10,8 @@ using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest; using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction( etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(
etcdv3::ActionParameters const &param, etcdv3::AtomicityType type) etcdv3::ActionParameters && params, etcdv3::AtomicityType type)
:etcdv3::Action(param) :etcdv3::Action(std::move(params))
{ {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction transaction(parameters.key);
if(type == etcdv3::AtomicityType::PREV_VALUE) if(type == etcdv3::AtomicityType::PREV_VALUE)

View File

@ -10,8 +10,8 @@ using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest; using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction( etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(
etcdv3::ActionParameters const &param, etcdv3::AtomicityType type) etcdv3::ActionParameters && params, etcdv3::AtomicityType type)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction transaction(parameters.key);
if(type == etcdv3::AtomicityType::PREV_VALUE) if(type == etcdv3::AtomicityType::PREV_VALUE)

View File

@ -4,8 +4,8 @@
using etcdserverpb::DeleteRangeRequest; using etcdserverpb::DeleteRangeRequest;
etcdv3::AsyncDeleteAction::AsyncDeleteAction( etcdv3::AsyncDeleteAction::AsyncDeleteAction(
ActionParameters const &param) ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
DeleteRangeRequest del_request; DeleteRangeRequest del_request;
del_request.set_key(parameters.key); del_request.set_key(parameters.key);

View File

@ -1,4 +1,5 @@
#include "etcd/v3/AsyncElectionAction.hpp" #include "etcd/v3/AsyncElectionAction.hpp"
#include <grpcpp/support/status.h>
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
@ -14,13 +15,13 @@ using v3electionpb::ResignRequest;
using v3electionpb::ResignResponse; using v3electionpb::ResignResponse;
etcdv3::AsyncCampaignAction::AsyncCampaignAction( etcdv3::AsyncCampaignAction::AsyncCampaignAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
CampaignRequest campaign_request; CampaignRequest campaign_request;
campaign_request.set_name(param.name); campaign_request.set_name(parameters.name);
campaign_request.set_lease(param.lease_id); campaign_request.set_lease(parameters.lease_id);
campaign_request.set_value(param.value); campaign_request.set_value(parameters.value);
response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_); response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this); response_reader->Finish(&reply, &status, (void *)this);
@ -42,18 +43,18 @@ etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse()
} }
etcdv3::AsyncProclaimAction::AsyncProclaimAction( etcdv3::AsyncProclaimAction::AsyncProclaimAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
auto leader = new LeaderKey(); auto leader = new LeaderKey();
leader->set_name(param.name); leader->set_name(parameters.name);
leader->set_key(param.key); leader->set_key(parameters.key);
leader->set_rev(param.revision); leader->set_rev(parameters.revision);
leader->set_lease(param.lease_id); leader->set_lease(parameters.lease_id);
ProclaimRequest proclaim_request; ProclaimRequest proclaim_request;
proclaim_request.set_allocated_leader(leader); proclaim_request.set_allocated_leader(leader);
proclaim_request.set_value(param.value); proclaim_request.set_value(parameters.value);
response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_); response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this); response_reader->Finish(&reply, &status, (void *)this);
@ -75,11 +76,11 @@ etcdv3::AsyncProclaimResponse etcdv3::AsyncProclaimAction::ParseResponse()
} }
etcdv3::AsyncLeaderAction::AsyncLeaderAction( etcdv3::AsyncLeaderAction::AsyncLeaderAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LeaderRequest leader_request; LeaderRequest leader_request;
leader_request.set_name(param.name); leader_request.set_name(parameters.name);
response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_); response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this); response_reader->Finish(&reply, &status, (void *)this);
@ -100,19 +101,18 @@ etcdv3::AsyncLeaderResponse etcdv3::AsyncLeaderAction::ParseResponse()
return leader_resp; return leader_resp;
} }
etcdv3::AsyncObserveAction::AsyncObserveAction( etcdv3::AsyncObserveAction::AsyncObserveAction(etcdv3::ActionParameters && params)
etcdv3::ActionParameters const &param, const bool once) : etcdv3::Action(std::move(params))
: etcdv3::Action(param), once(once)
{ {
LeaderRequest leader_request; LeaderRequest leader_request;
leader_request.set_name(param.name); leader_request.set_name(parameters.name);
response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE); response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE);
void *got_tag; void *got_tag;
bool ok = false; bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) { if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) {
response_reader->Read(&reply, (void *)this); // n.b.: leave the issue of `Read` to the `waitForResponse`
} else { } else {
throw std::runtime_error("failed to create a observe connection"); throw std::runtime_error("failed to create a observe connection");
} }
@ -123,66 +123,25 @@ void etcdv3::AsyncObserveAction::waitForResponse()
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
if (isCancelled.load()) { if (isCancelled.load()) {
break; status = grpc::Status::CANCELLED;
}
if(ok == false)
{
break;
}
if(got_tag == (void*)this) // read tag
{
auto resp = ParseResponse();
if (resp.get_error_code() != 0) {
CancelObserve();
break;
}
}
if(isCancelled.load()) {
break;
}
if (once) {
break;
}
response_reader->Read(&reply, (void *)this);
} }
if (!status.ok()) {
return;
} }
void etcdv3::AsyncObserveAction::waitForResponse(std::function<void(etcd::Response)> callback)
{
void* got_tag;
bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
if(ok == false)
{
break;
}
if (isCancelled.load()) {
break;
}
if(got_tag == (void*)this) // read tag
{
auto resp = ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint);
callback(etcd::Response(resp, duration));
if (resp.get_error_code() != 0) {
CancelObserve();
break;
}
start_timepoint = std::chrono::high_resolution_clock::now();
}
if(isCancelled.load()) {
break;
}
if (once) {
break;
}
response_reader->Read(&reply, (void *)this); response_reader->Read(&reply, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) {
auto response = ParseResponse();
if (response.get_error_code() == 0) {
// issue the next read
response_reader->Read(&reply, (void *)this);
} else {
this->CancelObserve();
}
} else {
this->CancelObserve();
status = grpc::Status::CANCELLED;
} }
} }
@ -223,14 +182,14 @@ etcdv3::AsyncObserveResponse etcdv3::AsyncObserveAction::ParseResponse()
} }
etcdv3::AsyncResignAction::AsyncResignAction( etcdv3::AsyncResignAction::AsyncResignAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
auto leader = new LeaderKey(); auto leader = new LeaderKey();
leader->set_name(param.name); leader->set_name(parameters.name);
leader->set_key(param.key); leader->set_key(parameters.key);
leader->set_rev(param.revision); leader->set_rev(parameters.revision);
leader->set_lease(param.lease_id); leader->set_lease(parameters.lease_id);
ResignRequest resign_request; ResignRequest resign_request;
resign_request.set_allocated_leader(leader); resign_request.set_allocated_leader(leader);

View File

@ -7,8 +7,8 @@
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
etcdv3::AsyncHeadAction::AsyncHeadAction( etcdv3::AsyncHeadAction::AsyncHeadAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
RangeRequest get_request; RangeRequest get_request;
get_request.set_key(etcdv3::NUL); get_request.set_key(etcdv3::NUL);

View File

@ -10,8 +10,8 @@ using etcdserverpb::LeaseTimeToLiveRequest;
using etcdserverpb::LeaseLeasesRequest; using etcdserverpb::LeaseLeasesRequest;
etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction( etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LeaseGrantRequest leasegrant_request; LeaseGrantRequest leasegrant_request;
leasegrant_request.set_ttl(parameters.ttl); leasegrant_request.set_ttl(parameters.ttl);
@ -37,8 +37,8 @@ etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse()
} }
etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction( etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LeaseRevokeRequest leaserevoke_request; LeaseRevokeRequest leaserevoke_request;
leaserevoke_request.set_id(parameters.lease_id); leaserevoke_request.set_id(parameters.lease_id);
@ -62,8 +62,8 @@ etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse()
} }
etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction( etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
isCancelled = false; isCancelled = false;
stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE); stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE);
@ -159,8 +159,8 @@ bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const
} }
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LeaseTimeToLiveRequest leasetimetolive_request; LeaseTimeToLiveRequest leasetimetolive_request;
leasetimetolive_request.set_id(parameters.lease_id); leasetimetolive_request.set_id(parameters.lease_id);
@ -186,8 +186,8 @@ etcdv3::AsyncLeaseTimeToLiveResponse etcdv3::AsyncLeaseTimeToLiveAction::ParseRe
} }
etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction( etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LeaseLeasesRequest leaseleases_request; LeaseLeasesRequest leaseleases_request;

View File

@ -5,8 +5,8 @@ using v3lockpb::LockRequest;
using v3lockpb::UnlockRequest; using v3lockpb::UnlockRequest;
etcdv3::AsyncLockAction::AsyncLockAction( etcdv3::AsyncLockAction::AsyncLockAction(
ActionParameters const &param) ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
LockRequest lock_request; LockRequest lock_request;
lock_request.set_name(parameters.key); lock_request.set_name(parameters.key);
@ -35,8 +35,8 @@ etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse()
} }
etcdv3::AsyncUnlockAction::AsyncUnlockAction( etcdv3::AsyncUnlockAction::AsyncUnlockAction(
ActionParameters const &param) ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
UnlockRequest unlock_request; UnlockRequest unlock_request;
unlock_request.set_key(parameters.key); unlock_request.set_key(parameters.key);

View File

@ -4,8 +4,8 @@
using etcdserverpb::PutRequest; using etcdserverpb::PutRequest;
etcdv3::AsyncPutAction::AsyncPutAction( etcdv3::AsyncPutAction::AsyncPutAction(
ActionParameters const &param) ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
PutRequest put_request; PutRequest put_request;
put_request.set_key(parameters.key); put_request.set_key(parameters.key);

View File

@ -7,8 +7,8 @@
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
etcdv3::AsyncRangeAction::AsyncRangeAction( etcdv3::AsyncRangeAction::AsyncRangeAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
RangeRequest get_request; RangeRequest get_request;
if (parameters.key.empty()) { if (parameters.key.empty()) {
@ -16,7 +16,7 @@ etcdv3::AsyncRangeAction::AsyncRangeAction(
} else { } else {
get_request.set_key(parameters.key); get_request.set_key(parameters.key);
} }
get_request.set_limit(param.limit); get_request.set_limit(parameters.limit);
if(parameters.withPrefix) if(parameters.withPrefix)
{ {
if (parameters.key.empty()) { if (parameters.key.empty()) {

View File

@ -4,8 +4,8 @@
#include "etcd/v3/Transaction.hpp" #include "etcd/v3/Transaction.hpp"
etcdv3::AsyncSetAction::AsyncSetAction( etcdv3::AsyncSetAction::AsyncSetAction(
etcdv3::ActionParameters const &param, bool create) etcdv3::ActionParameters && params, bool create)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction transaction(parameters.key);
isCreate = create; isCreate = create;

View File

@ -4,8 +4,8 @@
etcdv3::AsyncTxnAction::AsyncTxnAction( etcdv3::AsyncTxnAction::AsyncTxnAction(
etcdv3::ActionParameters const &param, etcdv3::Transaction const &tx) etcdv3::ActionParameters && params, etcdv3::Transaction const &tx)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this); response_reader->Finish(&reply, &status, (void *)this);

View File

@ -11,8 +11,8 @@ using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest; using etcdserverpb::TxnRequest;
etcdv3::AsyncUpdateAction::AsyncUpdateAction( etcdv3::AsyncUpdateAction::AsyncUpdateAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction transaction(parameters.key);
transaction.init_compare(CompareResult::GREATER, transaction.init_compare(CompareResult::GREATER,

View File

@ -7,8 +7,8 @@ using etcdserverpb::RangeResponse;
using etcdserverpb::WatchCreateRequest; using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction( etcdv3::AsyncWatchAction::AsyncWatchAction(
etcdv3::ActionParameters const &param) etcdv3::ActionParameters && params)
: etcdv3::Action(param) : etcdv3::Action(std::move(params))
{ {
isCancelled.store(false); isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE); stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE);

View File

@ -43,3 +43,4 @@ char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
const int etcdv3::ERROR_KEY_NOT_FOUND = 100; const int etcdv3::ERROR_KEY_NOT_FOUND = 100;
const int etcdv3::ERROR_COMPARE_FAILED = 101; const int etcdv3::ERROR_COMPARE_FAILED = 101;
const int etcdv3::ERROR_KEY_ALREADY_EXISTS = 105; const int etcdv3::ERROR_KEY_ALREADY_EXISTS = 105;
const int etcdv3::ERROR_ACTION_CANCELLED = 106;

View File

@ -120,11 +120,6 @@ TEST_CASE("delete a value")
int64_t modify_index = etcd.get("/test/key1").get().value().modified_index(); int64_t modify_index = etcd.get("/test/key1").get().value().modified_index();
int64_t version = etcd.get("/test/key1").get().value().version(); int64_t version = etcd.get("/test/key1").get().value().version();
std::cerr << "index = " << index
<< ", create index = " << create_index
<< ", modify index = " << modify_index
<< std::endl;
int head_index = etcd.head().get().index(); int head_index = etcd.head().get().index();
CHECK(index == head_index); CHECK(index == head_index);
@ -174,7 +169,6 @@ TEST_CASE("atomic compare-and-delete based on prevIndex")
CHECK("42" == res.prev_value().as_string()); CHECK("42" == res.prev_value().as_string());
} }
TEST_CASE("deep atomic compare-and-swap") TEST_CASE("deep atomic compare-and-swap")
{ {
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");

View File

@ -61,11 +61,12 @@ TEST_CASE("double lock will fail")
REQUIRE(0 == resp3.error_code()); REQUIRE(0 == resp3.error_code());
// create a duration // create a duration
first_lock_release = true;
// using a duration longer than default lease TTL for lock (see: DEFAULT_LEASE_TTL_FOR_LOCK) // using a duration longer than default lease TTL for lock (see: DEFAULT_LEASE_TTL_FOR_LOCK)
std::this_thread::sleep_for(std::chrono::seconds(15)); std::this_thread::sleep_for(std::chrono::seconds(15));
// unlock the first lock // unlock the first lock
first_lock_release = true;
etcd::Response resp4 = etcd.unlock(lock_key).get(); etcd::Response resp4 = etcd.unlock(lock_key).get();
CHECK("unlock" == resp4.action()); CHECK("unlock" == resp4.action());
REQUIRE(resp4.is_ok()); REQUIRE(resp4.is_ok());
@ -171,18 +172,18 @@ TEST_CASE("concurrent lock & unlock")
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");
std::string const lock_key = "/test/test_key"; std::string const lock_key = "/test/test_key";
constexpr size_t trials = 128; constexpr size_t trials = 192;
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) { std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
std::cout << "start lock for " << index << std::endl;
auto resp = etcd.lock(key).get(); auto resp = etcd.lock(key).get();
std::cout << "lock for " << index << " is ok, start sleep: ..." << resp.error_message() << std::endl; std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush;
REQUIRE(resp.is_ok()); REQUIRE(resp.is_ok());
std::srand(index); std::srand(index);
size_t time_to_sleep = 1; size_t time_to_sleep = 1;
std::this_thread::sleep_for(std::chrono::seconds(time_to_sleep)); std::this_thread::sleep_for(std::chrono::seconds(time_to_sleep));
std::cout << "lock for " << index << " resumes from sleep: ..." << resp.error_message() << std::endl << std::flush;
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok()); REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
std::cout << "thread " << index << " been unlocked" << std::endl; std::cout << "thread " << index << " been unlocked" << std::endl << std::flush;
}; };
std::vector<std::thread> locks(trials); std::vector<std::thread> locks(trials);

View File

@ -4,8 +4,9 @@
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include "etcd/Watcher.hpp" #include "etcd/Client.hpp"
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Watcher.hpp"
static std::string etcd_uri("http://127.0.0.1:2379"); static std::string etcd_uri("http://127.0.0.1:2379");
static int watcher_called = 0; static int watcher_called = 0;