Implements etcd v3 authentication.
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
parent
65271bb92d
commit
0fb4f2887d
|
|
@ -86,9 +86,20 @@ jobs:
|
|||
run: |
|
||||
cd build
|
||||
/usr/local/bin/etcd &
|
||||
CTEST_OUTPUT_ON_FAILURE=1 make test
|
||||
|
||||
# note: no need to clean up on CI env
|
||||
# tests without auth
|
||||
./bin/EtcdSyncTest
|
||||
./bin/EtcdTest
|
||||
./bin/LockTest
|
||||
./bin/WatcherTest
|
||||
|
||||
# tests with auth
|
||||
/usr/local/bin/etcdctl user add root --new-user-password="root" || true
|
||||
/usr/local/bin/etcdctl auth enable || true
|
||||
|
||||
./bin/AuthTest
|
||||
|
||||
/usr/local/bin/etcdctl auth disable --user="root" --password="root" || true
|
||||
|
||||
- name: Check ccache
|
||||
run: |
|
||||
|
|
|
|||
25
README.md
25
README.md
|
|
@ -100,6 +100,31 @@ this case since the respose has been already arrived (we are inside the callback
|
|||
|
||||
## etcd operations
|
||||
|
||||
### Multiple endpoints
|
||||
|
||||
Connecting to multiple endpoints is supported:
|
||||
|
||||
```c++
|
||||
// multiple endpoints are separated by comma
|
||||
etcd::Client etcd("http://a.com:2379,http://b.com:2379,http://c.com:2379");
|
||||
|
||||
// or, separated colon
|
||||
etcd::Client etcd("http://a.com:2379,http://b.com:2379,http://c.com:2379");
|
||||
```
|
||||
|
||||
Behind the screen, gRPC's load balancer is used and the round-robin strategy will
|
||||
be used by default.
|
||||
|
||||
### Etcd authentication
|
||||
|
||||
Etcd [v3's authentication](https://etcd.io/docs/v3.4.0/learning/design-auth-v3/) is currently
|
||||
supported. The `Client::Client` could accept a `username` and `password` as arguments and handle
|
||||
the authentication properly.
|
||||
|
||||
```c++
|
||||
etcd::Client etcd("http://127.0.0.1:2379", "root", "root");
|
||||
```
|
||||
|
||||
### Reading a value
|
||||
|
||||
You can read a value with the ```get``` method of the clinent instance. The only parameter is the
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@
|
|||
#include "proto/rpc.grpc.pb.h"
|
||||
#include "proto/v3lock.grpc.pb.h"
|
||||
|
||||
using etcdserverpb::Auth;
|
||||
using etcdserverpb::KV;
|
||||
using etcdserverpb::Watch;
|
||||
using etcdserverpb::Lease;
|
||||
|
|
@ -31,11 +32,27 @@ namespace etcd
|
|||
public:
|
||||
/**
|
||||
* Constructs an etcd client object.
|
||||
*
|
||||
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001",
|
||||
* or multiple url, seperated by ',' or ';'.
|
||||
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
|
||||
*/
|
||||
Client(std::string const & etcd_url, std::string const & load_balancer = "round_robin");
|
||||
Client(std::string const & etcd_url,
|
||||
std::string const & load_balancer = "round_robin");
|
||||
|
||||
/**
|
||||
* Constructs an etcd client object.
|
||||
*
|
||||
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001",
|
||||
* or multiple url, seperated by ',' or ';'.
|
||||
* @param username username of etcd auth
|
||||
* @param password password of etcd auth
|
||||
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
|
||||
*/
|
||||
Client(std::string const & etcd_url,
|
||||
std::string const & username,
|
||||
std::string const & password,
|
||||
std::string const & load_balancer = "round_robin");
|
||||
|
||||
/**
|
||||
* Sends a get request to the etcd server
|
||||
|
|
@ -217,6 +234,7 @@ namespace etcd
|
|||
|
||||
private:
|
||||
std::shared_ptr<grpc::Channel> channel;
|
||||
std::shared_ptr<grpc::CallCredentials> auth_creds;
|
||||
std::unique_ptr<KV::Stub> stub_;
|
||||
std::unique_ptr<Watch::Stub> watchServiceStub;
|
||||
std::unique_ptr<Lease::Stub> leaseServiceStub;
|
||||
|
|
|
|||
|
|
@ -26,7 +26,8 @@ namespace etcd
|
|||
{
|
||||
public:
|
||||
|
||||
template<typename T>static pplx::task<etcd::Response> create(std::shared_ptr<T> call)
|
||||
template <typename T>
|
||||
static pplx::task<etcd::Response> create(std::shared_ptr<T> call)
|
||||
{
|
||||
return pplx::task<etcd::Response>([call]()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ namespace etcdv3
|
|||
{
|
||||
public:
|
||||
Action(etcdv3::ActionParameters params);
|
||||
Action(){};
|
||||
void waitForResponse();
|
||||
const std::chrono::high_resolution_clock::time_point startTimepoint();
|
||||
protected:
|
||||
|
|
|
|||
113
src/Client.cpp
113
src/Client.cpp
|
|
@ -28,9 +28,12 @@
|
|||
#include "etcd/v3/AsyncTxnAction.hpp"
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <grpc++/security/credentials.h>
|
||||
|
||||
using grpc::Channel;
|
||||
|
||||
namespace etcd {
|
||||
namespace detail {
|
||||
|
||||
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
|
||||
struct addrinfo hints = {}, *addrs;
|
||||
|
|
@ -59,8 +62,7 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
|
|||
return true;
|
||||
}
|
||||
|
||||
etcd::Client::Client(std::string const & address, std::string const & load_balancer)
|
||||
{
|
||||
const std::string strip_and_resolve_addresses(std::string const &address) {
|
||||
std::vector<std::string> addresses;
|
||||
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;"));
|
||||
std::string stripped_address;
|
||||
|
|
@ -70,22 +72,119 @@ etcd::Client::Client(std::string const & address, std::string const & load_balan
|
|||
for (auto const &addr: addresses) {
|
||||
std::string::size_type idx = addr.find(substr);
|
||||
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
|
||||
dns_resolve(target, stripped_addresses);
|
||||
etcd::detail::dns_resolve(target, stripped_addresses);
|
||||
}
|
||||
stripped_address = boost::algorithm::join(stripped_addresses, ",");
|
||||
}
|
||||
return "ipv4:///" + stripped_address;
|
||||
}
|
||||
|
||||
class AuthInterceptor: public grpc::experimental::Interceptor {
|
||||
public:
|
||||
AuthInterceptor(grpc::experimental::ClientRpcInfo *,
|
||||
std::string const &token): token_(token) {}
|
||||
|
||||
void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override {
|
||||
if (methods->QueryInterceptionHookPoint(
|
||||
grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
|
||||
auto metadata = methods->GetSendInitialMetadata();
|
||||
// use `authorization` as the key also works, see:
|
||||
//
|
||||
// etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go
|
||||
metadata->insert(std::make_pair("token", token_));
|
||||
}
|
||||
methods->Proceed(); // NB: important!
|
||||
}
|
||||
|
||||
private:
|
||||
grpc::string token_;
|
||||
};
|
||||
|
||||
class AuthInterceptorFactory:
|
||||
public grpc::experimental::ClientInterceptorFactoryInterface {
|
||||
public:
|
||||
AuthInterceptorFactory(std::string const &token): token_(token) {}
|
||||
|
||||
grpc::experimental::Interceptor* CreateClientInterceptor(
|
||||
grpc::experimental::ClientRpcInfo* info) override {
|
||||
return new AuthInterceptor(info, token_);
|
||||
}
|
||||
|
||||
private:
|
||||
grpc::string token_;
|
||||
};
|
||||
|
||||
const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
|
||||
std::string const &username,
|
||||
std::string const &password,
|
||||
std::string &token_or_message) {
|
||||
// run a round of auth
|
||||
auto auth_stub = Auth::NewStub(channel);
|
||||
ClientContext context;
|
||||
etcdserverpb::AuthenticateRequest auth_request;
|
||||
etcdserverpb::AuthenticateResponse auth_response;
|
||||
auth_request.set_name(username);
|
||||
auth_request.set_password(password);
|
||||
auto status = auth_stub->Authenticate(&context, auth_request, &auth_response);
|
||||
if (status.ok()) {
|
||||
token_or_message = auth_response.token();
|
||||
return true;
|
||||
} else {
|
||||
token_or_message = status.error_message();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
etcd::Client::Client(std::string const & address,
|
||||
std::string const & load_balancer)
|
||||
{
|
||||
// create channels
|
||||
std::string const addresses = etcd::detail::strip_and_resolve_addresses(address);
|
||||
grpc::ChannelArguments grpc_args;
|
||||
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials();
|
||||
grpc_args.SetLoadBalancingPolicyName(load_balancer);
|
||||
this->channel = grpc::CreateCustomChannel(
|
||||
"ipv4:///" + stripped_address,
|
||||
grpc::InsecureChannelCredentials(),
|
||||
grpc_args);
|
||||
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
|
||||
|
||||
// create stubs
|
||||
stub_= KV::NewStub(this->channel);
|
||||
watchServiceStub= Watch::NewStub(this->channel);
|
||||
leaseServiceStub= Lease::NewStub(this->channel);
|
||||
lockServiceStub = Lock::NewStub(this->channel);
|
||||
}
|
||||
|
||||
etcd::Client::Client(std::string const & address,
|
||||
std::string const & username,
|
||||
std::string const & password,
|
||||
std::string const & load_balancer)
|
||||
{
|
||||
// create channels
|
||||
std::string const addresses = etcd::detail::strip_and_resolve_addresses(address);
|
||||
grpc::ChannelArguments grpc_args;
|
||||
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials();
|
||||
grpc_args.SetLoadBalancingPolicyName(load_balancer);
|
||||
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
|
||||
|
||||
// auth
|
||||
std::string token_or_message;
|
||||
if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) {
|
||||
throw std::invalid_argument("Etcd authentication failed: " + token_or_message);
|
||||
}
|
||||
using interceptor_factory_t = grpc::experimental::ClientInterceptorFactoryInterface;
|
||||
using interceptor_factory_ptr_t = std::unique_ptr<interceptor_factory_t>;
|
||||
std::vector<interceptor_factory_ptr_t> interceptor_creators;
|
||||
interceptor_creators.emplace_back(new etcd::detail::AuthInterceptorFactory(token_or_message));
|
||||
|
||||
// reset the channel with the authentication interceptor.
|
||||
this->channel = grpc::experimental::CreateCustomChannelWithInterceptors(
|
||||
addresses, creds, grpc_args, std::move(interceptor_creators));
|
||||
stub_= KV::NewStub(this->channel);
|
||||
watchServiceStub= Watch::NewStub(this->channel);
|
||||
leaseServiceStub= Lease::NewStub(this->channel);
|
||||
lockServiceStub = Lock::NewStub(this->channel);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -0,0 +1,49 @@
|
|||
#define CATCH_CONFIG_MAIN
|
||||
#include <catch.hpp>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "etcd/Client.hpp"
|
||||
|
||||
|
||||
TEST_CASE("setup with auth")
|
||||
{
|
||||
etcd::Client etcd("http://127.0.0.1:2379", "root", "root");
|
||||
etcd.rmdir("/test", true).wait();
|
||||
}
|
||||
|
||||
TEST_CASE("add a new key after authenticate")
|
||||
{
|
||||
etcd::Client etcd("http://127.0.0.1:2379", "root", "root");
|
||||
etcd.rmdir("/test", true).wait();
|
||||
etcd::Response resp = etcd.add("/test/key1", "42").get();
|
||||
REQUIRE(0 == resp.error_code());
|
||||
CHECK("create" == resp.action());
|
||||
etcd::Value const & val = resp.value();
|
||||
CHECK("42" == val.as_string());
|
||||
CHECK("/test/key1" == val.key());
|
||||
CHECK(!val.is_dir());
|
||||
CHECK(0 < val.created_index());
|
||||
CHECK(0 < val.modified_index());
|
||||
CHECK(0 < resp.index());
|
||||
CHECK(105 == etcd.add("/test/key1", "43").get().error_code()); // Key already exists
|
||||
CHECK(105 == etcd.add("/test/key1", "42").get().error_code()); // Key already exists
|
||||
CHECK("Key already exists" == etcd.add("/test/key1", "42").get().error_message());
|
||||
}
|
||||
|
||||
TEST_CASE("read a value from etcd")
|
||||
{
|
||||
etcd::Client etcd("http://127.0.0.1:2379", "root", "root");
|
||||
etcd::Response resp = etcd.get("/test/key1").get();
|
||||
CHECK("get" == resp.action());
|
||||
REQUIRE(resp.is_ok());
|
||||
REQUIRE(0 == resp.error_code());
|
||||
CHECK("42" == resp.value().as_string());
|
||||
CHECK("" == etcd.get("/test").get().value().as_string()); // key points to a directory
|
||||
}
|
||||
|
||||
TEST_CASE("cleanup")
|
||||
{
|
||||
etcd::Client etcd("http://127.0.0.1:2379", "root", "root");
|
||||
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
|
||||
}
|
||||
|
|
@ -25,7 +25,7 @@ TEST_CASE("add a new key")
|
|||
CHECK(!val.is_dir());
|
||||
CHECK(0 < val.created_index());
|
||||
CHECK(0 < val.modified_index());
|
||||
CHECK(0 < resp.index()); // X-Etcd-Index header value
|
||||
CHECK(0 < resp.index());
|
||||
CHECK(105 == etcd.add("/test/key1", "43").get().error_code()); // Key already exists
|
||||
CHECK(105 == etcd.add("/test/key1", "42").get().error_code()); // Key already exists
|
||||
CHECK("Key already exists" == etcd.add("/test/key1", "42").get().error_message());
|
||||
|
|
|
|||
Loading…
Reference in New Issue