Reduce the dependency interface to avoid include all generated protobuf/grpc files. (#41)

Add CMake config files to make it looks as a cmake module.

Fixes https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/37.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2021-02-06 17:16:58 +08:00 committed by GitHub
parent e02357ca86
commit d2e35ceb47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 290 additions and 192 deletions

View File

@ -7,7 +7,9 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-18.04, ubuntu-20.04, macos-10.15, macos-11.0]
# disabled: macos-11.0
# why: https://github.com/actions/virtual-environments/issues/2486
os: [ubuntu-18.04, ubuntu-20.04, macos-10.15]
etcd: [v3.2.26, v3.3.11, v3.4.13]
steps:
- uses: actions/checkout@v2

View File

@ -5,10 +5,21 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(etcd-cpp-api_VERSION_MAJOR 0)
set(etcd-cpp-api_VERSION_MINOR 1)
set(etcd-cpp-api_VERSION ${etcd-cpp-api_MAJOR_VERSION}.${etcd-cpp-api_MINOR_VERSION})
include(CheckCXXCompilerFlag)
include(CheckLibraryExists)
include(GNUInstallDirs)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
option(BUILD_ETCD_TESTS "Build test cases" OFF)
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
# reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath
set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib:${CMAKE_INSTALL_PREFIX}/lib64")
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
find_package(Boost REQUIRED COMPONENTS system thread random)
if (APPLE)
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
@ -32,6 +43,7 @@ find_package(Protobuf REQUIRED)
find_package(cpprestsdk QUIET)
if(cpprestsdk_FOUND)
set(CPPREST_INCLUDE_DIR)
set(CPPREST_LIB cpprestsdk::cpprest)
else()
find_library(CPPREST_LIB NAMES cpprest)
find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h)
@ -76,18 +88,23 @@ install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/etcd/Client.hpp
${CMAKE_CURRENT_SOURCE_DIR}/etcd/Response.hpp
${CMAKE_CURRENT_SOURCE_DIR}/etcd/Value.hpp
${CMAKE_CURRENT_SOURCE_DIR}/etcd/Watcher.hpp
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/kv.pb.h
DESTINATION include/etcd)
install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/etcd/v3/Transaction.hpp
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/txn.pb.h
DESTINATION include/etcd/v3)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/auth.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/kv.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/rpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/rpc.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/v3lock.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/v3lock.grpc.pb.h
DESTINATION include/etcd/proto)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/gogoproto/gogo.pb.h
DESTINATION include/etcd/proto/gogoproto)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/google/api/annotations.pb.h
${CMAKE_CURRENT_BINARY_DIR}/proto/gen/proto/google/api/http.pb.h
DESTINATION include/etcd/proto/google/api)
configure_file(etcd-cpp-api-config.in.cmake
"${PROJECT_BINARY_DIR}/etcd-cpp-api-config.cmake" @ONLY
)
configure_file(etcd-cpp-api-config-version.in.cmake
"${PROJECT_BINARY_DIR}/etcd-cpp-api-config-version.cmake" @ONLY
)
install(FILES "${PROJECT_BINARY_DIR}/etcd-cpp-api-config.cmake"
"${PROJECT_BINARY_DIR}/etcd-cpp-api-config-version.cmake"
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/etcd-cpp-api
)
install(EXPORT etcd-targets
FILE etcd-targets.cmake
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/etcd-cpp-api
)

View File

@ -0,0 +1,11 @@
set(PACKAGE_VERSION "@etcd-cpp-api_VERSION@")
# Check whether the requested PACKAGE_FIND_VERSION is compatible
if("${PACKAGE_VERSION}" VERSION_LESS "${PACKAGE_FIND_VERSION}")
set(PACKAGE_VERSION_COMPATIBLE FALSE)
else()
set(PACKAGE_VERSION_COMPATIBLE TRUE)
if ("${PACKAGE_VERSION}" VERSION_EQUAL "${PACKAGE_FIND_VERSION}")
set(PACKAGE_VERSION_EXACT TRUE)
endif()
endif()

View File

