Re-org v3 related files to make it a submodule.

This commit is contained in:
Tao He 2020-03-18 17:49:55 +08:00
parent 566f91970a
commit b9eaa03c8f
51 changed files with 375 additions and 153 deletions

View File

@ -13,23 +13,22 @@ find_package(Boost REQUIRED COMPONENTS system thread locale random)
find_package(OpenSSL REQUIRED)
find_package(Protobuf REQUIRED)
set(GRPC_LIBRARY_PATH /usr/lib
/usr/lib64
/usr/local/lib
/usr/local/lib64
/usr/local/opt/grpc)
find_library(GPR_LIBRARY NAMES gpr PATHS ${GRPC_LIBRARY_PATH})
find_library(GRPC_LIBRARY NAMES grpc PATHS ${GRPC_LIBRARY_PATH})
find_library(GRPC++_LIBRARY NAMES grpc++ PATHS ${GRPC_LIBRARY_PATH})
set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC++_LIBRARY})
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake)
set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY})
file(GLOB_RECURSE PROTO_SRC RELATIVE "${CMAKE_SOURCE_DIR}/proto" "proto/*.proto")
file(GLOB_RECURSE PROTO_SRC RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/proto" "proto/*.proto")
add_custom_target(proto-gen
COMMAND protoc -I . --cpp_out=. ${PROTO_SRC}
COMMAND protoc -I . --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./rpc.proto ./v3lock.proto
COMMENT "Generate protobuf stuffs"
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/proto)
# Generate protobuf definitions
execute_process(COMMAND ${Protobuf_PROTOC_EXECUTABLE}
-I ${CMAKE_CURRENT_SOURCE_DIR}/proto
--cpp_out=${CMAKE_CURRENT_SOURCE_DIR}/proto
${PROTO_SRC})
execute_process(COMMAND ${Protobuf_PROTOC_EXECUTABLE}
-I ${CMAKE_CURRENT_SOURCE_DIR}/proto
--grpc_out=${CMAKE_CURRENT_SOURCE_DIR}/proto
--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/proto/rpc.proto
${CMAKE_CURRENT_SOURCE_DIR}/proto/v3lock.proto)
enable_testing()
include_directories(SYSTEM ${CPPREST_INCLUDE_DIR}
@ -40,4 +39,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -Wno-string-compare -std=c++11")
add_subdirectory(src)
if (BUILD_TESTS)
add_subdirectory(tst)
endif ()

143
cmake/FindGRPC.cmake Normal file
View File

@ -0,0 +1,143 @@
# This file is taken from
#
# https://github.com/IvanSafonov/grpc-cmake-example
#
# The original repository is open-sourced with the MIT license.
#
#
# Locate and configure the gRPC library
#
# Adds the following targets:
#
# gRPC::gpr - GPR library
# gRPC::grpc - gRPC library
# gRPC::grpc++ - gRPC C++ library
# gRPC::grpc++_reflection - gRPC C++ reflection library
# gRPC::grpc_cpp_plugin - C++ generator plugin for Protocol Buffers
#
#
# Generates C++ sources from the .proto files
#
# grpc_generate_cpp (<SRCS> <HDRS> <DEST> [<ARGN>...])
#
# SRCS - variable to define with autogenerated source files
# HDRS - variable to define with autogenerated header files
# DEST - directory where the source files will be created
# ARGN - .proto files
#
function(GRPC_GENERATE_CPP SRCS HDRS DEST)
if(NOT ARGN)
message(SEND_ERROR "Error: GRPC_GENERATE_CPP() called without any proto files")
return()
endif()
if(GRPC_GENERATE_CPP_APPEND_PATH)
# Create an include path for each file specified
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(ABS_PATH ${ABS_FIL} PATH)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
else()
set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
endif()
if(DEFINED PROTOBUF_IMPORT_DIRS)
foreach(DIR ${PROTOBUF_IMPORT_DIRS})
get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
endif()
set(${SRCS})
set(${HDRS})
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(FIL_WE ${FIL} NAME_WE)
list(APPEND ${SRCS} "${DEST}/${FIL_WE}.grpc.pb.cc")
list(APPEND ${HDRS} "${DEST}/${FIL_WE}.grpc.pb.h")
add_custom_command(
OUTPUT "${DEST}/${FIL_WE}.grpc.pb.cc"
"${DEST}/${FIL_WE}.grpc.pb.h"
COMMAND protobuf::protoc
ARGS --grpc_out ${DEST} ${_protobuf_include_path} --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} ${ABS_FIL}
DEPENDS ${ABS_FIL} protobuf::protoc gRPC::grpc_cpp_plugin
COMMENT "Running C++ gRPC compiler on ${FIL}"
VERBATIM )
endforeach()
set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE)
set(${SRCS} ${${SRCS}} PARENT_SCOPE)
set(${HDRS} ${${HDRS}} PARENT_SCOPE)
endfunction()
# By default have GRPC_GENERATE_CPP macro pass -I to protoc
# for each directory where a proto file is referenced.
if(NOT DEFINED GRPC_GENERATE_CPP_APPEND_PATH)
set(GRPC_GENERATE_CPP_APPEND_PATH TRUE)
endif()
# Find gRPC include directory
find_path(GRPC_INCLUDE_DIR grpc/grpc.h)
mark_as_advanced(GRPC_INCLUDE_DIR)
# Find gGPR library
find_library(GPR_LIBRARY NAMES gpr)
mark_as_advanced(GRPC_GPR_LIBRARY)
add_library(gRPC::gpr UNKNOWN IMPORTED)
set_target_properties(gRPC::gpr PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${GRPC_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES "-lpthread;-ldl"
IMPORTED_LOCATION ${GPR_LIBRARY}
)
# Find gRPC library
find_library(GRPC_LIBRARY NAMES grpc)
mark_as_advanced(GRPC_LIBRARY)
add_library(gRPC::grpc UNKNOWN IMPORTED)
set_target_properties(gRPC::grpc PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${GRPC_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES gRPC::gpr
IMPORTED_LOCATION ${GRPC_LIBRARY}
)
# Find gRPC C++ library
find_library(GRPC_GRPC++_LIBRARY NAMES grpc++)
mark_as_advanced(GRPC_GRPC++_LIBRARY)
add_library(gRPC::grpc++ UNKNOWN IMPORTED)
set_target_properties(gRPC::grpc++ PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${GRPC_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES gRPC::grpc
IMPORTED_LOCATION ${GRPC_GRPC++_LIBRARY}
)
# Find gRPC C++ reflection library
find_library(GRPC_GRPC++_REFLECTION_LIBRARY NAMES grpc++_reflection)
mark_as_advanced(GRPC_GRPC++_REFLECTION_LIBRARY)
add_library(gRPC::grpc++_reflection UNKNOWN IMPORTED)
set_target_properties(gRPC::grpc++_reflection PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${GRPC_INCLUDE_DIR}
INTERFACE_LINK_LIBRARIES gRPC::grpc++
IMPORTED_LOCATION ${GRPC_GRPC++_REFLECTION_LIBRARY}
)
# Find gRPC CPP generator
find_program(GRPC_CPP_PLUGIN NAMES grpc_cpp_plugin)
mark_as_advanced(GRPC_CPP_PLUGIN)
add_executable(gRPC::grpc_cpp_plugin IMPORTED)
set_target_properties(gRPC::grpc_cpp_plugin PROPERTIES
IMPORTED_LOCATION ${GRPC_CPP_PLUGIN}
)
include(${CMAKE_ROOT}/Modules/FindPackageHandleStandardArgs.cmake)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(gRPC DEFAULT_MSG
GPR_LIBRARY GRPC_LIBRARY GRPC_INCLUDE_DIR GRPC_GRPC++_REFLECTION_LIBRARY GRPC_CPP_PLUGIN)

View File

@ -29,9 +29,11 @@ namespace etcd
public:
/**
* Constructs an etcd client object.
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001"
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
Client(std::string const & etcd_url);
Client(std::string const & etcd_url, std::string const & load_balancer = "round_robin");
/**
* Sends a get request to the etcd server

View File

@ -36,7 +36,9 @@ namespace etcd
auto v3resp = call->ParseResponse();
resp = etcd::Response(v3resp);
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint());
resp = etcd::Response(v3resp, duration);
return resp;
});
@ -109,8 +111,13 @@ namespace etcd
*/
std::vector<mvccpb::Event> const & events() const;
/**
* Returns the duration of request execution in microseconds.
*/
std::chrono::microseconds const & duration() const;
protected:
Response(const etcdv3::V3Response& response);
Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration);
Response(int error_code, char const * error_message);
int _error_code;
@ -123,6 +130,7 @@ namespace etcd
Keys _keys;
std::string _lock_key; // for lock
std::vector<mvccpb::Event> _events; // for watch
std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed
friend class SyncClient;
friend class etcdv3::AsyncWatchAction;
friend class Client;

