From d74c87e649671a26b738f8a47b7d12d658a0ba3f Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 10 Oct 2020 20:11:40 +0800 Subject: [PATCH] Drop the Interceptor-based implementation to be compatible with ealier version of gRPC (pre-0.17.0). Signed-off-by: Tao He --- .github/workflows/build-test.yml | 2 +- CMakeLists.txt | 4 +- etcd/Client.hpp | 4 +- etcd/Watcher.hpp | 21 ++++-- etcd/v3/Action.hpp | 1 + src/Client.cpp | 113 ++++++++++++++----------------- src/Watcher.cpp | 39 ++++++----- src/v3/Action.cpp | 6 ++ 8 files changed, 98 insertions(+), 92 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 2eadc33..0f6fe84 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -76,7 +76,7 @@ jobs: run: | mkdir -p build cd build - cmake .. -DBUILD_TESTS=ON \ + cmake .. -DBUILD_ETCD_TESTS=ON \ -DCMAKE_C_COMPILER_LAUNCHER=ccache \ -DCMAKE_CXX_COMPILER_LAUNCHER=ccache make -j2 diff --git a/CMakeLists.txt b/CMakeLists.txt index 542a725..3dbcc7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(etcd-cpp-api_VERSION_MAJOR 0) set(etcd-cpp-api_VERSION_MINOR 1) -option(BUILD_TESTS "Build test cases" OFF) +option(BUILD_ETCD_TESTS "Build test cases" OFF) find_library(CPPREST_LIB NAMES cpprest) find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h) @@ -51,7 +51,7 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin) add_subdirectory(src) -if (BUILD_TESTS) +if (BUILD_ETCD_TESTS) enable_testing() add_subdirectory(tst) endif () diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 6b108cd..1245822 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -234,8 +234,8 @@ namespace etcd private: std::shared_ptr channel; - std::shared_ptr auth_creds; - std::unique_ptr stub_; + std::string auth_token; + std::unique_ptr kvServiceStub; std::unique_ptr watchServiceStub; std::unique_ptr leaseServiceStub; std::unique_ptr lockServiceStub; diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 501ad7e..8941b83 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -22,13 +22,21 @@ namespace etcd class Watcher { public: - Watcher(Client &client, std::string const & key, + Watcher(Client const &client, std::string const & key, std::function callback, bool recursive=false); - Watcher(Client &client, std::string const & key, int fromIndex, + Watcher(Client const &client, std::string const & key, int fromIndex, std::function callback, bool recursive=false); - Watcher(std::string const & etcd_url, std::string const & key, + Watcher(std::string const & address, std::string const & key, std::function callback, bool recursive=false); - Watcher(std::string const & etcd_url, std::string const & key, int fromIndex, + Watcher(std::string const & address, std::string const & key, int fromIndex, + std::function callback, bool recursive=false); + Watcher(std::string const & address, + std::string const & username, std::string const & password, + std::string const & key, + std::function callback, bool recursive=false); + Watcher(std::string const & address, + std::string const & username, std::string const & password, + std::string const & key, int fromIndex, std::function callback, bool recursive=false); /** @@ -54,13 +62,14 @@ namespace etcd ~Watcher(); protected: - void doWatch(std::string const & key, std::function callback); + void doWatch(std::string const & key, + std::string const & auth_token, + std::function callback); int index; std::function callback; pplx::task currentTask; std::unique_ptr watchServiceStub; - std::unique_ptr stub_; std::unique_ptr call; private: diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 30dc89b..3c8f8f2 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -36,6 +36,7 @@ namespace etcdv3 std::string key; std::string value; std::string old_value; + std::string auth_token; KV::Stub* kv_stub; Watch::Stub* watch_stub; Lease::Stub* lease_stub; diff --git a/src/Client.cpp b/src/Client.cpp index 3690c64..50ea923 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -79,41 +79,6 @@ const std::string strip_and_resolve_addresses(std::string const &address) { return "ipv4:///" + stripped_address; } -class AuthInterceptor: public grpc::experimental::Interceptor { - public: - AuthInterceptor(grpc::experimental::ClientRpcInfo *, - std::string const &token): token_(token) {} - - void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override { - if (methods->QueryInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { - auto metadata = methods->GetSendInitialMetadata(); - // use `authorization` as the key also works, see: - // - // etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go - metadata->insert(std::make_pair("token", token_)); - } - methods->Proceed(); // NB: important! - } - - private: - grpc::string token_; -}; - -class AuthInterceptorFactory: - public grpc::experimental::ClientInterceptorFactoryInterface { - public: - AuthInterceptorFactory(std::string const &token): token_(token) {} - - grpc::experimental::Interceptor* CreateClientInterceptor( - grpc::experimental::ClientRpcInfo* info) override { - return new AuthInterceptor(info, token_); - } - - private: - grpc::string token_; -}; - const bool authenticate(std::shared_ptr const &channel, std::string const &username, std::string const &password, @@ -149,7 +114,7 @@ etcd::Client::Client(std::string const & address, this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); // create stubs - stub_= KV::NewStub(this->channel); + kvServiceStub = KV::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); @@ -172,15 +137,16 @@ etcd::Client::Client(std::string const & address, if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { throw std::invalid_argument("Etcd authentication failed: " + token_or_message); } - using interceptor_factory_t = grpc::experimental::ClientInterceptorFactoryInterface; - using interceptor_factory_ptr_t = std::unique_ptr; - std::vector interceptor_creators; - interceptor_creators.emplace_back(new etcd::detail::AuthInterceptorFactory(token_or_message)); + this->auth_token = token_or_message; + // using interceptor_factory_t = grpc::experimental::ClientInterceptorFactoryInterface; + // using interceptor_factory_ptr_t = std::unique_ptr; + // std::vector interceptor_creators; + // interceptor_creators.emplace_back(new etcd::detail::AuthInterceptorFactory(token_or_message)); // reset the channel with the authentication interceptor. - this->channel = grpc::experimental::CreateCustomChannelWithInterceptors( - addresses, creds, grpc_args, std::move(interceptor_creators)); - stub_= KV::NewStub(this->channel); + // this->channel = grpc::experimental::CreateCustomChannelWithInterceptors( + // addresses, creds, grpc_args, std::move(interceptor_creators)); + kvServiceStub = KV::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); @@ -189,9 +155,10 @@ etcd::Client::Client(std::string const & address, pplx::task etcd::Client::get(std::string const & key) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -199,9 +166,10 @@ pplx::task etcd::Client::get(std::string const & key) pplx::task etcd::Client::set(std::string const & key, std::string const & value, int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); if(ttl > 0) { @@ -226,10 +194,11 @@ pplx::task etcd::Client::set(std::string const & key, std::strin pplx::task etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncSetAction(params)); return Response::create(call); } @@ -238,9 +207,10 @@ pplx::task etcd::Client::set(std::string const & key, std::strin pplx::task etcd::Client::add(std::string const & key, std::string const & value, int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); if(ttl > 0) { @@ -264,10 +234,11 @@ pplx::task etcd::Client::add(std::string const & key, std::strin pplx::task etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); return Response::create(call); } @@ -276,9 +247,10 @@ pplx::task etcd::Client::add(std::string const & key, std::strin pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); if(ttl > 0) { @@ -302,10 +274,11 @@ pplx::task etcd::Client::modify(std::string const & key, std::st pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } @@ -314,10 +287,11 @@ pplx::task etcd::Client::modify(std::string const & key, std::st pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_value.assign(old_value); - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); if(ttl > 0) { @@ -341,24 +315,24 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_value.assign(old_value); params.lease_id = leaseid; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } - - pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_revision = old_index; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -381,11 +355,12 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; params.old_revision = old_index; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); return Response::create(call); } @@ -394,9 +369,10 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: pplx::task etcd::Client::rm(std::string const & key) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -405,9 +381,10 @@ pplx::task etcd::Client::rm(std::string const & key) pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_value.assign(old_value); - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } @@ -415,9 +392,10 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_revision = old_index; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; return Response::create(call); @@ -426,9 +404,10 @@ pplx::task etcd::Client::rm_if(std::string const & key, int old_ pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } @@ -436,10 +415,11 @@ pplx::task etcd::Client::rmdir(std::string const & key, bool rec pplx::task etcd::Client::ls(std::string const & key) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = true; params.limit = 0; // default no limit. - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -447,10 +427,11 @@ pplx::task etcd::Client::ls(std::string const & key) pplx::task etcd::Client::ls(std::string const & key, size_t const limit) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = true; params.limit = limit; - params.kv_stub = stub_.get(); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } @@ -458,6 +439,7 @@ pplx::task etcd::Client::ls(std::string const & key, size_t cons pplx::task etcd::Client::watch(std::string const & key, bool recursive) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; params.watch_stub = watchServiceStub.get(); @@ -468,6 +450,7 @@ pplx::task etcd::Client::watch(std::string const & key, bool rec pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; params.revision = fromIndex; @@ -479,6 +462,7 @@ pplx::task etcd::Client::watch(std::string const & key, int from pplx::task etcd::Client::leasegrant(int ttl) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.ttl = ttl; params.lease_stub = leaseServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); @@ -487,6 +471,7 @@ pplx::task etcd::Client::leasegrant(int ttl) pplx::task etcd::Client::lock(std::string const &key) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key = key; params.lock_stub = lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLockAction(params)); @@ -495,6 +480,7 @@ pplx::task etcd::Client::lock(std::string const &key) { pplx::task etcd::Client::unlock(std::string const &key) { etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); params.key = key; params.lock_stub = lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); @@ -503,7 +489,8 @@ pplx::task etcd::Client::unlock(std::string const &key) { pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { etcdv3::ActionParameters params; - params.kv_stub = stub_.get(); + params.auth_token.assign(this->auth_token); + params.kv_stub = kvServiceStub .get(); std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); return Response::create(call); } diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 4f26def..977f930 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -1,18 +1,16 @@ #include "etcd/Watcher.hpp" #include "etcd/v3/AsyncWatchAction.hpp" -etcd::Watcher::Watcher(Client &client, std::string const & key, +etcd::Watcher::Watcher(Client const &client, std::string const & key, std::function callback, bool recursive): Watcher(client, key, -1, callback, recursive) { } - -etcd::Watcher::Watcher(Client &client, std::string const & key, int fromIndex, +etcd::Watcher::Watcher(Client const &client, std::string const & key, int fromIndex, std::function callback, bool recursive): fromIndex(fromIndex), recursive(recursive) { watchServiceStub= Watch::NewStub(client.channel); - - doWatch(key, callback); + doWatch(key, client.auth_token, callback); } etcd::Watcher::Watcher(std::string const & address, std::string const & key, @@ -22,19 +20,21 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key, etcd::Watcher::Watcher(std::string const & address, std::string const & key, int fromIndex, std::function callback, bool recursive): - fromIndex(fromIndex), recursive(recursive) { - std::string stripped_address(address); - std::string substr("http://"); - std::string::size_type i = stripped_address.find(substr); - if(i != std::string::npos) - { - stripped_address.erase(i,substr.length()); - } + Watcher(Client(address), key, fromIndex, callback, recursive) { +} - std::shared_ptr channel = grpc::CreateChannel( - stripped_address, grpc::InsecureChannelCredentials()); - watchServiceStub= Watch::NewStub(channel); - doWatch(key, callback); +etcd::Watcher::Watcher(std::string const & address, + std::string const & username, std::string const & password, + std::string const & key, + std::function callback, bool recursive): + Watcher(address, username, password, key, -1, callback, recursive) { +} + +etcd::Watcher::Watcher(std::string const & address, + std::string const & username, std::string const & password, + std::string const & key, int fromIndex, + std::function callback, bool recursive): + Watcher(Client(address, username, password), key, fromIndex, callback, recursive) { } etcd::Watcher::~Watcher() @@ -63,9 +63,12 @@ void etcd::Watcher::Cancel() this->Wait(); } -void etcd::Watcher::doWatch(std::string const & key, std::function callback) +void etcd::Watcher::doWatch(std::string const & key, + std::string const & auth_token, + std::function callback) { etcdv3::ActionParameters params; + params.auth_token.assign(auth_token); params.key.assign(key); if (fromIndex >= 0) { params.revision = fromIndex; diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index 0d9cc4d..0f4c81b 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -4,6 +4,12 @@ etcdv3::Action::Action(etcdv3::ActionParameters params) { parameters = params; + if (!parameters.auth_token.empty()) { + // use `authorization` as the key also works, see: + // + // etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go + context.AddMetadata("authorization", parameters.auth_token); + } start_timepoint = std::chrono::high_resolution_clock::now(); }