@ -0,0 +1,29 @@
# - Config file for the etcd-cpp-apiv3 package
#
# It defines the following variables
#
# ETCD_CPP_INCLUDE_DIR - include directory for etcd
# ETCD_CPP_INCLUDE_DIRS - include directories for etcd
# ETCD_CPP_LIBRARIES - libraries to link against
# find dependencies
include(CMakeFindDependencyMacro)
find_dependency(Protobuf)
find_dependency(gRPC)
find_dependency(cpprestsdk)
if(cpprestsdk_FOUND)
set(CPPREST_LIB cpprestsdk::cpprest)
endif()
set(ETCD_CPP_HOME "${CMAKE_CURRENT_LIST_DIR}/../../..")
include("${CMAKE_CURRENT_LIST_DIR}/etcd-targets.cmake")
set(ETCD_CPP_LIBRARIES etcd-cpp-api)
set(ETCD_CPP_INCLUDE_DIR "${ETCD_CPP_HOME}/include")
set(ETCD_CPP_INCLUDE_DIRS "${ETCD_CPP_INCLUDE_DIR}")
include(FindPackageMessage)
find_package_message(etcd
"Found etcd: ${CMAKE_CURRENT_LIST_FILE} (found version \"@etcd-cpp-api_VERSION@\")"
"etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\netcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}, include directories: ${ETCD_CPP_INCLUDE_DIRS}"
)

View File

