Drop the Interceptor-based implementation to be compatible with ealier version of gRPC (pre-0.17.0).

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2020-10-10 20:11:40 +08:00 committed by Tao He
parent eb284103e0
commit 3305a19d15
8 changed files with 98 additions and 92 deletions

View File

@ -76,7 +76,7 @@ jobs:
run: | run: |
mkdir -p build mkdir -p build
cd build cd build
cmake .. -DBUILD_TESTS=ON \ cmake .. -DBUILD_ETCD_TESTS=ON \
-DCMAKE_C_COMPILER_LAUNCHER=ccache \ -DCMAKE_C_COMPILER_LAUNCHER=ccache \
-DCMAKE_CXX_COMPILER_LAUNCHER=ccache -DCMAKE_CXX_COMPILER_LAUNCHER=ccache
make -j2 make -j2

View File

@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(etcd-cpp-api_VERSION_MAJOR 0) set(etcd-cpp-api_VERSION_MAJOR 0)
set(etcd-cpp-api_VERSION_MINOR 1) set(etcd-cpp-api_VERSION_MINOR 1)
option(BUILD_TESTS "Build test cases" OFF) option(BUILD_ETCD_TESTS "Build test cases" OFF)
find_library(CPPREST_LIB NAMES cpprest) find_library(CPPREST_LIB NAMES cpprest)
find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h) find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h)
@ -51,7 +51,7 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
add_subdirectory(src) add_subdirectory(src)
if (BUILD_TESTS) if (BUILD_ETCD_TESTS)
enable_testing() enable_testing()
add_subdirectory(tst) add_subdirectory(tst)
endif () endif ()

View File

@ -234,8 +234,8 @@ namespace etcd
private: private:
std::shared_ptr<grpc::Channel> channel; std::shared_ptr<grpc::Channel> channel;
std::shared_ptr<grpc::CallCredentials> auth_creds; std::string auth_token;
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> kvServiceStub;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub; std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub; std::unique_ptr<Lock::Stub> lockServiceStub;

View File

