Update accumulated changes.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2020-09-01 15:25:40 +08:00
parent 0eaebaf626
commit b3f6b826cd
10 changed files with 222 additions and 27 deletions

View File

@ -33,6 +33,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake)
set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY}) set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY})
# will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`. # will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`.
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobuf.cmake)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/proto) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/proto)
enable_testing() enable_testing()

View File

@ -0,0 +1,139 @@
function(protobuf_generate_latest)
set(_options APPEND_PATH DESCRIPTORS)
set(_singleargs LANGUAGE OUT_VAR EXPORT_MACRO PROTOC_OUT_DIR)
if(COMMAND target_sources)
list(APPEND _singleargs TARGET)
endif()
set(_multiargs PROTOS IMPORT_DIRS GENERATE_EXTENSIONS)
cmake_parse_arguments(protobuf_generate_latest "${_options}" "${_singleargs}" "${_multiargs}" "${ARGN}")
if(NOT protobuf_generate_latest_PROTOS AND NOT protobuf_generate_latest_TARGET)
message(SEND_ERROR "Error: protobuf_generate_latest called without any targets or source files")
return()
endif()
if(NOT protobuf_generate_latest_OUT_VAR AND NOT protobuf_generate_latest_TARGET)
message(SEND_ERROR "Error: protobuf_generate_latest called without a target or output variable")
return()
endif()
if(NOT protobuf_generate_latest_LANGUAGE)
set(protobuf_generate_latest_LANGUAGE cpp)
endif()
string(TOLOWER ${protobuf_generate_latest_LANGUAGE} protobuf_generate_latest_LANGUAGE)
if(NOT protobuf_generate_latest_PROTOC_OUT_DIR)
set(protobuf_generate_latest_PROTOC_OUT_DIR ${CMAKE_CURRENT_BINARY_DIR})
endif()
if(protobuf_generate_latest_EXPORT_MACRO AND protobuf_generate_latest_LANGUAGE STREQUAL cpp)
set(_dll_export_decl "dllexport_decl=${protobuf_generate_latest_EXPORT_MACRO}:")
endif()
if(NOT protobuf_generate_latest_GENERATE_EXTENSIONS)
if(protobuf_generate_latest_LANGUAGE STREQUAL cpp)
set(protobuf_generate_latest_GENERATE_EXTENSIONS .pb.h .pb.cc)
elseif(protobuf_generate_latest_LANGUAGE STREQUAL python)
set(protobuf_generate_latest_GENERATE_EXTENSIONS _pb2.py)
else()
message(SEND_ERROR "Error: protobuf_generate_latest given unknown Language ${LANGUAGE}, please provide a value for GENERATE_EXTENSIONS")
return()
endif()
endif()
if(protobuf_generate_latest_TARGET)
get_target_property(_source_list ${protobuf_generate_latest_TARGET} SOURCES)
foreach(_file ${_source_list})
if(_file MATCHES "proto$")
list(APPEND protobuf_generate_latest_PROTOS ${_file})
endif()
endforeach()
endif()
if(NOT protobuf_generate_latest_PROTOS)
message(SEND_ERROR "Error: protobuf_generate_latest could not find any .proto files")
return()
endif()
if(protobuf_generate_latest_APPEND_PATH)
# Create an include path for each file specified
foreach(_file ${protobuf_generate_latest_PROTOS})
get_filename_component(_abs_file ${_file} ABSOLUTE)
get_filename_component(_abs_path ${_abs_file} PATH)
list(FIND _protobuf_include_path ${_abs_path} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${_abs_path})
endif()
endforeach()
else()
set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
endif()
foreach(DIR ${protobuf_generate_latest_IMPORT_DIRS})
get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
set(_generated_srcs_all)
foreach(_proto ${protobuf_generate_latest_PROTOS})
get_filename_component(_abs_file ${_proto} ABSOLUTE)
get_filename_component(_abs_dir ${_abs_file} DIRECTORY)
get_filename_component(_basename ${_proto} NAME_WE)
file(RELATIVE_PATH _rel_dir ${CMAKE_CURRENT_SOURCE_DIR} ${_abs_dir})
set(_possible_rel_dir)
if (NOT protobuf_generate_latest_APPEND_PATH)
set(_possible_rel_dir ${_rel_dir}/)
endif()
set(_generated_srcs)
foreach(_ext ${protobuf_generate_latest_GENERATE_EXTENSIONS})
list(APPEND _generated_srcs "${protobuf_generate_latest_PROTOC_OUT_DIR}/${_possible_rel_dir}${_basename}${_ext}")
endforeach()
if(protobuf_generate_latest_DESCRIPTORS AND protobuf_generate_latest_LANGUAGE STREQUAL cpp)
set(_descriptor_file "${CMAKE_CURRENT_BINARY_DIR}/${_basename}.desc")
set(_dll_desc_out "--descriptor_set_out=${_descriptor_file}")
list(APPEND _generated_srcs ${_descriptor_file})
endif()
list(APPEND _generated_srcs_all ${_generated_srcs})
add_custom_command(
OUTPUT ${_generated_srcs}
COMMAND protobuf::protoc
ARGS --${protobuf_generate_latest_LANGUAGE}_out ${_dll_export_decl}${protobuf_generate_latest_PROTOC_OUT_DIR} ${_dll_desc_out} ${_protobuf_include_path} ${_abs_file}
DEPENDS ${_abs_file} protobuf::protoc
COMMENT "Running ${protobuf_generate_latest_LANGUAGE} protocol buffer compiler on ${_proto}"
VERBATIM )
endforeach()
set_source_files_properties(${_generated_srcs_all} PROPERTIES GENERATED TRUE)
if(protobuf_generate_latest_OUT_VAR)
set(${protobuf_generate_latest_OUT_VAR} ${_generated_srcs_all} PARENT_SCOPE)
endif()
if(protobuf_generate_latest_TARGET)
target_sources(${protobuf_generate_latest_TARGET} PRIVATE ${_generated_srcs_all})
endif()
endfunction()
# Compute generated .cc files, workaround for bugs in FindProtobuf.cmake of cmake-3.14.
function(compute_generated_srcs OUT_VAR GENERATED_OUTPUT_DIR GRPC)
set(${OUT_VAR})
foreach(_proto_file ${ARGN})
get_filename_component(_abs_file ${_proto_file} ABSOLUTE)
get_filename_component(_abs_dir ${_abs_file} DIRECTORY)
get_filename_component(_basename ${_proto_file} NAME_WE)
file(RELATIVE_PATH _rel_dir ${CMAKE_CURRENT_SOURCE_DIR} ${_abs_dir})
set(_ext ".pb.cc")
if(GRPC)
set(_ext ".grpc.pb.cc")
endif()
list(APPEND ${OUT_VAR} "${GENERATED_OUTPUT_DIR}/${_rel_dir}/${_basename}${_ext}")
endforeach()
set(${OUT_VAR} ${${OUT_VAR}} PARENT_SCOPE)
endfunction()