@ -1,21 +1,14 @@
#ifndef __ETCD_CLIENT_HPP__
#define __ETCD_CLIENT_HPP__
#include "etcd/Response.hpp"
#include <map>
#include <mutex>
#include <string>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h"
#include "pplx/pplxtasks.h"
using etcdserverpb::Auth;
using etcdserverpb::KV;
using etcdserverpb::Watch;
using etcdserverpb::Lease;
using v3lockpb::Lock;
#include "etcd/Response.hpp"
namespace etcdv3 {
class Transaction;
@ -258,10 +251,12 @@ namespace etcd
private:
std::shared_ptr<grpc::Channel> channel;
std::string auth_token;
std::unique_ptr<KV::Stub> kvServiceStub;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub;
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {
void operator()(EtcdServerStubs *stubs);
};
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
std::mutex mutex_for_keepalives;
std::map<std::string, int64_t> leases_for_locks;

View File

@ -14,17 +14,6 @@
#endif
#include <boost/asio/steady_timer.hpp>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncLeaseKeepAliveAction;
}
using etcdserverpb::KV;
using etcdserverpb::Lease;
using grpc::Channel;
namespace etcd
{
/**
@ -53,8 +42,11 @@ namespace etcd
void refresh();
pplx::task<void> currentTask;
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<etcdv3::AsyncLeaseKeepAliveAction> call;
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {
void operator()(EtcdServerStubs *stubs);
};
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
private:
int ttl;

View File

@ -1,14 +1,14 @@
#ifndef __ETCD_RESPONSE_HPP__
#define __ETCD_RESPONSE_HPP__
#include <iostream>
#include <string>
#include <vector>
#include "etcd/Value.hpp"
#include <grpc++/grpc++.h>
#include "proto/kv.pb.h"
#include "pplx/pplxtasks.h"
#include <iostream>
#include "etcd/Value.hpp"
#include "kv.pb.h"
namespace etcdv3 {
class AsyncWatchAction;

View File

@ -1,10 +1,15 @@
#ifndef __ETCD_VECTOR_HPP__
#define __ETCD_VECTOR_HPP__
#include <cpprest/http_client.h>
#include <string>
#include <vector>
namespace web {
namespace json {
class value;
}
}
namespace etcdv3 {
class KeyValue;
}

View File

@ -6,17 +6,6 @@
#include "etcd/Client.hpp"
#include "etcd/Response.hpp"
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncWatchAction;
}
using etcdserverpb::KV;
using etcdserverpb::Watch;
using grpc::Channel;
namespace etcd
{
class Watcher
@ -72,8 +61,12 @@ namespace etcd
int index;
std::function<void(Response)> callback;
pplx::task<void> currentTask;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<etcdv3::AsyncWatchAction> call;
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {
void operator()(etcd::Watcher::EtcdServerStubs *stubs);
};
std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;
private:
int fromIndex;

View File

@ -1,11 +1,14 @@
#ifndef V3_SRC_TRANSACTION_HPP_
#define V3_SRC_TRANSACTION_HPP_
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include <string>
#include "txn.pb.h"
namespace etcdserverpb {
class TxnRequest;
}
namespace etcdv3 {
class Transaction {
@ -29,7 +32,7 @@ public:
void setup_put(std::string const &key, std::string const &value);
void setup_delete(std::string const &key);
etcdserverpb::TxnRequest txn_request;
std::unique_ptr<etcdserverpb::TxnRequest> txn_request;
private:
std::string key;
};

View File

@ -1,13 +1,13 @@
syntax = "proto3";
package mvccpb;
import "gogoproto/gogo.proto";
// import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
// option (gogoproto.marshaler_all) = true;
// option (gogoproto.sizer_all) = true;
// option (gogoproto.unmarshaler_all) = true;
// option (gogoproto.goproto_getters_all) = false;
// option (gogoproto.goproto_enum_prefix_all) = false;
message KeyValue {
// key is the key in bytes. An empty key is not allowed.

View File

@ -4,6 +4,7 @@ package etcdserverpb;
import "gogoproto/gogo.proto";
import "kv.proto";
import "auth.proto";
import "txn.proto";
// for grpc-gateway
import "google/api/annotations.proto";
@ -532,46 +533,6 @@ message ResponseOp {
}
}
message Compare {
enum CompareResult {
EQUAL = 0;
GREATER = 1;
LESS = 2;
NOT_EQUAL = 3;
}
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE = 3;
LEASE = 4;
}
// result is logical comparison operation for this comparison.
CompareResult result = 1;
// target is the key-value field to inspect for the comparison.
CompareTarget target = 2;
// key is the subject key for the comparison operation.
bytes key = 3;
oneof target_union {
// version is the version of the given key
int64 version = 4;
// create_revision is the creation revision of the given key
int64 create_revision = 5;
// mod_revision is the last modified revision of the given key.
int64 mod_revision = 6;
// value is the value of the given key, in bytes.
bytes value = 7;
// lease is the lease id of the given key.
int64 lease = 8;
// leave room for more target_union field tags, jump to 64
}
// range_end compares the given target to all keys in the range [key, range_end).
// See RangeRequest for more details on key ranges.
bytes range_end = 64;
// TODO: fill out with most of the rest of RangeRequest fields when needed.
}
// From google paxosdb paper:
// Our implementation hinges around a powerful primitive which we call MultiOp. All other database
// operations except for iteration are implemented as a single call to MultiOp. A MultiOp is applied atomically

42
proto/txn.proto Normal file
View File

@ -0,0 +1,42 @@
syntax = "proto3";
package etcdserverpb;
message Compare {
enum CompareResult {
EQUAL = 0;
GREATER = 1;
LESS = 2;
NOT_EQUAL = 3;
}
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE = 3;
LEASE = 4;
}
// result is logical comparison operation for this comparison.
CompareResult result = 1;
// target is the key-value field to inspect for the comparison.
CompareTarget target = 2;
// key is the subject key for the comparison operation.
bytes key = 3;
oneof target_union {
// version is the version of the given key
int64 version = 4;
// create_revision is the creation revision of the given key
int64 create_revision = 5;
// mod_revision is the last modified revision of the given key.
int64 mod_revision = 6;
// value is the value of the given key, in bytes.
bytes value = 7;
// lease is the lease id of the given key.
int64 lease = 8;
// leave room for more target_union field tags, jump to 64
}
// range_end compares the given target to all keys in the range [key, range_end).
// See RangeRequest for more details on key ranges.
bytes range_end = 64;
// TODO: fill out with most of the rest of RangeRequest fields when needed.
}

View File

@ -8,14 +8,9 @@ add_library(etcd-cpp-api ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES})
add_dependencies(etcd-cpp-api protobuf_generates)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
if(cpprestsdk_FOUND)
target_link_libraries(etcd-cpp-api PUBLIC cpprestsdk::cpprest)
else()
target_link_libraries(etcd-cpp-api PUBLIC ${CPPREST_LIB})
endif()
target_link_libraries(etcd-cpp-api PUBLIC
${Boost_LIBRARIES}
${CPPREST_LIB}
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES})
@ -23,8 +18,5 @@ target_link_libraries(etcd-cpp-api PUBLIC
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto)
if (WIN32 AND BUILD_SHARED_LIBS)
install (TARGETS etcd-cpp-api RUNTIME DESTINATION bin)
else()
install (TARGETS etcd-cpp-api LIBRARY DESTINATION lib)
endif()
install(TARGETS etcd-cpp-api
EXPORT etcd-targets)

View File

@ -10,8 +10,16 @@
#include <sys/socket.h>
#endif
#include <iostream>
#include <limits>
#include <memory>
#include <boost/algorithm/string.hpp>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h"
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/v3/action_constants.hpp"
@ -22,7 +30,6 @@
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/Transaction.hpp"
#include <iostream>
#include "etcd/v3/AsyncSetAction.hpp"
#include "etcd/v3/AsyncCompareAndSwapAction.hpp"
@ -35,8 +42,6 @@
#include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp"
#include <boost/algorithm/string.hpp>
using grpc::Channel;
namespace etcd {
@ -109,7 +114,7 @@ const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
std::string const &password,
std::string &token_or_message) {
// run a round of auth
auto auth_stub = Auth::NewStub(channel);
auto auth_stub = etcdserverpb::Auth::NewStub(channel);
ClientContext context;
etcdserverpb::AuthenticateRequest auth_request;
etcdserverpb::AuthenticateResponse auth_response;
@ -128,6 +133,19 @@ const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
}
}
struct etcd::Client::EtcdServerStubs {
std::unique_ptr<etcdserverpb::KV::Stub> kvServiceStub;
std::unique_ptr<etcdserverpb::Watch::Stub> watchServiceStub;
std::unique_ptr<etcdserverpb::Lease::Stub> leaseServiceStub;
std::unique_ptr<v3lockpb::Lock::Stub> lockServiceStub;
};
void etcd::Client::EtcdServerStubsDeleter::operator()(etcd::Client::EtcdServerStubs *stubs) {
if (stubs) {
delete stubs;
}
}
etcd::Client::Client(std::string const & address,
std::string const & load_balancer)
{
@ -141,10 +159,11 @@ etcd::Client::Client(std::string const & address,
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
// create stubs
kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel);
stubs.reset(new EtcdServerStubs{});
stubs->kvServiceStub = KV::NewStub(this->channel);
stubs->watchServiceStub= Watch::NewStub(this->channel);
stubs->leaseServiceStub= Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel);
}
etcd::Client::Client(std::string const & address,
@ -169,10 +188,11 @@ etcd::Client::Client(std::string const & address,
this->auth_token = token_or_message;
// setup stubs
kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel);
stubs.reset(new EtcdServerStubs{});
stubs->kvServiceStub = KV::NewStub(this->channel);
stubs->watchServiceStub= Watch::NewStub(this->channel);
stubs->leaseServiceStub= Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel);
}
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
@ -181,7 +201,7 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
@ -192,7 +212,7 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
if(ttl > 0)
{
@ -221,7 +241,7 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);
}
@ -233,7 +253,7 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
if(ttl > 0)
{
@ -261,7 +281,7 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call);
}
@ -273,7 +293,7 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.value.assign(value);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
if(ttl > 0)
{
@ -301,7 +321,7 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call);
}
@ -314,7 +334,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.key.assign(key);
params.value.assign(value);
params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
if(ttl > 0)
{
@ -343,7 +363,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.value.assign(value);
params.old_value.assign(old_value);
params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
@ -355,7 +375,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.key.assign(key);
params.value.assign(value);
params.old_revision = old_index;
params.kv_stub = kvServiceStub.get();
params.kv_stub = stubs->kvServiceStub.get();
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
@ -383,7 +403,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
params.value.assign(value);
params.lease_id = leaseid;
params.old_revision = old_index;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call);
}
@ -395,7 +415,7 @@ pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
@ -407,7 +427,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
@ -418,7 +438,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.old_revision = old_index;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call);
@ -430,7 +450,7 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = recursive;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
@ -442,7 +462,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
params.key.assign(key);
params.withPrefix = true;
params.limit = 0; // default no limit.
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
@ -454,7 +474,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
params.key.assign(key);
params.withPrefix = true;
params.limit = limit;
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
@ -465,7 +485,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool rec
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get();
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
@ -477,7 +497,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
params.key.assign(key);
params.withPrefix = recursive;
params.revision = fromIndex;
params.watch_stub = watchServiceStub.get();
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
@ -487,7 +507,7 @@ pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.ttl = ttl;
params.lease_stub = leaseServiceStub.get();
params.lease_stub = stubs->leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
return Response::create(call);
}
@ -497,7 +517,7 @@ pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
params.lease_stub = stubs->leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(new etcdv3::AsyncLeaseRevokeAction(params));
return Response::create(call);
}
@ -507,7 +527,7 @@ pplx::task<etcd::Response> etcd::Client::leasetimetolive(int64_t lease_id)
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
params.lease_stub = stubs->leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(new etcdv3::AsyncLeaseTimeToLiveAction(params));
return Response::create(call);
}
@ -530,7 +550,7 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
}
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call).then(
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
@ -554,7 +574,7 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
params.auth_token.assign(this->auth_token);
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call);
}
@ -577,7 +597,7 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = lock_key;
params.lock_stub = lockServiceStub.get();
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call);
}
@ -585,7 +605,7 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.kv_stub = kvServiceStub .get();
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call);
}

View File

@ -3,16 +3,35 @@
#include "etcd/KeepAlive.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp"
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncLeaseKeepAliveAction;
}
struct etcd::KeepAlive::EtcdServerStubs {
std::unique_ptr<etcdserverpb::Lease::Stub> leaseServiceStub;
std::unique_ptr<etcdv3::AsyncLeaseKeepAliveAction> call;
};
void etcd::KeepAlive::EtcdServerStubsDeleter::operator()(etcd::KeepAlive::EtcdServerStubs *stubs) {
if (stubs) {
delete stubs;
}
}
etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
ttl(ttl), lease_id(lease_id), continue_next(true) {
leaseServiceStub= Lease::NewStub(client.channel);
stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.channel);
etcdv3::ActionParameters params;
params.auth_token.assign(client.auth_token);
params.lease_id = this->lease_id;
params.lease_stub = leaseServiceStub.get();
params.lease_stub = stubs->leaseServiceStub.get();
call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() {
// start refresh
this->refresh();
@ -49,7 +68,7 @@ void etcd::KeepAlive::Cancel()
std::cout.flags(os_flags);
}
#endif
call->CancelKeepAlive();
stubs->call->CancelKeepAlive();
if (keepalive_timer_) {
keepalive_timer_->cancel();
}
@ -79,7 +98,7 @@ void etcd::KeepAlive::refresh()
std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl;
#endif
} else {
this->call->Refresh();
this->stubs->call->Refresh();
// trigger the next round;
this->refresh();
}

View File

@ -1,4 +1,7 @@
#include <iomanip>
#include <cpprest/http_client.h>
#include "etcd/Value.hpp"
#include "etcd/v3/KeyValue.hpp"
@ -57,5 +60,3 @@ int64_t etcd::Value::lease() const
{
return leaseId;
}

View File

@ -1,6 +1,17 @@
#include "etcd/Watcher.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
struct etcd::Watcher::EtcdServerStubs {
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<etcdv3::AsyncWatchAction> call;
};
void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServerStubs *stubs) {
if (stubs) {
delete stubs;
}
}
etcd::Watcher::Watcher(Client const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive):
Watcher(client, key, -1, callback, recursive) {
@ -9,7 +20,8 @@ etcd::Watcher::Watcher(Client const &client, std::string const & key,
etcd::Watcher::Watcher(Client const &client, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) {
watchServiceStub= Watch::NewStub(client.channel);
stubs.reset(new EtcdServerStubs{});
stubs->watchServiceStub = Watch::NewStub(client.channel);
doWatch(key, client.auth_token, callback);
}
@ -39,27 +51,27 @@ etcd::Watcher::Watcher(std::string const & address,
etcd::Watcher::~Watcher()
{
call->CancelWatch();
stubs->call->CancelWatch();
currentTask.wait();
}
bool etcd::Watcher::Wait()
{
currentTask.wait();
return call->Cancelled();
return stubs->call->Cancelled();
}
void etcd::Watcher::Wait(std::function<void(bool)> callback)
{
currentTask.then([this, callback](pplx::task<void> const & resp_task) {
resp_task.wait();
callback(this->call->Cancelled());
callback(this->stubs->call->Cancelled());
});
}
void etcd::Watcher::Cancel()
{
call->CancelWatch();
stubs->call->CancelWatch();
this->Wait();
}
@ -74,12 +86,12 @@ void etcd::Watcher::doWatch(std::string const & key,
params.revision = fromIndex;
}
params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get();
params.watch_stub = stubs->watchServiceStub.get();
call.reset(new etcdv3::AsyncWatchAction(params));
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
currentTask = pplx::task<void>([this, callback]()
{
return call->waitForResponse(callback);
return stubs->call->waitForResponse(callback);
});
}

View File

@ -27,7 +27,7 @@ etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionP
transaction.setup_compare_and_delete_operation(parameters.key);
transaction.setup_basic_failure_operation(parameters.key);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -27,7 +27,7 @@ etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParam
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -22,7 +22,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
{
transaction.setup_set_failure_operation(parameters.key, parameters.value, parameters.lease_id);
}
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -6,7 +6,7 @@
etcdv3::AsyncTxnAction::AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx)
: etcdv3::Action(param)
{
response_reader = parameters.kv_stub->AsyncTxn(&context, tx.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}

View File

@ -19,7 +19,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -1,5 +1,7 @@
#include "etcd/v3/Transaction.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
@ -7,13 +9,15 @@ using etcdserverpb::RequestOp;
using etcdserverpb::DeleteRangeRequest;
etcdv3::Transaction::Transaction() {
txn_request.reset(new etcdserverpb::TxnRequest{});
}
etcdv3::Transaction::Transaction(const std::string& key) : key(key) {
txn_request.reset(new etcdserverpb::TxnRequest{});
}
void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::CompareTarget target){
Compare* compare = txn_request.add_compare();
Compare* compare = txn_request->add_compare();
compare->set_result(result);
compare->set_target(target);
compare->set_key(key);
@ -22,7 +26,7 @@ void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::C
}
void etcdv3::Transaction::init_compare(std::string const& old_value, Compare::CompareResult result, Compare::CompareTarget target){
Compare* compare = txn_request.add_compare();
Compare* compare = txn_request->add_compare();
compare->set_result(result);
compare->set_target(target);
compare->set_key(key);
@ -31,7 +35,7 @@ void etcdv3::Transaction::init_compare(std::string const& old_value, Compare::Co
}
void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult result, Compare::CompareTarget target){
Compare* compare = txn_request.add_compare();
Compare* compare = txn_request->add_compare();
compare->set_result(result);
compare->set_target(target);
compare->set_key(key);
@ -45,7 +49,7 @@ void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult res
void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
}
@ -58,12 +62,12 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_failure = txn_request.add_failure();
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_failure = txn_request.add_failure();
req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
}
@ -76,12 +80,12 @@ void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, st
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success = txn_request->add_success();
req_success->set_allocated_request_range(get_request.release());
}
@ -94,12 +98,12 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success = txn_request->add_success();
req_success->set_allocated_request_range(get_request.release());
}
@ -115,7 +119,7 @@ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::str
del_request->set_range_end(range_end);
}
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
@ -133,7 +137,7 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key,
get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
RequestOp* req_failure = txn_request.add_failure();
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
del_request.reset(new DeleteRangeRequest());
@ -143,7 +147,7 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key,
del_request->set_range_end(range_end);
}
req_failure = txn_request.add_failure();
req_failure = txn_request->add_failure();
req_failure->set_allocated_request_delete_range(del_request.release());
}
@ -151,7 +155,7 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const&
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(true);
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
@ -160,7 +164,7 @@ void etcdv3::Transaction::setup_put(std::string const &key, std::string const &v
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(false);
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_put(put_request.release());
}
@ -169,7 +173,7 @@ void etcdv3::Transaction::setup_delete(std::string const &key) {
del_request->set_key(key);
del_request->set_prev_kv(false);
RequestOp* req_success = txn_request.add_success();
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}