@ -22,13 +22,21 @@ namespace etcd
class Watcher class Watcher
{ {
public: public:
Watcher(Client &client, std::string const & key, Watcher(Client const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(Client &client, std::string const & key, int fromIndex, Watcher(Client const &client, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & etcd_url, std::string const & key, Watcher(std::string const & address, std::string const & key,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & etcd_url, std::string const & key, int fromIndex, Watcher(std::string const & address, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & address,
std::string const & username, std::string const & password,
std::string const & key,
std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & address,
std::string const & username, std::string const & password,
std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
/** /**
@ -54,13 +62,14 @@ namespace etcd
~Watcher(); ~Watcher();
protected: protected:
void doWatch(std::string const & key, std::function<void(Response)> callback); void doWatch(std::string const & key,
std::string const & auth_token,
std::function<void(Response)> callback);
int index; int index;
std::function<void(Response)> callback; std::function<void(Response)> callback;
pplx::task<void> currentTask; pplx::task<void> currentTask;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<etcdv3::AsyncWatchAction> call; std::unique_ptr<etcdv3::AsyncWatchAction> call;
private: private:

View File

@ -36,6 +36,7 @@ namespace etcdv3
std::string key; std::string key;
std::string value; std::string value;
std::string old_value; std::string old_value;
std::string auth_token;
KV::Stub* kv_stub; KV::Stub* kv_stub;
Watch::Stub* watch_stub; Watch::Stub* watch_stub;
Lease::Stub* lease_stub; Lease::Stub* lease_stub;

View File

@ -79,41 +79,6 @@ const std::string strip_and_resolve_addresses(std::string const &address) {
return "ipv4:///" + stripped_address; return "ipv4:///" + stripped_address;
} }
class AuthInterceptor: public grpc::experimental::Interceptor {
public:
AuthInterceptor(grpc::experimental::ClientRpcInfo *,
std::string const &token): token_(token) {}
void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override {
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto metadata = methods->GetSendInitialMetadata();
// use `authorization` as the key also works, see:
//
// etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go
metadata->insert(std::make_pair("token", token_));
}
methods->Proceed(); // NB: important!
}
private:
grpc::string token_;
};
class AuthInterceptorFactory:
public grpc::experimental::ClientInterceptorFactoryInterface {
public:
AuthInterceptorFactory(std::string const &token): token_(token) {}
grpc::experimental::Interceptor* CreateClientInterceptor(
grpc::experimental::ClientRpcInfo* info) override {
return new AuthInterceptor(info, token_);
}
private:
grpc::string token_;
};
const bool authenticate(std::shared_ptr<grpc::Channel> const &channel, const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
std::string const &username, std::string const &username,
std::string const &password, std::string const &password,
@ -149,7 +114,7 @@ etcd::Client::Client(std::string const & address,
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
// create stubs // create stubs
stub_= KV::NewStub(this->channel); kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
@ -172,15 +137,16 @@ etcd::Client::Client(std::string const & address,
if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) {
throw std::invalid_argument("Etcd authentication failed: " + token_or_message); throw std::invalid_argument("Etcd authentication failed: " + token_or_message);
} }
using interceptor_factory_t = grpc::experimental::ClientInterceptorFactoryInterface; this->auth_token = token_or_message;
using interceptor_factory_ptr_t = std::unique_ptr<interceptor_factory_t>; // using interceptor_factory_t = grpc::experimental::ClientInterceptorFactoryInterface;
std::vector<interceptor_factory_ptr_t> interceptor_creators; // using interceptor_factory_ptr_t = std::unique_ptr<interceptor_factory_t>;
interceptor_creators.emplace_back(new etcd::detail::AuthInterceptorFactory(token_or_message)); // std::vector<interceptor_factory_ptr_t> interceptor_creators;
// interceptor_creators.emplace_back(new etcd::detail::AuthInterceptorFactory(token_or_message));
// reset the channel with the authentication interceptor. // reset the channel with the authentication interceptor.
this->channel = grpc::experimental::CreateCustomChannelWithInterceptors( // this->channel = grpc::experimental::CreateCustomChannelWithInterceptors(
addresses, creds, grpc_args, std::move(interceptor_creators)); // addresses, creds, grpc_args, std::move(interceptor_creators));
stub_= KV::NewStub(this->channel); kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub= Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub= Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
@ -189,9 +155,10 @@ etcd::Client::Client(std::string const & address,
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -199,9 +166,10 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl) pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
if(ttl > 0) if(ttl > 0)
{ {
@ -226,10 +194,11 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params)); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -238,9 +207,10 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int ttl) pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
if(ttl > 0) if(ttl > 0)
{ {
@ -264,10 +234,11 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true)); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call); return Response::create(call);
} }
@ -276,9 +247,10 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl) pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
if(ttl > 0) if(ttl > 0)
{ {
@ -302,10 +274,11 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params)); std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call); return Response::create(call);
} }
@ -314,10 +287,11 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
if(ttl > 0) if(ttl > 0)
{ {
@ -341,24 +315,24 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
if(ttl > 0) if(ttl > 0)
{ {
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
@ -381,11 +355,12 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call); return Response::create(call);
} }
@ -394,9 +369,10 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key) pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
@ -405,9 +381,10 @@ pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
@ -415,9 +392,10 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call); return Response::create(call);
@ -426,9 +404,10 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
@ -436,10 +415,11 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = true; params.withPrefix = true;
params.limit = 0; // default no limit. params.limit = 0; // default no limit.
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -447,10 +427,11 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t const limit) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t const limit)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = true; params.withPrefix = true;
params.limit = limit; params.limit = limit;
params.kv_stub = stub_.get(); params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
@ -458,6 +439,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
@ -468,6 +450,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool rec
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.revision = fromIndex; params.revision = fromIndex;
@ -479,6 +462,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl) pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.ttl = ttl; params.ttl = ttl;
params.lease_stub = leaseServiceStub.get(); params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params)); std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
@ -487,6 +471,7 @@ pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) { pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key; params.key = key;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params)); std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
@ -495,6 +480,7 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) { pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key; params.key = key;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params)); std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
@ -503,7 +489,8 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) {
pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) { pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.kv_stub = stub_.get(); params.auth_token.assign(this->auth_token);
params.kv_stub = kvServiceStub .get();
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn)); std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call); return Response::create(call);
} }

View File

@ -1,18 +1,16 @@
#include "etcd/Watcher.hpp" #include "etcd/Watcher.hpp"
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp"
etcd::Watcher::Watcher(Client &client, std::string const & key, etcd::Watcher::Watcher(Client const &client, std::string const & key,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
Watcher(client, key, -1, callback, recursive) { Watcher(client, key, -1, callback, recursive) {
} }
etcd::Watcher::Watcher(Client const &client, std::string const & key, int fromIndex,
etcd::Watcher::Watcher(Client &client, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) { fromIndex(fromIndex), recursive(recursive) {
watchServiceStub= Watch::NewStub(client.channel); watchServiceStub= Watch::NewStub(client.channel);
doWatch(key, client.auth_token, callback);
doWatch(key, callback);
} }
etcd::Watcher::Watcher(std::string const & address, std::string const & key, etcd::Watcher::Watcher(std::string const & address, std::string const & key,
@ -22,19 +20,21 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key,
etcd::Watcher::Watcher(std::string const & address, std::string const & key, int fromIndex, etcd::Watcher::Watcher(std::string const & address, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive): std::function<void(Response)> callback, bool recursive):
fromIndex(fromIndex), recursive(recursive) { Watcher(Client(address), key, fromIndex, callback, recursive) {
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
} }
std::shared_ptr<Channel> channel = grpc::CreateChannel( etcd::Watcher::Watcher(std::string const & address,
stripped_address, grpc::InsecureChannelCredentials()); std::string const & username, std::string const & password,
watchServiceStub= Watch::NewStub(channel); std::string const & key,
doWatch(key, callback); std::function<void(Response)> callback, bool recursive):
Watcher(address, username, password, key, -1, callback, recursive) {
}
etcd::Watcher::Watcher(std::string const & address,
std::string const & username, std::string const & password,
std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive):
Watcher(Client(address, username, password), key, fromIndex, callback, recursive) {
} }
etcd::Watcher::~Watcher() etcd::Watcher::~Watcher()
@ -63,9 +63,12 @@ void etcd::Watcher::Cancel()
this->Wait(); this->Wait();
} }
void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response)> callback) void etcd::Watcher::doWatch(std::string const & key,
std::string const & auth_token,
std::function<void(Response)> callback)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(auth_token);
params.key.assign(key); params.key.assign(key);
if (fromIndex >= 0) { if (fromIndex >= 0) {
params.revision = fromIndex; params.revision = fromIndex;

View File

@ -4,6 +4,12 @@
etcdv3::Action::Action(etcdv3::ActionParameters params) etcdv3::Action::Action(etcdv3::ActionParameters params)
{ {
parameters = params; parameters = params;
if (!parameters.auth_token.empty()) {
// use `authorization` as the key also works, see:
//
// etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go
context.AddMetadata("authorization", parameters.auth_token);
}
start_timepoint = std::chrono::high_resolution_clock::now(); start_timepoint = std::chrono::high_resolution_clock::now();
} }