View File

@ -1,6 +1,8 @@
#ifndef __V3_ACTION_HPP__
#define __V3_ACTION_HPP__
#include <chrono>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h"
@ -45,12 +47,13 @@ namespace etcdv3
Action(etcdv3::ActionParameters params);
Action(){};
void waitForResponse();
const std::chrono::high_resolution_clock::time_point startTimepoint();
protected:
Status status;
ClientContext context;
CompletionQueue cq_;
etcdv3::ActionParameters parameters;
std::chrono::high_resolution_clock::time_point start_timepoint;
};
}
#endif

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "v3/include/Action.hpp"
#include "etcd/v3/V3Response.hpp"
#include "etcd/v3/Action.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncLeaseGrantResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseGrantResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::LeaseGrantResponse;

View File

@ -3,7 +3,7 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "etcd/v3/V3Response.hpp"
using etcdserverpb::LeaseGrantResponse;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/v3lock.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncLockResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/Response.hpp"

View File

@ -3,7 +3,7 @@
#include <grpc++/grpc++.h>
#include "proto/v3lock.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "etcd/v3/V3Response.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,7 +3,7 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "etcd/v3/V3Response.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,9 +3,9 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/Transaction.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -1,8 +1,8 @@
#ifndef __ASYNC_TXNRESPONSE_HPP__
#define __ASYNC_TXNRESPONSE_HPP__
#include "v3/include/V3Response.hpp"
#include "proto/rpc.pb.h"
#include "etcd/v3/V3Response.hpp"
using etcdserverpb::TxnResponse;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;

