From a384079fbdac547b1330d83da064503243f0cca4 Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 6 Feb 2021 16:45:06 +0800 Subject: [PATCH] Reduce the dependency interface to avoid include all generated protobuf/grpc files. Add CMake config files to make it looks as a cmake module. Fixes https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/37. Signed-off-by: Tao He --- .github/workflows/build-test.yml | 4 +- CMakeLists.txt | 41 +++++++---- etcd-cpp-api-config-version.in.cmake | 11 +++ etcd-cpp-api-config.in.cmake | 29 ++++++++ etcd/Client.hpp | 21 +++--- etcd/KeepAlive.hpp | 18 ++--- etcd/Response.hpp | 8 +-- etcd/Value.hpp | 7 +- etcd/Watcher.hpp | 19 ++--- etcd/v3/Transaction.hpp | 11 +-- proto/kv.proto | 12 ++-- proto/rpc.proto | 41 +---------- proto/txn.proto | 42 +++++++++++ src/CMakeLists.txt | 14 +--- src/Client.cpp | 96 ++++++++++++++++---------- src/KeepAlive.cpp | 29 ++++++-- src/Value.cpp | 5 +- src/Watcher.cpp | 28 +++++--- src/v3/AsyncCompareAndDeleteAction.cpp | 2 +- src/v3/AsyncCompareAndSwapAction.cpp | 2 +- src/v3/AsyncSetAction.cpp | 2 +- src/v3/AsyncTxnAction.cpp | 2 +- src/v3/AsyncUpdateAction.cpp | 2 +- src/v3/Transaction.cpp | 36 +++++----- 24 files changed, 290 insertions(+), 192 deletions(-) create mode 100644 etcd-cpp-api-config-version.in.cmake create mode 100644 etcd-cpp-api-config.in.cmake create mode 100644 proto/txn.proto diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index badebfc..6333810 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -7,7 +7,9 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-18.04, ubuntu-20.04, macos-10.15, macos-11.0] + # disabled: macos-11.0 + # why: https://github.com/actions/virtual-environments/issues/2486 + os: [ubuntu-18.04, ubuntu-20.04, macos-10.15] etcd: [v3.2.26, v3.3.11, v3.4.13] steps: - uses: actions/checkout@v2 diff --git a/CMakeLists.txt b/CMakeLists.txt index 301f8ba..d001cc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,10 +5,21 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(etcd-cpp-api_VERSION_MAJOR 0) set(etcd-cpp-api_VERSION_MINOR 1) +set(etcd-cpp-api_VERSION ${etcd-cpp-api_MAJOR_VERSION}.${etcd-cpp-api_MINOR_VERSION}) + +include(CheckCXXCompilerFlag) +include(CheckLibraryExists) +include(GNUInstallDirs) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) option(BUILD_ETCD_TESTS "Build test cases" OFF) option(BUILD_SHARED_LIBS "Build shared libraries" ON) +# reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath +set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) +set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib:${CMAKE_INSTALL_PREFIX}/lib64") +set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) + find_package(Boost REQUIRED COMPONENTS system thread random) if (APPLE) # If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's @@ -32,6 +43,7 @@ find_package(Protobuf REQUIRED) find_package(cpprestsdk QUIET) if(cpprestsdk_FOUND) set(CPPREST_INCLUDE_DIR) + set(CPPREST_LIB cpprestsdk::cpprest) else() find_library(CPPREST_LIB NAMES cpprest) find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h) @@ -76,18 +88,23 @@ install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/etcd/Client.hpp ${CMAKE_CURRENT_SOURCE_DIR}/etcd/Response.hpp ${CMAKE_CURRENT_SOURCE_DIR}/etcd/Value.hpp ${CMAKE_CURRENT_SOURCE_DIR}/etcd/Watcher.hpp + ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/kv.pb.h DESTINATION include/etcd) install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/etcd/v3/Transaction.hpp + ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/txn.pb.h DESTINATION include/etcd/v3) -install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/auth.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/kv.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/rpc.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/rpc.grpc.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/v3lock.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/v3lock.grpc.pb.h - DESTINATION include/etcd/proto) -install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/gogoproto/gogo.pb.h - DESTINATION include/etcd/proto/gogoproto) -install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/google/api/annotations.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/google/api/http.pb.h - DESTINATION include/etcd/proto/google/api) + +configure_file(etcd-cpp-api-config.in.cmake + "${PROJECT_BINARY_DIR}/etcd-cpp-api-config.cmake" @ONLY +) +configure_file(etcd-cpp-api-config-version.in.cmake + "${PROJECT_BINARY_DIR}/etcd-cpp-api-config-version.cmake" @ONLY +) +install(FILES "${PROJECT_BINARY_DIR}/etcd-cpp-api-config.cmake" + "${PROJECT_BINARY_DIR}/etcd-cpp-api-config-version.cmake" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/etcd-cpp-api +) +install(EXPORT etcd-targets + FILE etcd-targets.cmake + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/etcd-cpp-api +) diff --git a/etcd-cpp-api-config-version.in.cmake b/etcd-cpp-api-config-version.in.cmake new file mode 100644 index 0000000..4cdc503 --- /dev/null +++ b/etcd-cpp-api-config-version.in.cmake @@ -0,0 +1,11 @@ +set(PACKAGE_VERSION "@etcd-cpp-api_VERSION@") + +# Check whether the requested PACKAGE_FIND_VERSION is compatible +if("${PACKAGE_VERSION}" VERSION_LESS "${PACKAGE_FIND_VERSION}") + set(PACKAGE_VERSION_COMPATIBLE FALSE) +else() + set(PACKAGE_VERSION_COMPATIBLE TRUE) + if ("${PACKAGE_VERSION}" VERSION_EQUAL "${PACKAGE_FIND_VERSION}") + set(PACKAGE_VERSION_EXACT TRUE) + endif() +endif() diff --git a/etcd-cpp-api-config.in.cmake b/etcd-cpp-api-config.in.cmake new file mode 100644 index 0000000..7c0e49c --- /dev/null +++ b/etcd-cpp-api-config.in.cmake @@ -0,0 +1,29 @@ +# - Config file for the etcd-cpp-apiv3 package +# +# It defines the following variables +# +# ETCD_CPP_INCLUDE_DIR - include directory for etcd +# ETCD_CPP_INCLUDE_DIRS - include directories for etcd +# ETCD_CPP_LIBRARIES - libraries to link against + +# find dependencies +include(CMakeFindDependencyMacro) +find_dependency(Protobuf) +find_dependency(gRPC) +find_dependency(cpprestsdk) +if(cpprestsdk_FOUND) + set(CPPREST_LIB cpprestsdk::cpprest) +endif() + +set(ETCD_CPP_HOME "${CMAKE_CURRENT_LIST_DIR}/../../..") +include("${CMAKE_CURRENT_LIST_DIR}/etcd-targets.cmake") + +set(ETCD_CPP_LIBRARIES etcd-cpp-api) +set(ETCD_CPP_INCLUDE_DIR "${ETCD_CPP_HOME}/include") +set(ETCD_CPP_INCLUDE_DIRS "${ETCD_CPP_INCLUDE_DIR}") + +include(FindPackageMessage) +find_package_message(etcd + "Found etcd: ${CMAKE_CURRENT_LIST_FILE} (found version \"@etcd-cpp-api_VERSION@\")" + "etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\netcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}, include directories: ${ETCD_CPP_INCLUDE_DIRS}" +) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 11e7ceb..0003eca 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -1,21 +1,14 @@ #ifndef __ETCD_CLIENT_HPP__ #define __ETCD_CLIENT_HPP__ -#include "etcd/Response.hpp" - #include #include #include #include -#include "proto/rpc.grpc.pb.h" -#include "proto/v3lock.grpc.pb.h" +#include "pplx/pplxtasks.h" -using etcdserverpb::Auth; -using etcdserverpb::KV; -using etcdserverpb::Watch; -using etcdserverpb::Lease; -using v3lockpb::Lock; +#include "etcd/Response.hpp" namespace etcdv3 { class Transaction; @@ -258,10 +251,12 @@ namespace etcd private: std::shared_ptr channel; std::string auth_token; - std::unique_ptr kvServiceStub; - std::unique_ptr watchServiceStub; - std::unique_ptr leaseServiceStub; - std::unique_ptr lockServiceStub; + + struct EtcdServerStubs; + struct EtcdServerStubsDeleter { + void operator()(EtcdServerStubs *stubs); + }; + std::unique_ptr stubs; std::mutex mutex_for_keepalives; std::map leases_for_locks; diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 1bba3f1..2165d7f 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -14,17 +14,6 @@ #endif #include -#include -#include "proto/rpc.grpc.pb.h" - -namespace etcdv3 { - class AsyncLeaseKeepAliveAction; -} - -using etcdserverpb::KV; -using etcdserverpb::Lease; -using grpc::Channel; - namespace etcd { /** @@ -53,8 +42,11 @@ namespace etcd void refresh(); pplx::task currentTask; - std::unique_ptr leaseServiceStub; - std::unique_ptr call; + struct EtcdServerStubs; + struct EtcdServerStubsDeleter { + void operator()(EtcdServerStubs *stubs); + }; + std::unique_ptr stubs; private: int ttl; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index bc25623..8a51677 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -1,14 +1,14 @@ #ifndef __ETCD_RESPONSE_HPP__ #define __ETCD_RESPONSE_HPP__ +#include #include #include -#include "etcd/Value.hpp" -#include -#include "proto/kv.pb.h" +#include "pplx/pplxtasks.h" -#include +#include "etcd/Value.hpp" +#include "kv.pb.h" namespace etcdv3 { class AsyncWatchAction; diff --git a/etcd/Value.hpp b/etcd/Value.hpp index 0d0eacc..2c24b16 100644 --- a/etcd/Value.hpp +++ b/etcd/Value.hpp @@ -1,10 +1,15 @@ #ifndef __ETCD_VECTOR_HPP__ #define __ETCD_VECTOR_HPP__ -#include #include #include +namespace web { + namespace json { + class value; + } +} + namespace etcdv3 { class KeyValue; } diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 87c32ff..3c9bdc0 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -6,17 +6,6 @@ #include "etcd/Client.hpp" #include "etcd/Response.hpp" -#include -#include "proto/rpc.grpc.pb.h" - -namespace etcdv3 { - class AsyncWatchAction; -} - -using etcdserverpb::KV; -using etcdserverpb::Watch; -using grpc::Channel; - namespace etcd { class Watcher @@ -72,8 +61,12 @@ namespace etcd int index; std::function callback; pplx::task currentTask; - std::unique_ptr watchServiceStub; - std::unique_ptr call; + + struct EtcdServerStubs; + struct EtcdServerStubsDeleter { + void operator()(etcd::Watcher::EtcdServerStubs *stubs); + }; + std::unique_ptr stubs; private: int fromIndex; diff --git a/etcd/v3/Transaction.hpp b/etcd/v3/Transaction.hpp index fa5e158..db250f3 100644 --- a/etcd/v3/Transaction.hpp +++ b/etcd/v3/Transaction.hpp @@ -1,11 +1,14 @@ #ifndef V3_SRC_TRANSACTION_HPP_ #define V3_SRC_TRANSACTION_HPP_ -#include -#include "proto/rpc.grpc.pb.h" - #include +#include "txn.pb.h" + +namespace etcdserverpb { + class TxnRequest; +} + namespace etcdv3 { class Transaction { @@ -29,7 +32,7 @@ public: void setup_put(std::string const &key, std::string const &value); void setup_delete(std::string const &key); - etcdserverpb::TxnRequest txn_request; + std::unique_ptr txn_request; private: std::string key; }; diff --git a/proto/kv.proto b/proto/kv.proto index a7c4f73..fb958bf 100644 --- a/proto/kv.proto +++ b/proto/kv.proto @@ -1,13 +1,13 @@ syntax = "proto3"; package mvccpb; -import "gogoproto/gogo.proto"; +// import "gogoproto/gogo.proto"; -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; +// option (gogoproto.marshaler_all) = true; +// option (gogoproto.sizer_all) = true; +// option (gogoproto.unmarshaler_all) = true; +// option (gogoproto.goproto_getters_all) = false; +// option (gogoproto.goproto_enum_prefix_all) = false; message KeyValue { // key is the key in bytes. An empty key is not allowed. diff --git a/proto/rpc.proto b/proto/rpc.proto index 9405c90..7c5e5df 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -4,6 +4,7 @@ package etcdserverpb; import "gogoproto/gogo.proto"; import "kv.proto"; import "auth.proto"; +import "txn.proto"; // for grpc-gateway import "google/api/annotations.proto"; @@ -532,46 +533,6 @@ message ResponseOp { } } -message Compare { - enum CompareResult { - EQUAL = 0; - GREATER = 1; - LESS = 2; - NOT_EQUAL = 3; - } - enum CompareTarget { - VERSION = 0; - CREATE = 1; - MOD = 2; - VALUE = 3; - LEASE = 4; - } - // result is logical comparison operation for this comparison. - CompareResult result = 1; - // target is the key-value field to inspect for the comparison. - CompareTarget target = 2; - // key is the subject key for the comparison operation. - bytes key = 3; - oneof target_union { - // version is the version of the given key - int64 version = 4; - // create_revision is the creation revision of the given key - int64 create_revision = 5; - // mod_revision is the last modified revision of the given key. - int64 mod_revision = 6; - // value is the value of the given key, in bytes. - bytes value = 7; - // lease is the lease id of the given key. - int64 lease = 8; - // leave room for more target_union field tags, jump to 64 - } - - // range_end compares the given target to all keys in the range [key, range_end). - // See RangeRequest for more details on key ranges. - bytes range_end = 64; - // TODO: fill out with most of the rest of RangeRequest fields when needed. -} - // From google paxosdb paper: // Our implementation hinges around a powerful primitive which we call MultiOp. All other database // operations except for iteration are implemented as a single call to MultiOp. A MultiOp is applied atomically diff --git a/proto/txn.proto b/proto/txn.proto new file mode 100644 index 0000000..b3b1ef2 --- /dev/null +++ b/proto/txn.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; +package etcdserverpb; + +message Compare { + enum CompareResult { + EQUAL = 0; + GREATER = 1; + LESS = 2; + NOT_EQUAL = 3; + } + enum CompareTarget { + VERSION = 0; + CREATE = 1; + MOD = 2; + VALUE = 3; + LEASE = 4; + } + // result is logical comparison operation for this comparison. + CompareResult result = 1; + // target is the key-value field to inspect for the comparison. + CompareTarget target = 2; + // key is the subject key for the comparison operation. + bytes key = 3; + oneof target_union { + // version is the version of the given key + int64 version = 4; + // create_revision is the creation revision of the given key + int64 create_revision = 5; + // mod_revision is the last modified revision of the given key. + int64 mod_revision = 6; + // value is the value of the given key, in bytes. + bytes value = 7; + // lease is the lease id of the given key. + int64 lease = 8; + // leave room for more target_union field tags, jump to 64 + } + + // range_end compares the given target to all keys in the range [key, range_end). + // See RangeRequest for more details on key ranges. + bytes range_end = 64; + // TODO: fill out with most of the rest of RangeRequest fields when needed. +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d8ce50f..6490944 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,14 +8,9 @@ add_library(etcd-cpp-api ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES}) add_dependencies(etcd-cpp-api protobuf_generates) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) -if(cpprestsdk_FOUND) - target_link_libraries(etcd-cpp-api PUBLIC cpprestsdk::cpprest) -else() - target_link_libraries(etcd-cpp-api PUBLIC ${CPPREST_LIB}) -endif() - target_link_libraries(etcd-cpp-api PUBLIC ${Boost_LIBRARIES} + ${CPPREST_LIB} ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${GRPC_LIBRARIES}) @@ -23,8 +18,5 @@ target_link_libraries(etcd-cpp-api PUBLIC target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen) target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto) -if (WIN32 AND BUILD_SHARED_LIBS) - install (TARGETS etcd-cpp-api RUNTIME DESTINATION bin) -else() - install (TARGETS etcd-cpp-api LIBRARY DESTINATION lib) -endif() +install(TARGETS etcd-cpp-api + EXPORT etcd-targets) diff --git a/src/Client.cpp b/src/Client.cpp index b3a4bac..2c8f10e 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -10,8 +10,16 @@ #include #endif +#include #include #include + +#include + +#include +#include "proto/rpc.grpc.pb.h" +#include "proto/v3lock.grpc.pb.h" + #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" #include "etcd/v3/action_constants.hpp" @@ -22,7 +30,6 @@ #include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/Transaction.hpp" -#include #include "etcd/v3/AsyncSetAction.hpp" #include "etcd/v3/AsyncCompareAndSwapAction.hpp" @@ -35,8 +42,6 @@ #include "etcd/v3/AsyncLockAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp" -#include - using grpc::Channel; namespace etcd { @@ -109,7 +114,7 @@ const bool authenticate(std::shared_ptr const &channel, std::string const &password, std::string &token_or_message) { // run a round of auth - auto auth_stub = Auth::NewStub(channel); + auto auth_stub = etcdserverpb::Auth::NewStub(channel); ClientContext context; etcdserverpb::AuthenticateRequest auth_request; etcdserverpb::AuthenticateResponse auth_response; @@ -128,6 +133,19 @@ const bool authenticate(std::shared_ptr const &channel, } } +struct etcd::Client::EtcdServerStubs { + std::unique_ptr kvServiceStub; + std::unique_ptr watchServiceStub; + std::unique_ptr leaseServiceStub; + std::unique_ptr lockServiceStub; +}; + +void etcd::Client::EtcdServerStubsDeleter::operator()(etcd::Client::EtcdServerStubs *stubs) { + if (stubs) { + delete stubs; + } +} + etcd::Client::Client(std::string const & address, std::string const & load_balancer) { @@ -141,10 +159,11 @@ etcd::Client::Client(std::string const & address, this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); // create stubs - kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); - lockServiceStub = Lock::NewStub(this->channel); + stubs.reset(new EtcdServerStubs{}); + stubs->kvServiceStub = KV::NewStub(this->channel); + stubs->watchServiceStub= Watch::NewStub(this->channel); + stubs->leaseServiceStub= Lease::NewStub(this->channel); + stubs->lockServiceStub = Lock::NewStub(this->channel); } etcd::Client::Client(std::string const & address, @@ -169,10 +188,11 @@ etcd::Client::Client(std::string const & address, this->auth_token = token_or_message; // setup stubs - kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); - lockServiceStub = Lock::NewStub(this->channel); + stubs.reset(new EtcdServerStubs{}); + stubs->kvServiceStub = KV::NewStub(this->channel); + stubs->watchServiceStub= Watch::NewStub(this->channel); + stubs->leaseServiceStub= Lease::NewStub(this->channel); + stubs->lockServiceStub = Lock::NewStub(this->channel); } pplx::task etcd::Client::get(std::string const & key) @@ -181,7 +201,7 @@ pplx::task etcd::Client::get(std::string const & key) params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -192,7 +212,7 @@ pplx::task etcd::Client::set(std::string const & key, std::strin params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); if(ttl > 0) { @@ -221,7 +241,7 @@ pplx::task etcd::Client::set(std::string const & key, std::strin params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncSetAction(params)); return Response::create(call); } @@ -233,7 +253,7 @@ pplx::task etcd::Client::add(std::string const & key, std::strin params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); if(ttl > 0) { @@ -261,7 +281,7 @@ pplx::task etcd::Client::add(std::string const & key, std::strin params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); return Response::create(call); } @@ -273,7 +293,7 @@ pplx::task etcd::Client::modify(std::string const & key, std::st params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); if(ttl > 0) { @@ -301,7 +321,7 @@ pplx::task etcd::Client::modify(std::string const & key, std::st params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } @@ -314,7 +334,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.key.assign(key); params.value.assign(value); params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); if(ttl > 0) { @@ -343,7 +363,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.value.assign(value); params.old_value.assign(old_value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } @@ -355,7 +375,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.key.assign(key); params.value.assign(value); params.old_revision = old_index; - params.kv_stub = kvServiceStub.get(); + params.kv_stub = stubs->kvServiceStub.get(); if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -383,7 +403,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.value.assign(value); params.lease_id = leaseid; params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); return Response::create(call); } @@ -395,7 +415,7 @@ pplx::task etcd::Client::rm(std::string const & key) params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -407,7 +427,7 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } @@ -418,7 +438,7 @@ pplx::task etcd::Client::rm_if(std::string const & key, int old_ params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; return Response::create(call); @@ -430,7 +450,7 @@ pplx::task etcd::Client::rmdir(std::string const & key, bool rec params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -442,7 +462,7 @@ pplx::task etcd::Client::ls(std::string const & key) params.key.assign(key); params.withPrefix = true; params.limit = 0; // default no limit. - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -454,7 +474,7 @@ pplx::task etcd::Client::ls(std::string const & key, size_t cons params.key.assign(key); params.withPrefix = true; params.limit = limit; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -465,7 +485,7 @@ pplx::task etcd::Client::watch(std::string const & key, bool rec params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; - params.watch_stub = watchServiceStub.get(); + params.watch_stub = stubs->watchServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } @@ -477,7 +497,7 @@ pplx::task etcd::Client::watch(std::string const & key, int from params.key.assign(key); params.withPrefix = recursive; params.revision = fromIndex; - params.watch_stub = watchServiceStub.get(); + params.watch_stub = stubs->watchServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } @@ -487,7 +507,7 @@ pplx::task etcd::Client::leasegrant(int ttl) etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.ttl = ttl; - params.lease_stub = leaseServiceStub.get(); + params.lease_stub = stubs->leaseServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); return Response::create(call); } @@ -497,7 +517,7 @@ pplx::task etcd::Client::leaserevoke(int64_t lease_id) etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.lease_id = lease_id; - params.lease_stub = leaseServiceStub.get(); + params.lease_stub = stubs->leaseServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLeaseRevokeAction(params)); return Response::create(call); } @@ -507,7 +527,7 @@ pplx::task etcd::Client::leasetimetolive(int64_t lease_id) etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.lease_id = lease_id; - params.lease_stub = leaseServiceStub.get(); + params.lease_stub = stubs->leaseServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); return Response::create(call); } @@ -530,7 +550,7 @@ pplx::task etcd::Client::lock(std::string const &key) { } params.key = key; params.lease_id = lease_id; - params.lock_stub = lockServiceStub.get(); + params.lock_stub = stubs->lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLockAction(params)); return Response::create(call).then( [this, lease_id](pplx::task const &resp_task) -> etcd::Response { @@ -554,7 +574,7 @@ pplx::task etcd::Client::lock(std::string const &key, params.auth_token.assign(this->auth_token); params.key = key; params.lease_id = lease_id; - params.lock_stub = lockServiceStub.get(); + params.lock_stub = stubs->lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLockAction(params)); return Response::create(call); } @@ -577,7 +597,7 @@ pplx::task etcd::Client::unlock(std::string const &lock_key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key = lock_key; - params.lock_stub = lockServiceStub.get(); + params.lock_stub = stubs->lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); return Response::create(call); } @@ -585,7 +605,7 @@ pplx::task etcd::Client::unlock(std::string const &lock_key) { pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = stubs->kvServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); return Response::create(call); } diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index e617423..c0d9c70 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -3,16 +3,35 @@ #include "etcd/KeepAlive.hpp" #include "etcd/v3/AsyncLeaseAction.hpp" +#include +#include "proto/rpc.grpc.pb.h" + +namespace etcdv3 { + class AsyncLeaseKeepAliveAction; +} + +struct etcd::KeepAlive::EtcdServerStubs { + std::unique_ptr leaseServiceStub; + std::unique_ptr call; +}; + +void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdServerStubs *stubs) { + if (stubs) { + delete stubs; + } +} + etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): ttl(ttl), lease_id(lease_id), continue_next(true) { - leaseServiceStub= Lease::NewStub(client.channel); + stubs.reset(new EtcdServerStubs{}); + stubs->leaseServiceStub = Lease::NewStub(client.channel); etcdv3::ActionParameters params; params.auth_token.assign(client.auth_token); params.lease_id = this->lease_id; - params.lease_stub = leaseServiceStub.get(); + params.lease_stub = stubs->leaseServiceStub.get(); - call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); + stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); currentTask = pplx::task([this]() { // start refresh this->refresh(); @@ -49,7 +68,7 @@ void etcd::KeepAlive::Cancel() std::cout.flags(os_flags); } #endif - call->CancelKeepAlive(); + stubs->call->CancelKeepAlive(); if (keepalive_timer_) { keepalive_timer_->cancel(); } @@ -79,7 +98,7 @@ void etcd::KeepAlive::refresh() std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl; #endif } else { - this->call->Refresh(); + this->stubs->call->Refresh(); // trigger the next round; this->refresh(); } diff --git a/src/Value.cpp b/src/Value.cpp index 00d5f46..0ac3496 100644 --- a/src/Value.cpp +++ b/src/Value.cpp @@ -1,4 +1,7 @@ #include + +#include + #include "etcd/Value.hpp" #include "etcd/v3/KeyValue.hpp" @@ -57,5 +60,3 @@ int64_t etcd::Value::lease() const { return leaseId; } - - diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 977f930..303572b 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -1,6 +1,17 @@ #include "etcd/Watcher.hpp" #include "etcd/v3/AsyncWatchAction.hpp" +struct etcd::Watcher::EtcdServerStubs { + std::unique_ptr watchServiceStub; + std::unique_ptr call; +}; + +void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServerStubs *stubs) { + if (stubs) { + delete stubs; + } +} + etcd::Watcher::Watcher(Client const &client, std::string const & key, std::function callback, bool recursive): Watcher(client, key, -1, callback, recursive) { @@ -9,7 +20,8 @@ etcd::Watcher::Watcher(Client const &client, std::string const & key, etcd::Watcher::Watcher(Client const &client, std::string const & key, int fromIndex, std::function callback, bool recursive): fromIndex(fromIndex), recursive(recursive) { - watchServiceStub= Watch::NewStub(client.channel); + stubs.reset(new EtcdServerStubs{}); + stubs->watchServiceStub = Watch::NewStub(client.channel); doWatch(key, client.auth_token, callback); } @@ -39,27 +51,27 @@ etcd::Watcher::Watcher(std::string const & address, etcd::Watcher::~Watcher() { - call->CancelWatch(); + stubs->call->CancelWatch(); currentTask.wait(); } bool etcd::Watcher::Wait() { currentTask.wait(); - return call->Cancelled(); + return stubs->call->Cancelled(); } void etcd::Watcher::Wait(std::function callback) { currentTask.then([this, callback](pplx::task const & resp_task) { resp_task.wait(); - callback(this->call->Cancelled()); + callback(this->stubs->call->Cancelled()); }); } void etcd::Watcher::Cancel() { - call->CancelWatch(); + stubs->call->CancelWatch(); this->Wait(); } @@ -74,12 +86,12 @@ void etcd::Watcher::doWatch(std::string const & key, params.revision = fromIndex; } params.withPrefix = recursive; - params.watch_stub = watchServiceStub.get(); + params.watch_stub = stubs->watchServiceStub.get(); - call.reset(new etcdv3::AsyncWatchAction(params)); + stubs->call.reset(new etcdv3::AsyncWatchAction(params)); currentTask = pplx::task([this, callback]() { - return call->waitForResponse(callback); + return stubs->call->waitForResponse(callback); }); } diff --git a/src/v3/AsyncCompareAndDeleteAction.cpp b/src/v3/AsyncCompareAndDeleteAction.cpp index 50cee11..2a813f6 100644 --- a/src/v3/AsyncCompareAndDeleteAction.cpp +++ b/src/v3/AsyncCompareAndDeleteAction.cpp @@ -27,7 +27,7 @@ etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionP transaction.setup_compare_and_delete_operation(parameters.key); transaction.setup_basic_failure_operation(parameters.key); - response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/src/v3/AsyncCompareAndSwapAction.cpp b/src/v3/AsyncCompareAndSwapAction.cpp index 5e813fd..6bd6ee7 100644 --- a/src/v3/AsyncCompareAndSwapAction.cpp +++ b/src/v3/AsyncCompareAndSwapAction.cpp @@ -27,7 +27,7 @@ etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParam transaction.setup_basic_failure_operation(parameters.key); transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); - response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/src/v3/AsyncSetAction.cpp b/src/v3/AsyncSetAction.cpp index 0c28c3f..9788d8f 100644 --- a/src/v3/AsyncSetAction.cpp +++ b/src/v3/AsyncSetAction.cpp @@ -22,7 +22,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea { transaction.setup_set_failure_operation(parameters.key, parameters.value, parameters.lease_id); } - response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/src/v3/AsyncTxnAction.cpp b/src/v3/AsyncTxnAction.cpp index b519ada..acb0ad3 100644 --- a/src/v3/AsyncTxnAction.cpp +++ b/src/v3/AsyncTxnAction.cpp @@ -6,7 +6,7 @@ etcdv3::AsyncTxnAction::AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx) : etcdv3::Action(param) { - response_reader = parameters.kv_stub->AsyncTxn(&context, tx.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_); response_reader->Finish(&reply, &status, (void *)this); } diff --git a/src/v3/AsyncUpdateAction.cpp b/src/v3/AsyncUpdateAction.cpp index c985ce2..5c62957 100644 --- a/src/v3/AsyncUpdateAction.cpp +++ b/src/v3/AsyncUpdateAction.cpp @@ -19,7 +19,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param) transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); - response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/src/v3/Transaction.cpp b/src/v3/Transaction.cpp index a32afd4..1adbfd2 100644 --- a/src/v3/Transaction.cpp +++ b/src/v3/Transaction.cpp @@ -1,5 +1,7 @@ #include "etcd/v3/Transaction.hpp" +#include "proto/rpc.grpc.pb.h" + using etcdserverpb::Compare; using etcdserverpb::RangeRequest; using etcdserverpb::PutRequest; @@ -7,13 +9,15 @@ using etcdserverpb::RequestOp; using etcdserverpb::DeleteRangeRequest; etcdv3::Transaction::Transaction() { + txn_request.reset(new etcdserverpb::TxnRequest{}); } etcdv3::Transaction::Transaction(const std::string& key) : key(key) { + txn_request.reset(new etcdserverpb::TxnRequest{}); } void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::CompareTarget target){ - Compare* compare = txn_request.add_compare(); + Compare* compare = txn_request->add_compare(); compare->set_result(result); compare->set_target(target); compare->set_key(key); @@ -22,7 +26,7 @@ void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::C } void etcdv3::Transaction::init_compare(std::string const& old_value, Compare::CompareResult result, Compare::CompareTarget target){ - Compare* compare = txn_request.add_compare(); + Compare* compare = txn_request->add_compare(); compare->set_result(result); compare->set_target(target); compare->set_key(key); @@ -31,7 +35,7 @@ void etcdv3::Transaction::init_compare(std::string const& old_value, Compare::Co } void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult result, Compare::CompareTarget target){ - Compare* compare = txn_request.add_compare(); + Compare* compare = txn_request->add_compare(); compare->set_result(result); compare->set_target(target); compare->set_key(key); @@ -45,7 +49,7 @@ void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult res void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) { std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); - RequestOp* req_failure = txn_request.add_failure(); + RequestOp* req_failure = txn_request->add_failure(); req_failure->set_allocated_request_range(get_request.release()); } @@ -58,12 +62,12 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st put_request->set_value(value); put_request->set_prev_kv(true); put_request->set_lease(leaseid); - RequestOp* req_failure = txn_request.add_failure(); + RequestOp* req_failure = txn_request->add_failure(); req_failure->set_allocated_request_put(put_request.release()); std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); - req_failure = txn_request.add_failure(); + req_failure = txn_request->add_failure(); req_failure->set_allocated_request_range(get_request.release()); } @@ -76,12 +80,12 @@ void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, st put_request->set_value(value); put_request->set_prev_kv(true); put_request->set_lease(leaseid); - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_put(put_request.release()); std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); - req_success = txn_request.add_success(); + req_success = txn_request->add_success(); req_success->set_allocated_request_range(get_request.release()); } @@ -94,12 +98,12 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val put_request->set_value(value); put_request->set_prev_kv(true); put_request->set_lease(leaseid); - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_put(put_request.release()); std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); - req_success = txn_request.add_success(); + req_success = txn_request->add_success(); req_success->set_allocated_request_range(get_request.release()); } @@ -115,7 +119,7 @@ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::str del_request->set_range_end(range_end); } - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_delete_range(del_request.release()); } @@ -133,7 +137,7 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); } - RequestOp* req_failure = txn_request.add_failure(); + RequestOp* req_failure = txn_request->add_failure(); req_failure->set_allocated_request_range(get_request.release()); del_request.reset(new DeleteRangeRequest()); @@ -143,7 +147,7 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, del_request->set_range_end(range_end); } - req_failure = txn_request.add_failure(); + req_failure = txn_request->add_failure(); req_failure->set_allocated_request_delete_range(del_request.release()); } @@ -151,7 +155,7 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& std::unique_ptr del_request(new DeleteRangeRequest()); del_request->set_key(key); del_request->set_prev_kv(true); - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_delete_range(del_request.release()); } @@ -160,7 +164,7 @@ void etcdv3::Transaction::setup_put(std::string const &key, std::string const &v put_request->set_key(key); put_request->set_value(value); put_request->set_prev_kv(false); - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_put(put_request.release()); } @@ -169,7 +173,7 @@ void etcdv3::Transaction::setup_delete(std::string const &key) { del_request->set_key(key); del_request->set_prev_kv(false); - RequestOp* req_success = txn_request.add_success(); + RequestOp* req_success = txn_request->add_success(); req_success->set_allocated_request_delete_range(del_request.release()); }