View File

@ -20,6 +20,8 @@ namespace etcdv3 {
namespace etcd namespace etcd
{ {
class Watcher;
/** /**
* Client is responsible for maintaining a connection towards an etcd server. * Client is responsible for maintaining a connection towards an etcd server.
* Etcd operations can be reached via the methods of the client. * Etcd operations can be reached via the methods of the client.
@ -205,10 +207,13 @@ namespace etcd
pplx::task<Response> txn(etcdv3::Transaction const &txn); pplx::task<Response> txn(etcdv3::Transaction const &txn);
private: private:
std::shared_ptr<grpc::Channel> channel;
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub; std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub; std::unique_ptr<Lock::Stub> lockServiceStub;
friend class Watcher;
}; };

View File

@ -2,6 +2,8 @@
#define __ETCD_WATCHER_HPP__ #define __ETCD_WATCHER_HPP__
#include <string> #include <string>
#include "etcd/Client.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
@ -20,7 +22,14 @@ namespace etcd
class Watcher class Watcher
{ {
public: public:
Watcher(std::string const & etcd_url, std::string const & key, std::function<void(Response)> callback); Watcher(Client &client, std::string const & key,
std::function<void(Response)> callback, bool recursive=false);
Watcher(Client &client, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & etcd_url, std::string const & key,
std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & etcd_url, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false);
void Cancel(); void Cancel();
~Watcher(); ~Watcher();
@ -33,6 +42,10 @@ namespace etcd
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<etcdv3::AsyncWatchAction> call; std::unique_ptr<etcdv3::AsyncWatchAction> call;
private:
int fromIndex;
bool recursive;
}; };
} }

View File

@ -2,30 +2,34 @@ file(GLOB_RECURSE PROTO_SRCS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.proto")
# use `protobuf_generate` rather than `protobuf_generate_cpp` since we want to # use `protobuf_generate` rather than `protobuf_generate_cpp` since we want to
# output the generated files to source dir, rather than binary dir. # output the generated files to source dir, rather than binary dir.
protobuf_generate( protobuf_generate_latest(
LANGUAGE cpp LANGUAGE cpp
OUT_VAR PROTO_GENERATES OUT_VAR PROTO_GENERATES
PROTOC_OUT_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PROTOC_OUT_DIR "${CMAKE_CURRENT_SOURCE_DIR}"
PROTOS ${PROTO_SRCS} PROTOS ${PROTO_SRCS}
) )
compute_generated_srcs(PROTO_GENERATES_SRCS "${CMAKE_CURRENT_SOURCE_DIR}" false ${PROTO_SRCS})
set(PROTO_GRPC_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto"
"${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto")
grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto" ${PROTO_GRPC_SRCS}
"${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto"
) )
compute_generated_srcs(PROTO_GRPC_GENERATES_SRCS "${CMAKE_CURRENT_SOURCE_DIR}" true ${PROTO_GRPC_SRCS})
# populate `PROTOBUF_GENERATES` in the parent scope. # populate `PROTOBUF_GENERATES` in the parent scope.
set(PROTOBUF_GENERATES) set(PROTOBUF_GENERATE_DEPS)
foreach(cxx_file ${PROTO_GENERATES}) foreach(cxx_file ${PROTO_GENERATES})
if(cxx_file MATCHES "cc$") if(cxx_file MATCHES "cc$")
list(APPEND PROTOBUF_GENERATES ${cxx_file}) list(APPEND PROTOBUF_GENERATE_DEPS ${cxx_file})
endif() endif()
endforeach() endforeach()
foreach(cxx_file ${PROTO_GRPC_GENERATES}) foreach(cxx_file ${PROTO_GRPC_GENERATES})
list(APPEND PROTOBUF_GENERATES ${cxx_file}) list(APPEND PROTOBUF_GENERATE_DEPS ${cxx_file})
endforeach() endforeach()
set(PROTOBUF_GENERATES ${PROTO_GENERATES_SRCS} ${PROTO_GRPC_GENERATES_SRCS})
set(PROTOBUF_GENERATES ${PROTOBUF_GENERATES} PARENT_SCOPE) set(PROTOBUF_GENERATES ${PROTOBUF_GENERATES} PARENT_SCOPE)
set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE) set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE)
add_custom_target(protobuf_generates DEPENDS ${PROTOBUF_GENERATES}) add_custom_target(protobuf_generates DEPENDS ${PROTOBUF_GENERATE_DEPS})

View File

@ -76,14 +76,14 @@ etcd::Client::Client(std::string const & address, std::string const & load_balan
} }
grpc::ChannelArguments grpc_args; grpc::ChannelArguments grpc_args;
grpc_args.SetLoadBalancingPolicyName(load_balancer); grpc_args.SetLoadBalancingPolicyName(load_balancer);
std::shared_ptr<Channel> channel = grpc::CreateCustomChannel( this->channel = grpc::CreateCustomChannel(
"ipv4:///" + stripped_address, "ipv4:///" + stripped_address,
grpc::InsecureChannelCredentials(), grpc::InsecureChannelCredentials(),
grpc_args); grpc_args);
stub_= KV::NewStub(channel); stub_= KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(channel); watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(channel); leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(channel); lockServiceStub = Lock::NewStub(this->channel);
} }

View File

@ -1,8 +1,28 @@
#include "etcd/Watcher.hpp" #include "etcd/Watcher.hpp"
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp"
etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function<void(Response)> callback) etcd::Watcher::Watcher(Client &client, std::string const & key,
{ std::function<void(Response)> callback, bool recursive):
Watcher(client, key, -1, callback, recursive) {
}
etcd::Watcher::Watcher(Client &client, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) {
watchServiceStub= Watch::NewStub(client.channel);
doWatch(key, callback);
}
etcd::Watcher::Watcher(std::string const & address, std::string const & key,
std::function<void(Response)> callback, bool recursive):
Watcher(address, key, -1, callback, recursive) {
}
etcd::Watcher::Watcher(std::string const & address, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) {
std::string stripped_address(address); std::string stripped_address(address);
std::string substr("http://"); std::string substr("http://");
std::string::size_type i = stripped_address.find(substr); std::string::size_type i = stripped_address.find(substr);
@ -10,13 +30,13 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key, std
{ {
stripped_address.erase(i,substr.length()); stripped_address.erase(i,substr.length());
} }
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
watchServiceStub= Watch::NewStub(channel);
std::shared_ptr<Channel> channel = grpc::CreateChannel(
stripped_address, grpc::InsecureChannelCredentials());
watchServiceStub= Watch::NewStub(channel);
doWatch(key, callback); doWatch(key, callback);
} }
etcd::Watcher::~Watcher() etcd::Watcher::~Watcher()
{ {
call->CancelWatch(); call->CancelWatch();
@ -33,9 +53,12 @@ void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.withPrefix = true; if (fromIndex >= 0) {
params.revision = fromIndex;
}
params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
params.revision = 0;
call.reset(new etcdv3::AsyncWatchAction(params)); call.reset(new etcdv3::AsyncWatchAction(params));
currentTask = pplx::task<void>([this, callback]() currentTask = pplx::task<void>([this, callback]()

View File

@ -98,6 +98,13 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
// wait "write" (WatchCreateRequest) success, and start to read the first reply
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"write") {
stream->Read(&reply, (void*)this);
} else {
throw std::runtime_error("failed to write WatchCreateRequest to server");
}
while(cq_.Next(&got_tag, &ok)) while(cq_.Next(&got_tag, &ok))
{ {
if(ok == false) if(ok == false)
@ -120,6 +127,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint); std::chrono::high_resolution_clock::now() - start_timepoint);
callback(etcd::Response(resp, duration)); callback(etcd::Response(resp, duration));
start_timepoint = std::chrono::high_resolution_clock::now();
} }
stream->Read(&reply, (void*)this); stream->Read(&reply, (void*)this);
} }

View File

@ -1,10 +1,12 @@
find_path(CATCH_INCLUDE_DIR NAMES catch.hpp PATHS ${PROJECT_SOURCE_DIR}) find_path(CATCH_INCLUDE_DIR NAMES catch.hpp PATHS ${PROJECT_SOURCE_DIR})
include_directories(${CATCH_INCLUDE_DIR}) include_directories(${CATCH_INCLUDE_DIR})
add_executable(etcd_test EtcdTest.cpp add_executable(etcd_test
EtcdSyncTest.cpp EtcdTest.cpp
WatcherTest.cpp EtcdSyncTest.cpp
LockTest.cpp) WatcherTest.cpp
LockTest.cpp
)
set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11) set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11)
target_include_directories(etcd_test PRIVATE ${CMAKE_SOURCE_DIR}/proto) target_include_directories(etcd_test PRIVATE ${CMAKE_SOURCE_DIR}/proto)

View File

@ -26,7 +26,7 @@ TEST_CASE("create watcher with cancel")
etcd.rmdir("/test", true); etcd.rmdir("/test", true);
watcher_called = 0; watcher_called = 0;
etcd::Watcher watcher(etcd_uri, "/test", printResponse); etcd::Watcher watcher(etcd_uri, "/test", printResponse, true);
sleep(1); sleep(1);
etcd.set("/test/key", "42"); etcd.set("/test/key", "42");
etcd.set("/test/key", "43"); etcd.set("/test/key", "43");
@ -52,7 +52,7 @@ TEST_CASE("create watcher")
watcher_called = 0; watcher_called = 0;
{ {
etcd::Watcher watcher(etcd_uri, "/test", printResponse); etcd::Watcher watcher(etcd_uri, "/test", printResponse, true);
sleep(1); sleep(1);
etcd.set("/test/key", "42"); etcd.set("/test/key", "42");
etcd.set("/test/key", "43"); etcd.set("/test/key", "43");