View File

@ -3,8 +3,8 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/Response.hpp"

View File

@ -4,7 +4,7 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/rpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "etcd/v3/V3Response.hpp"
using etcdserverpb::WatchRequest;

View File

@ -4,7 +4,7 @@
#include <grpc++/grpc++.h>
#include "proto/kv.pb.h"
#include "v3/include/KeyValue.hpp"
#include "etcd/v3/KeyValue.hpp"
namespace etcdv3
{

View File

@ -1,11 +1,12 @@
file(GLOB_RECURSE CPP_CLIENT_SRC RELATIVE "${CMAKE_SOURCE_DIR}/src"
"${CMAKE_SOURCE_DIR}/proto/*.cc"
"${CMAKE_SOURCE_DIR}/src/*.cpp"
"${CMAKE_SOURCE_DIR}/v3/src/*.cpp")
file(GLOB_RECURSE CPP_CLIENT_SRC
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}/*.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/../proto/*.cc")
add_library(etcd-cpp-api SHARED ${CPP_CLIENT_SRC})
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_SOURCE_DIR}/proto)
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
target_link_libraries(etcd-cpp-api PUBLIC
${CPPREST_LIB}
@ -28,11 +29,10 @@ install (FILES ../proto/auth.pb.h
../proto/v3lock.pb.h
../proto/v3lock.grpc.pb.h
DESTINATION include/etcd/proto)
install (FILES ../v3/include/Transaction.hpp
DESTINATION include/etcd/v3/include)
install (FILES ../etcd/v3/Transaction.hpp
DESTINATION include/etcd/v3)
install (FILES ../proto/gogoproto/gogo.pb.h
DESTINATION include/etcd/proto/gogoproto)
install (FILES ../proto/google/api/annotations.pb.h
../proto/google/api/http.pb.h
DESTINATION include/etcd/proto/google/api)

View File

@ -1,42 +1,85 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <memory>
#include "etcd/Client.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/AsyncLockResponse.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/Transaction.hpp"
#include <iostream>
#include "v3/include/AsyncSetAction.hpp"
#include "v3/include/AsyncCompareAndSwapAction.hpp"
#include "v3/include/AsyncCompareAndDeleteAction.hpp"
#include "v3/include/AsyncUpdateAction.hpp"
#include "v3/include/AsyncGetAction.hpp"
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/AsyncLeaseGrantAction.hpp"
#include "v3/include/AsyncLockAction.hpp"
#include "v3/include/AsyncTxnAction.hpp"
#include "etcd/v3/AsyncSetAction.hpp"
#include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "etcd/v3/AsyncCompareAndDeleteAction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp"
#include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseGrantAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp"
#include <boost/algorithm/string.hpp>
using grpc::Channel;
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
struct addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
etcd::Client::Client(std::string const & address)
{
std::string stripped_address;
std::string substr("://");
std::string::size_type i = address.find(substr);
if(i != std::string::npos)
{
stripped_address = address.substr(i+substr.length());
std::vector<std::string> target_parts;
boost::split(target_parts, target, boost::is_any_of(":"));
if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl;
return false;
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
std::cout << "channel is: " << channel << std::endl;
if (getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs) != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target << std::endl;
return false;
}
char host[16] = {'\0'};
for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) {
memset(host, '\0', sizeof(host));
getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST);
endpoints.emplace_back(std::string(host) + ":" + target_parts[1]);
}
freeaddrinfo(addrs);
return true;
}
etcd::Client::Client(std::string const & address, std::string const & load_balancer)
{
std::vector<std::string> addresses;
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;"));
std::string stripped_address;
{
std::vector<std::string> stripped_addresses;
std::string substr("://");
for (auto const &addr: addresses) {
std::string::size_type idx = addr.find(substr);
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
dns_resolve(target, stripped_addresses);
}
stripped_address = boost::algorithm::join(stripped_addresses, ",");
}
grpc::ChannelArguments grpc_args;
grpc_args.SetLoadBalancingPolicyName(load_balancer);
std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
"ipv4:///" + stripped_address,
grpc::InsecureChannelCredentials(),
grpc_args);
stub_= KV::NewStub(channel);
watchServiceStub= Watch::NewStub(channel);
leaseServiceStub= Lease::NewStub(channel);

View File

@ -1,10 +1,10 @@
#include "etcd/Response.hpp"
#include "v3/include/V3Response.hpp"
#include "etcd/v3/V3Response.hpp"
#include <iostream>
etcd::Response::Response(const etcdv3::V3Response& reply)
etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseconds const& duration)
{
_index = reply.get_index();
_action = reply.get_action();
@ -28,6 +28,9 @@ etcd::Response::Response(const etcdv3::V3Response& reply)
_lock_key = reply.get_lock_key();
_events = reply.get_events();
// duration
_duration = duration;
}
@ -105,4 +108,8 @@ std::string const & etcd::Response::lock_key() const {
std::vector<mvccpb::Event> const & etcd::Response::events() const {
return this->_events;
};
}
std::chrono::microseconds const& etcd::Response::duration() const {
return this->_duration;
}

View File

@ -1,6 +1,6 @@
#include <iomanip>
#include "etcd/Value.hpp"
#include "v3/include/KeyValue.hpp"
#include "etcd/v3/KeyValue.hpp"
etcd::Value::Value()
: dir(false),

View File

@ -1,5 +1,5 @@
#include "etcd/Watcher.hpp"
#include "v3/include/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function<void(Response)> callback)
{

View File

@ -1,9 +1,10 @@
#include <grpc/support/log.h>
#include "v3/include/Action.hpp"
#include "etcd/v3/Action.hpp"
etcdv3::Action::Action(etcdv3::ActionParameters params)
{
parameters = params;
start_timepoint = std::chrono::high_resolution_clock::now();
}
etcdv3::ActionParameters::ActionParameters()
@ -27,3 +28,6 @@ void etcdv3::Action::waitForResponse()
GPR_ASSERT(got_tag == (void*)this);
}
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
return this->start_timepoint;
}

View File

@ -1,6 +1,6 @@
#include "v3/include/AsyncCompareAndDeleteAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/AsyncCompareAndDeleteAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;

View File

@ -1,6 +1,6 @@
#include "v3/include/AsyncCompareAndSwapAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::DeleteRangeRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp)

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncGetAction.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::RangeRequest;

View File

@ -1,6 +1,6 @@
#include "v3/include/AsyncLeaseGrantAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/AsyncLeaseGrantAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::LeaseGrantRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncLeaseGrantResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncLeaseGrantResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp)

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncLockAction.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/action_constants.hpp"
using v3lockpb::LockRequest;
using v3lockpb::UnlockRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncLockResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp)

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix)

View File

@ -1,6 +1,6 @@
#include "v3/include/AsyncSetAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/AsyncSetAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::Compare;

View File

@ -1,6 +1,6 @@
#include "v3/include/action_constants.hpp"
#include "v3/include/AsyncTxnAction.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/AsyncTxnAction.hpp"
#include "etcd/v3/Transaction.hpp"
etcdv3::AsyncTxnAction::AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx)

View File

@ -1,7 +1,7 @@
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::ResponseOp;

View File

@ -1,7 +1,7 @@
#include "v3/include/AsyncUpdateAction.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::RangeRequest;
@ -36,21 +36,20 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
} else {
throw std::runtime_error("failed to create a watch connection");
}
// wait "write" success
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"write") {
stream->Read(&reply, (void*)this);
} else {
throw std::runtime_error("failed to read proper reply from server");
}
}
void etcdv3::AsyncWatchAction::waitForResponse()
{
void* got_tag;
bool ok = false;
// wait "write" (WatchCreateRequest) success, and start to read the first reply
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"write") {
stream->Read(&reply, (void*)this);
} else {
throw std::runtime_error("failed to write WatchCreateRequest to server");
}
while(cq_.Next(&got_tag, &ok))
{
if(ok == false)
@ -64,12 +63,22 @@ void etcdv3::AsyncWatchAction::waitForResponse()
}
if(got_tag == (void*)this) // read tag
{
if(reply.events_size())
{
if ((reply.created() && reply.header().revision() < parameters.revision) ||
reply.events_size() > 0) {
// we stop watch under two conditions:
//
// 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events.
stream->WritesDone((void*)"writes done");
// leave a warning if the response is too large and been fragmented
if (reply.fragment()) {
std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl;
}
}
else
{
// otherwise, start next round read-reply
stream->Read(&reply, (void*)this);
}
}
@ -105,8 +114,12 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
{
if(reply.events_size())
{
// for the callback case, we don't stop immediately if watching for a future revison,
// we wait until there are some expected events.
auto resp = ParseResponse();
callback(resp);
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint);
callback(etcd::Response(resp, duration));
}
stream->Read(&reply, (void*)this);
}

View File

@ -1,5 +1,5 @@
#include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
{
@ -7,8 +7,6 @@ void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
for (auto const &e: reply.events()) {
events.emplace_back(e);
}
std::cout << "index = " << index << ", event size = " << reply.events().size()
<< ", res event size = " << events.size() << std::endl;
for(int cnt =0; cnt < reply.events_size(); cnt++)
{
auto event = reply.events(cnt);

View File

@ -1,4 +1,4 @@
#include "v3/include/KeyValue.hpp"
#include "etcd/v3/KeyValue.hpp"
etcdv3::KeyValue::KeyValue()
{

View File

@ -1,4 +1,4 @@
#include "v3/include/Transaction.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;

View File

@ -1,5 +1,5 @@
#include "v3/include/V3Response.hpp"
#include "v3/include/action_constants.hpp"
#include "etcd/v3/V3Response.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::V3Response::set_error_code(int code)
{

View File

@ -1,4 +1,4 @@
#include "v3/include/action_constants.hpp"
#include "etcd/v3/action_constants.hpp"
char const * etcdv3::CREATE_ACTION = "create";
char const * etcdv3::COMPARESWAP_ACTION = "compareAndSwap";
@ -10,4 +10,3 @@ char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete";
char const * etcdv3::LOCK_ACTION = "lock";
char const * etcdv3::UNLOCK_ACTION = "unlock";
char const * etcdv3::TXN_ACTION = "txn";