From 60ffe62b26622d411f07be7c35ba11503d0ce15e Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 1 Sep 2020 15:25:40 +0800 Subject: [PATCH] Update accumulated changes. Signed-off-by: Tao He --- CMakeLists.txt | 1 + cmake/GenerateProtobuf.cmake | 139 +++++++++++++++++++++++++++++++++++ etcd/Client.hpp | 5 ++ etcd/Watcher.hpp | 15 +++- proto/CMakeLists.txt | 18 +++-- src/Client.cpp | 10 +-- src/Watcher.cpp | 37 ++++++++-- src/v3/AsyncWatchAction.cpp | 10 ++- tst/CMakeLists.txt | 10 ++- tst/WatcherTest.cpp | 4 +- 10 files changed, 222 insertions(+), 27 deletions(-) create mode 100644 cmake/GenerateProtobuf.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 04c419c..16960b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,6 +33,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake) set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY}) # will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`. +include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobuf.cmake) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/proto) enable_testing() diff --git a/cmake/GenerateProtobuf.cmake b/cmake/GenerateProtobuf.cmake new file mode 100644 index 0000000..e7bb70f --- /dev/null +++ b/cmake/GenerateProtobuf.cmake @@ -0,0 +1,139 @@ +function(protobuf_generate_latest) + set(_options APPEND_PATH DESCRIPTORS) + set(_singleargs LANGUAGE OUT_VAR EXPORT_MACRO PROTOC_OUT_DIR) + if(COMMAND target_sources) + list(APPEND _singleargs TARGET) + endif() + set(_multiargs PROTOS IMPORT_DIRS GENERATE_EXTENSIONS) + + cmake_parse_arguments(protobuf_generate_latest "${_options}" "${_singleargs}" "${_multiargs}" "${ARGN}") + + if(NOT protobuf_generate_latest_PROTOS AND NOT protobuf_generate_latest_TARGET) + message(SEND_ERROR "Error: protobuf_generate_latest called without any targets or source files") + return() + endif() + + if(NOT protobuf_generate_latest_OUT_VAR AND NOT protobuf_generate_latest_TARGET) + message(SEND_ERROR "Error: protobuf_generate_latest called without a target or output variable") + return() + endif() + + if(NOT protobuf_generate_latest_LANGUAGE) + set(protobuf_generate_latest_LANGUAGE cpp) + endif() + string(TOLOWER ${protobuf_generate_latest_LANGUAGE} protobuf_generate_latest_LANGUAGE) + + if(NOT protobuf_generate_latest_PROTOC_OUT_DIR) + set(protobuf_generate_latest_PROTOC_OUT_DIR ${CMAKE_CURRENT_BINARY_DIR}) + endif() + + if(protobuf_generate_latest_EXPORT_MACRO AND protobuf_generate_latest_LANGUAGE STREQUAL cpp) + set(_dll_export_decl "dllexport_decl=${protobuf_generate_latest_EXPORT_MACRO}:") + endif() + + if(NOT protobuf_generate_latest_GENERATE_EXTENSIONS) + if(protobuf_generate_latest_LANGUAGE STREQUAL cpp) + set(protobuf_generate_latest_GENERATE_EXTENSIONS .pb.h .pb.cc) + elseif(protobuf_generate_latest_LANGUAGE STREQUAL python) + set(protobuf_generate_latest_GENERATE_EXTENSIONS _pb2.py) + else() + message(SEND_ERROR "Error: protobuf_generate_latest given unknown Language ${LANGUAGE}, please provide a value for GENERATE_EXTENSIONS") + return() + endif() + endif() + + if(protobuf_generate_latest_TARGET) + get_target_property(_source_list ${protobuf_generate_latest_TARGET} SOURCES) + foreach(_file ${_source_list}) + if(_file MATCHES "proto$") + list(APPEND protobuf_generate_latest_PROTOS ${_file}) + endif() + endforeach() + endif() + + if(NOT protobuf_generate_latest_PROTOS) + message(SEND_ERROR "Error: protobuf_generate_latest could not find any .proto files") + return() + endif() + + if(protobuf_generate_latest_APPEND_PATH) + # Create an include path for each file specified + foreach(_file ${protobuf_generate_latest_PROTOS}) + get_filename_component(_abs_file ${_file} ABSOLUTE) + get_filename_component(_abs_path ${_abs_file} PATH) + list(FIND _protobuf_include_path ${_abs_path} _contains_already) + if(${_contains_already} EQUAL -1) + list(APPEND _protobuf_include_path -I ${_abs_path}) + endif() + endforeach() + else() + set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR}) + endif() + + foreach(DIR ${protobuf_generate_latest_IMPORT_DIRS}) + get_filename_component(ABS_PATH ${DIR} ABSOLUTE) + list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) + if(${_contains_already} EQUAL -1) + list(APPEND _protobuf_include_path -I ${ABS_PATH}) + endif() + endforeach() + + set(_generated_srcs_all) + foreach(_proto ${protobuf_generate_latest_PROTOS}) + get_filename_component(_abs_file ${_proto} ABSOLUTE) + get_filename_component(_abs_dir ${_abs_file} DIRECTORY) + get_filename_component(_basename ${_proto} NAME_WE) + file(RELATIVE_PATH _rel_dir ${CMAKE_CURRENT_SOURCE_DIR} ${_abs_dir}) + + set(_possible_rel_dir) + if (NOT protobuf_generate_latest_APPEND_PATH) + set(_possible_rel_dir ${_rel_dir}/) + endif() + + set(_generated_srcs) + foreach(_ext ${protobuf_generate_latest_GENERATE_EXTENSIONS}) + list(APPEND _generated_srcs "${protobuf_generate_latest_PROTOC_OUT_DIR}/${_possible_rel_dir}${_basename}${_ext}") + endforeach() + + if(protobuf_generate_latest_DESCRIPTORS AND protobuf_generate_latest_LANGUAGE STREQUAL cpp) + set(_descriptor_file "${CMAKE_CURRENT_BINARY_DIR}/${_basename}.desc") + set(_dll_desc_out "--descriptor_set_out=${_descriptor_file}") + list(APPEND _generated_srcs ${_descriptor_file}) + endif() + list(APPEND _generated_srcs_all ${_generated_srcs}) + + add_custom_command( + OUTPUT ${_generated_srcs} + COMMAND protobuf::protoc + ARGS --${protobuf_generate_latest_LANGUAGE}_out ${_dll_export_decl}${protobuf_generate_latest_PROTOC_OUT_DIR} ${_dll_desc_out} ${_protobuf_include_path} ${_abs_file} + DEPENDS ${_abs_file} protobuf::protoc + COMMENT "Running ${protobuf_generate_latest_LANGUAGE} protocol buffer compiler on ${_proto}" + VERBATIM ) + endforeach() + + set_source_files_properties(${_generated_srcs_all} PROPERTIES GENERATED TRUE) + if(protobuf_generate_latest_OUT_VAR) + set(${protobuf_generate_latest_OUT_VAR} ${_generated_srcs_all} PARENT_SCOPE) + endif() + if(protobuf_generate_latest_TARGET) + target_sources(${protobuf_generate_latest_TARGET} PRIVATE ${_generated_srcs_all}) + endif() +endfunction() + +# Compute generated .cc files, workaround for bugs in FindProtobuf.cmake of cmake-3.14. +function(compute_generated_srcs OUT_VAR GENERATED_OUTPUT_DIR GRPC) + set(${OUT_VAR}) + foreach(_proto_file ${ARGN}) + get_filename_component(_abs_file ${_proto_file} ABSOLUTE) + get_filename_component(_abs_dir ${_abs_file} DIRECTORY) + get_filename_component(_basename ${_proto_file} NAME_WE) + file(RELATIVE_PATH _rel_dir ${CMAKE_CURRENT_SOURCE_DIR} ${_abs_dir}) + + set(_ext ".pb.cc") + if(GRPC) + set(_ext ".grpc.pb.cc") + endif() + list(APPEND ${OUT_VAR} "${GENERATED_OUTPUT_DIR}/${_rel_dir}/${_basename}${_ext}") + endforeach() + set(${OUT_VAR} ${${OUT_VAR}} PARENT_SCOPE) +endfunction() diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 3ea756e..734b060 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -20,6 +20,8 @@ namespace etcdv3 { namespace etcd { + class Watcher; + /** * Client is responsible for maintaining a connection towards an etcd server. * Etcd operations can be reached via the methods of the client. @@ -205,10 +207,13 @@ namespace etcd pplx::task txn(etcdv3::Transaction const &txn); private: + std::shared_ptr channel; std::unique_ptr stub_; std::unique_ptr watchServiceStub; std::unique_ptr leaseServiceStub; std::unique_ptr lockServiceStub; + + friend class Watcher; }; diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index f308e19..a5cd29a 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -2,6 +2,8 @@ #define __ETCD_WATCHER_HPP__ #include + +#include "etcd/Client.hpp" #include "etcd/Response.hpp" #include @@ -20,7 +22,14 @@ namespace etcd class Watcher { public: - Watcher(std::string const & etcd_url, std::string const & key, std::function callback); + Watcher(Client &client, std::string const & key, + std::function callback, bool recursive=false); + Watcher(Client &client, std::string const & key, int fromIndex, + std::function callback, bool recursive=false); + Watcher(std::string const & etcd_url, std::string const & key, + std::function callback, bool recursive=false); + Watcher(std::string const & etcd_url, std::string const & key, int fromIndex, + std::function callback, bool recursive=false); void Cancel(); ~Watcher(); @@ -33,6 +42,10 @@ namespace etcd std::unique_ptr watchServiceStub; std::unique_ptr stub_; std::unique_ptr call; + + private: + int fromIndex; + bool recursive; }; } diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt index eff31fb..d46a2c6 100644 --- a/proto/CMakeLists.txt +++ b/proto/CMakeLists.txt @@ -2,30 +2,34 @@ file(GLOB_RECURSE PROTO_SRCS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.proto") # use `protobuf_generate` rather than `protobuf_generate_cpp` since we want to # output the generated files to source dir, rather than binary dir. -protobuf_generate( +protobuf_generate_latest( LANGUAGE cpp OUT_VAR PROTO_GENERATES PROTOC_OUT_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PROTOS ${PROTO_SRCS} ) +compute_generated_srcs(PROTO_GENERATES_SRCS "${CMAKE_CURRENT_SOURCE_DIR}" false ${PROTO_SRCS}) +set(PROTO_GRPC_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto" + "${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto") grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS "${CMAKE_CURRENT_SOURCE_DIR}" - "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto" - "${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto" + ${PROTO_GRPC_SRCS} ) +compute_generated_srcs(PROTO_GRPC_GENERATES_SRCS "${CMAKE_CURRENT_SOURCE_DIR}" true ${PROTO_GRPC_SRCS}) # populate `PROTOBUF_GENERATES` in the parent scope. -set(PROTOBUF_GENERATES) +set(PROTOBUF_GENERATE_DEPS) foreach(cxx_file ${PROTO_GENERATES}) if(cxx_file MATCHES "cc$") - list(APPEND PROTOBUF_GENERATES ${cxx_file}) + list(APPEND PROTOBUF_GENERATE_DEPS ${cxx_file}) endif() endforeach() foreach(cxx_file ${PROTO_GRPC_GENERATES}) - list(APPEND PROTOBUF_GENERATES ${cxx_file}) + list(APPEND PROTOBUF_GENERATE_DEPS ${cxx_file}) endforeach() +set(PROTOBUF_GENERATES ${PROTO_GENERATES_SRCS} ${PROTO_GRPC_GENERATES_SRCS}) set(PROTOBUF_GENERATES ${PROTOBUF_GENERATES} PARENT_SCOPE) set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE) -add_custom_target(protobuf_generates DEPENDS ${PROTOBUF_GENERATES}) +add_custom_target(protobuf_generates DEPENDS ${PROTOBUF_GENERATE_DEPS}) diff --git a/src/Client.cpp b/src/Client.cpp index c3cf93b..45bc35e 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -76,14 +76,14 @@ etcd::Client::Client(std::string const & address, std::string const & load_balan } grpc::ChannelArguments grpc_args; grpc_args.SetLoadBalancingPolicyName(load_balancer); - std::shared_ptr channel = grpc::CreateCustomChannel( + this->channel = grpc::CreateCustomChannel( "ipv4:///" + stripped_address, grpc::InsecureChannelCredentials(), grpc_args); - stub_= KV::NewStub(channel); - watchServiceStub= Watch::NewStub(channel); - leaseServiceStub= Lease::NewStub(channel); - lockServiceStub = Lock::NewStub(channel); + stub_= KV::NewStub(this->channel); + watchServiceStub= Watch::NewStub(this->channel); + leaseServiceStub= Lease::NewStub(this->channel); + lockServiceStub = Lock::NewStub(this->channel); } diff --git a/src/Watcher.cpp b/src/Watcher.cpp index da4db1a..434c462 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -1,8 +1,28 @@ #include "etcd/Watcher.hpp" #include "etcd/v3/AsyncWatchAction.hpp" -etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function callback) -{ +etcd::Watcher::Watcher(Client &client, std::string const & key, + std::function callback, bool recursive): + Watcher(client, key, -1, callback, recursive) { +} + + +etcd::Watcher::Watcher(Client &client, std::string const & key, int fromIndex, + std::function callback, bool recursive): + fromIndex(fromIndex), recursive(recursive) { + watchServiceStub= Watch::NewStub(client.channel); + + doWatch(key, callback); +} + +etcd::Watcher::Watcher(std::string const & address, std::string const & key, + std::function callback, bool recursive): + Watcher(address, key, -1, callback, recursive) { +} + +etcd::Watcher::Watcher(std::string const & address, std::string const & key, int fromIndex, + std::function callback, bool recursive): + fromIndex(fromIndex), recursive(recursive) { std::string stripped_address(address); std::string substr("http://"); std::string::size_type i = stripped_address.find(substr); @@ -10,13 +30,13 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key, std { stripped_address.erase(i,substr.length()); } - std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); - watchServiceStub= Watch::NewStub(channel); + std::shared_ptr channel = grpc::CreateChannel( + stripped_address, grpc::InsecureChannelCredentials()); + watchServiceStub= Watch::NewStub(channel); doWatch(key, callback); } - etcd::Watcher::~Watcher() { call->CancelWatch(); @@ -33,9 +53,12 @@ void etcd::Watcher::doWatch(std::string const & key, std::function= 0) { + params.revision = fromIndex; + } + params.withPrefix = recursive; params.watch_stub = watchServiceStub.get(); - params.revision = 0; + call.reset(new etcdv3::AsyncWatchAction(params)); currentTask = pplx::task([this, callback]() diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 5eba8e7..d494a80 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -98,6 +98,13 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::functionRead(&reply, (void*)this); + } else { + throw std::runtime_error("failed to write WatchCreateRequest to server"); + } + while(cq_.Next(&got_tag, &ok)) { if(ok == false) @@ -120,9 +127,10 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function( std::chrono::high_resolution_clock::now() - start_timepoint); callback(etcd::Response(resp, duration)); + start_timepoint = std::chrono::high_resolution_clock::now(); } stream->Read(&reply, (void*)this); - } + } } } diff --git a/tst/CMakeLists.txt b/tst/CMakeLists.txt index 13ed48d..600e30c 100644 --- a/tst/CMakeLists.txt +++ b/tst/CMakeLists.txt @@ -1,10 +1,12 @@ find_path(CATCH_INCLUDE_DIR NAMES catch.hpp PATHS ${PROJECT_SOURCE_DIR}) include_directories(${CATCH_INCLUDE_DIR}) -add_executable(etcd_test EtcdTest.cpp - EtcdSyncTest.cpp - WatcherTest.cpp - LockTest.cpp) +add_executable(etcd_test + EtcdTest.cpp + EtcdSyncTest.cpp + WatcherTest.cpp + LockTest.cpp +) set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11) target_include_directories(etcd_test PRIVATE ${CMAKE_SOURCE_DIR}/proto) diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 7cbcca2..1551573 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -26,7 +26,7 @@ TEST_CASE("create watcher with cancel") etcd.rmdir("/test", true); watcher_called = 0; - etcd::Watcher watcher(etcd_uri, "/test", printResponse); + etcd::Watcher watcher(etcd_uri, "/test", printResponse, true); sleep(1); etcd.set("/test/key", "42"); etcd.set("/test/key", "43"); @@ -52,7 +52,7 @@ TEST_CASE("create watcher") watcher_called = 0; { - etcd::Watcher watcher(etcd_uri, "/test", printResponse); + etcd::Watcher watcher(etcd_uri, "/test", printResponse, true); sleep(1); etcd.set("/test/key", "42"); etcd.set("/test/key", "43");