Implements "v3election.proto" APIs.

Resolves #81.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-09-15 21:32:14 +08:00
parent 116b49b784
commit 87ca961f7e
25 changed files with 874 additions and 10 deletions

View File

@ -161,6 +161,7 @@ jobs:
./build/bin/EtcdTest ./build/bin/EtcdTest
./build/bin/LockTest ./build/bin/LockTest
./build/bin/WatcherTest ./build/bin/WatcherTest
./build/bin/ElectionTest
killall -TERM etcd killall -TERM etcd
sleep 5 sleep 5

View File

@ -603,6 +603,31 @@ is constructed.
Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow
the async exception when there are errors during keeping the lease alive. the async exception when there are errors during keeping the lease alive.
### Election API
Etcd v3's [election APIs](https://github.com/etcd-io/etcd/blob/main/server/etcdserver/api/v3election/v3electionpb/v3election.proto)
are supported via the following interfaces,
```c++
pplx::task<Response> campaign(std::string const &name, int64_t lease_id,
std::string const &value);
pplx::task<Response> proclaim(std::string const &name, int64_t lease_id,
std::string const &key, , int revision,
std::string const &value);
pplx::task<Response> leader(std::string const &name);
std::unique_ptr<Observer> observe(std::string const &name,
std::function<void(Response)> callback,
const bool once = false);
pplx::task<Response> resign(std::string const &name, int64_t lease_id,
std::string const &key, int revision);
```
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp).
### TODO ### TODO
1. Cancellation of asynchronous calls(except for watch) 1. Cancellation of asynchronous calls(except for watch)

View File

@ -2,6 +2,7 @@
#define __ETCD_CLIENT_HPP__ #define __ETCD_CLIENT_HPP__
#include <map> #include <map>
#include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
@ -12,6 +13,7 @@
namespace etcdv3 { namespace etcdv3 {
class Transaction; class Transaction;
class AsyncObserveAction;
namespace detail { namespace detail {
std::string string_plus_one(std::string const &value); std::string string_plus_one(std::string const &value);
@ -398,6 +400,83 @@ namespace etcd
*/ */
pplx::task<Response> txn(etcdv3::Transaction const &txn); pplx::task<Response> txn(etcdv3::Transaction const &txn);
/**
* Campaign for the election @name@.
*
* @param name is the name of election that will campaign for.
* @param lease_id is a user-managed (usually with a `KeepAlive`) lease id.
* @param value is the value for campaign.
*
* @returns a leader key if succeed, consist of
*
* - name: the name of the election
* - key: a generated election key
* - created rev: the revision of the generated key
* - lease: the lease id of the election leader
*/
pplx::task<Response> campaign(std::string const &name, int64_t lease_id,
std::string const &value);
/**
* Updates the value of election with a new value, with leader key returns by
* @campaign@.
*
* @param name is the name of election
* @param lease_id is the user-provided lease id for the proclamation
* @param key is the generated associated key returned by @campaign@
* @param revision is the created revision of key-value returned by @campaign@
* @param value is the new value to set.
*/
pplx::task<Response> proclaim(std::string const &name, int64_t lease_id,
std::string const &key, int revision, std::string const &value);
/**
* Get the current leader proclamation.
*
* @param name is the names of election.
*
* @returns current election key and value.
*/
pplx::task<Response> leader(std::string const &name);
/**
* An observer that will cancel the associated election::observe request
* when being destruct.
*/
class Observer {
public:
~Observer();
private:
std::shared_ptr<etcdv3::AsyncObserveAction> action = nullptr;
pplx::task<etcd::Response> resp;
friend class Client;
};
/**
* Observe the leader change.
*
* @param name is the names of election to watch.
* @param callback is the names of election to watch.
*
* @returns an observer that holds that action and will cancel the request when being destructed.
*/
std::unique_ptr<Observer> observe(std::string const &name,
std::function<void(Response)> callback,
const bool once = false);
/**
* Updates the value of election with a new value, with leader key returns by
* @campaign@.
*
* @param name is the name of election
* @param lease_id is the user-provided lease id for the proclamation
* @param key is the generated associated key returned by @campaign@
* @param revision is the created revision of key-value returned by @campaign@
*/
pplx::task<Response> resign(std::string const &name, int64_t lease_id,
std::string const &key, int revision);
private: private:
#if defined(WITH_GRPC_CHANNEL_CLASS) #if defined(WITH_GRPC_CHANNEL_CLASS)
std::shared_ptr<grpc::Channel> channel; std::shared_ptr<grpc::Channel> channel;
@ -420,8 +499,6 @@ namespace etcd
friend class Watcher; friend class Watcher;
}; };
} }
#endif #endif

View File

@ -12,6 +12,7 @@
namespace etcdv3 { namespace etcdv3 {
class AsyncWatchAction; class AsyncWatchAction;
class AsyncLeaseKeepAliveAction; class AsyncLeaseKeepAliveAction;
class AsyncObserveAction;
class V3Response; class V3Response;
} }
@ -134,6 +135,11 @@ namespace etcd
*/ */
std::string const & lock_key() const; std::string const & lock_key() const;
/**
* Return the "name" in response.
*/
std::string const & name() const;
/** /**
* Returns the watched events. * Returns the watched events.
*/ */
@ -157,12 +163,16 @@ namespace etcd
Values _values; Values _values;
Keys _keys; Keys _keys;
std::string _lock_key; // for lock std::string _lock_key; // for lock
std::string _name; // for campaign (in v3election)
std::vector<Event> _events; // for watch std::vector<Event> _events; // for watch
std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed // execute duration (in microseconds), during the action created and response parsed
std::chrono::microseconds _duration;
friend class Client;
friend class SyncClient; friend class SyncClient;
friend class etcdv3::AsyncWatchAction; friend class etcdv3::AsyncWatchAction;
friend class etcdv3::AsyncLeaseKeepAliveAction; friend class etcdv3::AsyncLeaseKeepAliveAction;
friend class Client; friend class etcdv3::AsyncObserveAction;
}; };
} }

View File

@ -67,6 +67,17 @@ namespace etcd
Response leaserevoke(int64_t lease_id); Response leaserevoke(int64_t lease_id);
Response leasetimetolive(int64_t lease_id); Response leasetimetolive(int64_t lease_id);
Response campaign(std::string const &name, int64_t lease_id,
std::string const &value);
Response proclaim(std::string const &name, int64_t lease_id,
std::string const &key, int revision, std::string const &value);
Response leader(std::string const &name);
std::unique_ptr<Client::Observer> observe(std::string const &name,
std::function<void(Response)> callback,
const bool once = false);
Response resign(std::string const &name, int64_t lease_id,
std::string const &key, int revision);
/** /**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of * a new key is created, like "/testdir/newkey" then no change happened in the value of

View File

@ -13,6 +13,10 @@ namespace mvccpb {
class Event; class Event;
} }
namespace electionpb {
class LeaderKey;
}
namespace etcd namespace etcd
{ {
class Value; class Value;

View File

@ -6,6 +6,7 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h" #include "proto/v3lock.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
using grpc::ClientContext; using grpc::ClientContext;
using grpc::CompletionQueue; using grpc::CompletionQueue;
@ -15,6 +16,7 @@ using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
using etcdserverpb::Lease; using etcdserverpb::Lease;
using v3lockpb::Lock; using v3lockpb::Lock;
using v3electionpb::Election;
namespace etcdv3 namespace etcdv3
{ {
@ -33,6 +35,7 @@ namespace etcdv3
int64_t lease_id; int64_t lease_id;
int ttl; int ttl;
int limit; int limit;
std::string name; // for campaign (in v3election)
std::string key; std::string key;
std::string range_end; std::string range_end;
std::string value; std::string value;
@ -42,6 +45,7 @@ namespace etcdv3
Watch::Stub* watch_stub; Watch::Stub* watch_stub;
Lease::Stub* lease_stub; Lease::Stub* lease_stub;
Lock::Stub* lock_stub; Lock::Stub* lock_stub;
Election::Stub* election_stub;
}; };
class Action class Action

View File

@ -0,0 +1,82 @@
#ifndef __ASYNC_ELECTIONACTION_HPP__
#define __ASYNC_ELECTIONACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncElectionResponse.hpp"
#include "etcd/Response.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientAsyncReader;
using v3electionpb::CampaignRequest;
using v3electionpb::CampaignResponse;
using v3electionpb::ProclaimRequest;
using v3electionpb::ProclaimResponse;
using v3electionpb::LeaderRequest;
using v3electionpb::LeaderResponse;
using v3electionpb::ResignRequest;
using v3electionpb::ResignResponse;
namespace etcdv3
{
class AsyncCampaignAction : public etcdv3::Action
{
public:
AsyncCampaignAction(etcdv3::ActionParameters const &param);
AsyncCampaignResponse ParseResponse();
private:
CampaignResponse reply;
std::unique_ptr<ClientAsyncResponseReader<CampaignResponse>> response_reader;
};
class AsyncProclaimAction : public etcdv3::Action
{
public:
AsyncProclaimAction(etcdv3::ActionParameters const &param);
AsyncProclaimResponse ParseResponse();
private:
ProclaimResponse reply;
std::unique_ptr<ClientAsyncResponseReader<ProclaimResponse>> response_reader;
};
class AsyncLeaderAction : public etcdv3::Action
{
public:
AsyncLeaderAction(etcdv3::ActionParameters const &param);
AsyncLeaderResponse ParseResponse();
private:
LeaderResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaderResponse>> response_reader;
};
class AsyncObserveAction : public etcdv3::Action
{
public:
AsyncObserveAction(etcdv3::ActionParameters const &param, const bool once=false);
AsyncObserveResponse ParseResponse();
void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelObserve();
bool Cancelled() const;
private:
bool once;
LeaderResponse reply;
std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader;
std::atomic_bool isCancelled;
std::mutex protect_is_cancalled;
};
class AsyncResignAction : public etcdv3::Action
{
public:
AsyncResignAction(etcdv3::ActionParameters const &param);
AsyncResignResponse ParseResponse();
private:
ResignResponse reply;
std::unique_ptr<ClientAsyncResponseReader<ResignResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,51 @@
#ifndef __ASYNC_ELECTIONRESPONSE_HPP__
#define __ASYNC_ELECTIONRESPONSE_HPP__
#include "proto/rpc.pb.h"
#include "proto/v3election.pb.h"
#include "etcd/v3/V3Response.hpp"
using v3electionpb::CampaignResponse;
using v3electionpb::ProclaimResponse;
using v3electionpb::LeaderResponse;
using v3electionpb::ResignResponse;
namespace etcdv3
{
class AsyncCampaignResponse : public etcdv3::V3Response
{
public:
AsyncCampaignResponse(){};
void ParseResponse(CampaignResponse& resp);
};
class AsyncProclaimResponse : public etcdv3::V3Response
{
public:
AsyncProclaimResponse(){};
void ParseResponse(ProclaimResponse& resp);
};
class AsyncLeaderResponse : public etcdv3::V3Response
{
public:
AsyncLeaderResponse(){};
void ParseResponse(LeaderResponse& resp);
};
class AsyncObserveResponse : public etcdv3::V3Response
{
public:
AsyncObserveResponse(){};
void ParseResponse(LeaderResponse& resp);
};
class AsyncResignResponse : public etcdv3::V3Response
{
public:
AsyncResignResponse(){};
void ParseResponse(ResignResponse& resp);
};
}
#endif

View File

@ -14,7 +14,6 @@ using v3lockpb::LockResponse;
using v3lockpb::UnlockRequest; using v3lockpb::UnlockRequest;
using v3lockpb::UnlockResponse; using v3lockpb::UnlockResponse;
namespace etcdv3 namespace etcdv3
{ {
class AsyncLockAction : public etcdv3::Action class AsyncLockAction : public etcdv3::Action

View File

@ -5,7 +5,6 @@
#include "proto/v3lock.grpc.pb.h" #include "proto/v3lock.grpc.pb.h"
#include "etcd/v3/V3Response.hpp" #include "etcd/v3/V3Response.hpp"
using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncResponseReader;
using v3lockpb::LockRequest; using v3lockpb::LockRequest;
using v3lockpb::LockResponse; using v3lockpb::LockResponse;

View File

@ -24,7 +24,6 @@ namespace etcdv3
void waitForResponse(); void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback); void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelWatch(); void CancelWatch();
void WatchReq(std::string const & key);
bool Cancelled() const; bool Cancelled() const;
private: private:
WatchResponse reply; WatchResponse reply;

View File

@ -3,6 +3,7 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/kv.pb.h" #include "proto/kv.pb.h"
#include "proto/v3election.pb.h"
#include "etcd/v3/KeyValue.hpp" #include "etcd/v3/KeyValue.hpp"
@ -26,6 +27,8 @@ namespace etcdv3
bool has_values() const; bool has_values() const;
void set_lock_key(std::string const &key); void set_lock_key(std::string const &key);
std::string const &get_lock_key() const; std::string const &get_lock_key() const;
void set_name(std::string const &name);
std::string const &get_name() const;
std::vector<mvccpb::Event> const & get_events() const; std::vector<mvccpb::Event> const & get_events() const;
protected: protected:
int error_code; int error_code;
@ -37,6 +40,7 @@ namespace etcdv3
std::vector<etcdv3::KeyValue> values; std::vector<etcdv3::KeyValue> values;
std::vector<etcdv3::KeyValue> prev_values; std::vector<etcdv3::KeyValue> prev_values;
std::string lock_key; // for lock std::string lock_key; // for lock
std::string name; // for campaign (in v3election)
std::vector<mvccpb::Event> events; // for watch std::vector<mvccpb::Event> events; // for watch
}; };
} }

View File

@ -21,6 +21,12 @@ namespace etcdv3
extern char const * LEASETIMETOLIVE; extern char const * LEASETIMETOLIVE;
extern char const * LEASELEASES; extern char const * LEASELEASES;
extern char const * CAMPAIGN_ACTION;
extern char const * PROCLAIM_ACTION;
extern char const * LEADER_ACTION;
extern char const * OBSERVE_ACTION;
extern char const * RESIGN_ACTION;
extern char const * NUL; extern char const * NUL;
extern char const * KEEPALIVE_CREATE; extern char const * KEEPALIVE_CREATE;
@ -32,6 +38,8 @@ namespace etcdv3
extern char const * WATCH_WRITE; extern char const * WATCH_WRITE;
extern char const * WATCH_WRITES_DONE; extern char const * WATCH_WRITES_DONE;
extern char const * ELECTION_OBSERVE_CREATE;
extern const int ERROR_KEY_NOT_FOUND; extern const int ERROR_KEY_NOT_FOUND;
extern const int ERROR_COMPARE_FAILED; extern const int ERROR_COMPARE_FAILED;
extern const int ERROR_KEY_ALREADY_EXISTS; extern const int ERROR_KEY_ALREADY_EXISTS;

View File

@ -14,6 +14,7 @@ protobuf_generate_latest(
compute_generated_srcs(PROTO_GENERATES_SRCS "${PROTO_GEN_OUT_DIR}" false ${PROTO_SRCS}) compute_generated_srcs(PROTO_GENERATES_SRCS "${PROTO_GEN_OUT_DIR}" false ${PROTO_SRCS})
set(PROTO_GRPC_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto" set(PROTO_GRPC_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto"
"${CMAKE_CURRENT_SOURCE_DIR}/v3election.proto"
"${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto") "${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto")
grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS
"${PROTO_GEN_OUT_DIR}" "${PROTO_GEN_OUT_DIR}"

119
proto/v3election.proto Normal file
View File

@ -0,0 +1,119 @@
syntax = "proto3";
package v3electionpb;
import "gogoproto/gogo.proto";
import "rpc.proto";
import "kv.proto";
// for grpc-gateway
import "google/api/annotations.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
// The election service exposes client-side election facilities as a gRPC interface.
service Election {
// Campaign waits to acquire leadership in an election, returning a LeaderKey
// representing the leadership if successful. The LeaderKey can then be used
// to issue new values on the election, transactionally guard API requests on
// leadership still being held, and resign from the election.
rpc Campaign(CampaignRequest) returns (CampaignResponse) {
option (google.api.http) = {
post: "/v3/election/campaign"
body: "*"
};
}
// Proclaim updates the leader's posted value with a new value.
rpc Proclaim(ProclaimRequest) returns (ProclaimResponse) {
option (google.api.http) = {
post: "/v3/election/proclaim"
body: "*"
};
}
// Leader returns the current election proclamation, if any.
rpc Leader(LeaderRequest) returns (LeaderResponse) {
option (google.api.http) = {
post: "/v3/election/leader"
body: "*"
};
}
// Observe streams election proclamations in-order as made by the election's
// elected leaders.
rpc Observe(LeaderRequest) returns (stream LeaderResponse) {
option (google.api.http) = {
post: "/v3/election/observe"
body: "*"
};
}
// Resign releases election leadership so other campaigners may acquire
// leadership on the election.
rpc Resign(ResignRequest) returns (ResignResponse) {
option (google.api.http) = {
post: "/v3/election/resign"
body: "*"
};
}
}
message CampaignRequest {
// name is the election's identifier for the campaign.
bytes name = 1;
// lease is the ID of the lease attached to leadership of the election. If the
// lease expires or is revoked before resigning leadership, then the
// leadership is transferred to the next campaigner, if any.
int64 lease = 2;
// value is the initial proclaimed value set when the campaigner wins the
// election.
bytes value = 3;
}
message CampaignResponse {
etcdserverpb.ResponseHeader header = 1;
// leader describes the resources used for holding leadereship of the election.
LeaderKey leader = 2;
}
message LeaderKey {
// name is the election identifier that correponds to the leadership key.
bytes name = 1;
// key is an opaque key representing the ownership of the election. If the key
// is deleted, then leadership is lost.
bytes key = 2;
// rev is the creation revision of the key. It can be used to test for ownership
// of an election during transactions by testing the key's creation revision
// matches rev.
int64 rev = 3;
// lease is the lease ID of the election leader.
int64 lease = 4;
}
message LeaderRequest {
// name is the election identifier for the leadership information.
bytes name = 1;
}
message LeaderResponse {
etcdserverpb.ResponseHeader header = 1;
// kv is the key-value pair representing the latest leader update.
mvccpb.KeyValue kv = 2;
}
message ResignRequest {
// leader is the leadership to relinquish by resignation.
LeaderKey leader = 1;
}
message ResignResponse {
etcdserverpb.ResponseHeader header = 1;
}
message ProclaimRequest {
// leader is the leadership hold on the election.
LeaderKey leader = 1;
// value is an update meant to overwrite the leader's current value.
bytes value = 2;
}
message ProclaimResponse {
etcdserverpb.ResponseHeader header = 1;
}

View File

@ -24,16 +24,18 @@
#include <grpc++/security/credentials.h> #include <grpc++/security/credentials.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h" #include "proto/v3lock.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp" #include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp" #include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/AsyncElectionResponse.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/Transaction.hpp" #include "etcd/v3/Transaction.hpp"
#include "etcd/v3/AsyncSetAction.hpp" #include "etcd/v3/AsyncSetAction.hpp"
@ -46,6 +48,7 @@
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLeaseAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp" #include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncElectionAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp"
using grpc::Channel; using grpc::Channel;
@ -165,6 +168,7 @@ struct etcd::Client::EtcdServerStubs {
std::unique_ptr<etcdserverpb::Watch::Stub> watchServiceStub; std::unique_ptr<etcdserverpb::Watch::Stub> watchServiceStub;
std::unique_ptr<etcdserverpb::Lease::Stub> leaseServiceStub; std::unique_ptr<etcdserverpb::Lease::Stub> leaseServiceStub;
std::unique_ptr<v3lockpb::Lock::Stub> lockServiceStub; std::unique_ptr<v3lockpb::Lock::Stub> lockServiceStub;
std::unique_ptr<v3electionpb::Election::Stub> electionServiceStub;
}; };
void etcd::Client::EtcdServerStubsDeleter::operator()(etcd::Client::EtcdServerStubs *stubs) { void etcd::Client::EtcdServerStubsDeleter::operator()(etcd::Client::EtcdServerStubs *stubs) {
@ -191,6 +195,7 @@ etcd::Client::Client(std::string const & address,
stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->watchServiceStub= Watch::NewStub(this->channel);
stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel);
stubs->electionServiceStub = Election::NewStub(this->channel);
} }
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const & address,
@ -220,6 +225,7 @@ etcd::Client::Client(std::string const & address,
stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->watchServiceStub= Watch::NewStub(this->channel);
stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel);
stubs->electionServiceStub = Election::NewStub(this->channel);
} }
etcd::Client *etcd::Client::WithUser(std::string const & etcd_url, etcd::Client *etcd::Client::WithUser(std::string const & etcd_url,
@ -253,6 +259,7 @@ etcd::Client::Client(std::string const & address,
stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->watchServiceStub= Watch::NewStub(this->channel);
stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel);
stubs->electionServiceStub = Election::NewStub(this->channel);
} }
etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url, etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url,
@ -795,3 +802,72 @@ pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn)); std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::campaign(
std::string const &name, int64_t lease_id, std::string const &value) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name = name;
params.lease_id = lease_id;
params.value = value;
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncCampaignAction> call(new etcdv3::AsyncCampaignAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::proclaim(
std::string const &name, int64_t lease_id,
std::string const &key, int revision, std::string const &value) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name = name;
params.lease_id = lease_id;
params.key = key;
params.revision = revision;
params.value = value;
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncProclaimAction> call(new etcdv3::AsyncProclaimAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leader(std::string const &name) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name = name;
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaderAction> call(new etcdv3::AsyncLeaderAction(params));
return Response::create(call);
}
std::unique_ptr<etcd::Client::Observer> etcd::Client::observe(
std::string const &name, std::function<void(Response)> callback, const bool once) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name.assign(name);
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncObserveAction> call(new etcdv3::AsyncObserveAction(params, once));
std::unique_ptr<Observer> observer(new Observer());
observer->action = call;
observer->resp = Response::create(call);
return observer;
}
pplx::task<etcd::Response> etcd::Client::resign(
std::string const &name, int64_t lease_id, std::string const &key, int revision) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name = name;
params.lease_id = lease_id;
params.key = key;
params.revision = revision;
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncResignAction> call(new etcdv3::AsyncResignAction(params));
return Response::create(call);
}
etcd::Client::Observer::~Observer() {
if (action != nullptr) {
action->CancelObserve();
resp.wait();
}
}

View File

@ -22,10 +22,11 @@ etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseco
{ {
_value = Value(reply.get_value()); _value = Value(reply.get_value());
} }
_prev_value = Value(reply.get_prev_value()); _prev_value = Value(reply.get_prev_value());
_lock_key = reply.get_lock_key(); _lock_key = reply.get_lock_key();
_name = reply.get_name();
for (auto const &ev: reply.get_events()) { for (auto const &ev: reply.get_events()) {
_events.emplace_back(etcd::Event(ev)); _events.emplace_back(etcd::Event(ev));
} }
@ -112,6 +113,10 @@ std::string const & etcd::Response::lock_key() const {
return _lock_key; return _lock_key;
} }
std::string const & etcd::Response::name() const {
return _name;
}
std::vector<etcd::Event> const & etcd::Response::events() const { std::vector<etcd::Event> const & etcd::Response::events() const {
return this->_events; return this->_events;
} }

View File

@ -149,6 +149,37 @@ etcd::Response etcd::SyncClient::leasetimetolive(int64_t lease_id)
CHECK_EXCEPTIONS(client.leasetimetolive(lease_id).get()); CHECK_EXCEPTIONS(client.leasetimetolive(lease_id).get());
} }
etcd::Response etcd::SyncClient::campaign(std::string const &name, int64_t lease_id,
std::string const &value)
{
CHECK_EXCEPTIONS(client.campaign(name, lease_id, value).get());
}
etcd::Response etcd::SyncClient::proclaim(std::string const &name, int64_t lease_id,
std::string const &key, int revision,
std::string const &value)
{
CHECK_EXCEPTIONS(client.proclaim(name, lease_id, key, revision, value).get());
}
etcd::Response etcd::SyncClient::leader(std::string const &name)
{
CHECK_EXCEPTIONS(client.leader(name).get());
}
std::unique_ptr<etcd::Client::Observer> etcd::SyncClient::observe(
std::string const &name, std::function<void(etcd::Response)> callback,
const bool once)
{
return client.observe(name, callback, once);
}
etcd::Response etcd::SyncClient::resign(std::string const &name, int64_t lease_id,
std::string const &key, int revision)
{
CHECK_EXCEPTIONS(client.resign(name, lease_id, key, revision).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive)
{ {
CHECK_EXCEPTIONS(client.watch(key, recursive).get()); CHECK_EXCEPTIONS(client.watch(key, recursive).get());

View File

@ -0,0 +1,247 @@
#include "etcd/v3/AsyncElectionAction.hpp"
#include "etcd/v3/action_constants.hpp"
using v3electionpb::LeaderKey;
using v3electionpb::CampaignRequest;
using v3electionpb::CampaignResponse;
using v3electionpb::ProclaimRequest;
using v3electionpb::ProclaimResponse;
using v3electionpb::LeaderRequest;
using v3electionpb::LeaderResponse;
using v3electionpb::ResignRequest;
using v3electionpb::ResignResponse;
etcdv3::AsyncCampaignAction::AsyncCampaignAction(
etcdv3::ActionParameters const &param)
: etcdv3::Action(param)
{
CampaignRequest campaign_request;
campaign_request.set_name(param.name);
campaign_request.set_lease(param.lease_id);
campaign_request.set_value(param.value);
response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse()
{
AsyncCampaignResponse campaign_resp;
campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION);
if(!status.ok()) {
campaign_resp.set_error_code(status.error_code());
campaign_resp.set_error_message(status.error_message());
}
else {
campaign_resp.ParseResponse(reply);
}
return campaign_resp;
}
etcdv3::AsyncProclaimAction::AsyncProclaimAction(
etcdv3::ActionParameters const &param)
: etcdv3::Action(param)
{
auto leader = new LeaderKey();
leader->set_name(param.name);
leader->set_key(param.key);
leader->set_rev(param.revision);
leader->set_lease(param.lease_id);
ProclaimRequest proclaim_request;
proclaim_request.set_allocated_leader(leader);
proclaim_request.set_value(param.value);
response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncProclaimResponse etcdv3::AsyncProclaimAction::ParseResponse()
{
AsyncProclaimResponse proclaim_resp;
proclaim_resp.set_action(etcdv3::PROCLAIM_ACTION);
if(!status.ok()) {
proclaim_resp.set_error_code(status.error_code());
proclaim_resp.set_error_message(status.error_message());
}
else {
proclaim_resp.ParseResponse(reply);
}
return proclaim_resp;
}
etcdv3::AsyncLeaderAction::AsyncLeaderAction(
etcdv3::ActionParameters const &param)
: etcdv3::Action(param)
{
LeaderRequest leader_request;
leader_request.set_name(param.name);
response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncLeaderResponse etcdv3::AsyncLeaderAction::ParseResponse()
{
AsyncLeaderResponse leader_resp;
leader_resp.set_action(etcdv3::LEADER_ACTION);
if(!status.ok()) {
leader_resp.set_error_code(status.error_code());
leader_resp.set_error_message(status.error_message());
}
else {
leader_resp.ParseResponse(reply);
}
return leader_resp;
}
etcdv3::AsyncObserveAction::AsyncObserveAction(
etcdv3::ActionParameters const &param, const bool once)
: etcdv3::Action(param), once(once)
{
LeaderRequest leader_request;
leader_request.set_name(param.name);
response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE);
void *got_tag;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) {
response_reader->Read(&reply, (void *)this);
} else {
throw std::runtime_error("failed to create a observe connection");
}
}
void etcdv3::AsyncObserveAction::waitForResponse()
{
void* got_tag;
bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
if (isCancelled.load()) {
break;
}
if(ok == false)
{
break;
}
if(got_tag == (void*)this) // read tag
{
auto resp = ParseResponse();
if (resp.get_error_code() != 0) {
CancelObserve();
break;
}
}
if(isCancelled.load()) {
break;
}
if (once) {
break;
}
response_reader->Read(&reply, (void *)this);
}
}
void etcdv3::AsyncObserveAction::waitForResponse(std::function<void(etcd::Response)> callback)
{
void* got_tag;
bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
if(ok == false)
{
break;
}
if (isCancelled.load()) {
break;
}
if(got_tag == (void*)this) // read tag
{
auto resp = ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint);
callback(etcd::Response(resp, duration));
if (resp.get_error_code() != 0) {
CancelObserve();
break;
}
start_timepoint = std::chrono::high_resolution_clock::now();
}
if(isCancelled.load()) {
break;
}
if (once) {
break;
}
response_reader->Read(&reply, (void *)this);
}
}
void etcdv3::AsyncObserveAction::CancelObserve()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) {
cq_.Shutdown();
}
response_reader->Finish(&status, (void *)this);
}
bool etcdv3::AsyncObserveAction::Cancelled() const {
return isCancelled.load();
}
etcdv3::AsyncObserveResponse etcdv3::AsyncObserveAction::ParseResponse()
{
AsyncObserveResponse leader_resp;
leader_resp.set_action(etcdv3::OBSERVE_ACTION);
if(!status.ok()) {
leader_resp.set_error_code(status.error_code());
leader_resp.set_error_message(status.error_message());
}
else {
leader_resp.ParseResponse(reply);
}
return leader_resp;
}
etcdv3::AsyncResignAction::AsyncResignAction(
etcdv3::ActionParameters const &param)
: etcdv3::Action(param)
{
auto leader = new LeaderKey();
leader->set_name(param.name);
leader->set_key(param.key);
leader->set_rev(param.revision);
leader->set_lease(param.lease_id);
ResignRequest resign_request;
resign_request.set_allocated_leader(leader);
response_reader = parameters.election_stub->AsyncResign(&context, resign_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse()
{
AsyncResignResponse resign_resp;
resign_resp.set_action(etcdv3::RESIGN_ACTION);
if(!status.ok()) {
resign_resp.set_error_code(status.error_code());
resign_resp.set_error_message(status.error_message());
}
else {
resign_resp.ParseResponse(reply);
}
return resign_resp;
}

View File

@ -0,0 +1,35 @@
#include "etcd/v3/AsyncElectionResponse.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::ResponseOp;
void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) {
index = reply.header().revision();
auto const &leader = reply.leader();
name = leader.name();
value.kvs.set_key(leader.key());
value.kvs.set_create_revision(leader.rev());
value.kvs.set_lease(leader.lease());
}
void etcdv3::AsyncProclaimResponse::ParseResponse(ProclaimResponse& reply) {
index = reply.header().revision();
}
void etcdv3::AsyncLeaderResponse::ParseResponse(LeaderResponse& reply) {
index = reply.header().revision();
value.kvs = reply.kv();
}
void etcdv3::AsyncObserveResponse::ParseResponse(LeaderResponse& reply) {
index = reply.header().revision();
value.kvs = reply.kv();
}
void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) {
index = reply.header().revision();
}

View File

@ -1,7 +1,6 @@
#include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp) void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp)
{ {
index = resp.header().revision(); index = resp.header().revision();

View File

@ -69,6 +69,14 @@ std::string const & etcdv3::V3Response::get_lock_key() const {
return this->lock_key; return this->lock_key;
} }
void etcdv3::V3Response::set_name(std::string const &name) {
this->name = name;
}
std::string const & etcdv3::V3Response::get_name() const {
return this->name;
}
std::vector<mvccpb::Event> const & etcdv3::V3Response::get_events() const { std::vector<mvccpb::Event> const & etcdv3::V3Response::get_events() const {
return this->events; return this->events;
} }

View File

@ -18,6 +18,12 @@ char const * etcdv3::LEASEKEEPALIVE = "leasekeepalive";
char const * etcdv3::LEASETIMETOLIVE = "leasetimetolive"; char const * etcdv3::LEASETIMETOLIVE = "leasetimetolive";
char const * etcdv3::LEASELEASES = "leaseleases"; char const * etcdv3::LEASELEASES = "leaseleases";
char const * etcdv3::CAMPAIGN_ACTION = "campaign";
char const * etcdv3::PROCLAIM_ACTION = "preclaim";
char const * etcdv3::LEADER_ACTION = "leader";
char const * etcdv3::OBSERVE_ACTION = "obverse";
char const * etcdv3::RESIGN_ACTION = "resign";
// see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use // see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use
// the UTF-8 char U+0000 (i.e., "\xC0\x80"). // the UTF-8 char U+0000 (i.e., "\xC0\x80").
char const * etcdv3::NUL = "\xC0\x80"; char const * etcdv3::NUL = "\xC0\x80";
@ -31,6 +37,8 @@ char const * etcdv3::WATCH_CREATE = "watch create";
char const * etcdv3::WATCH_WRITE = "watch write"; char const * etcdv3::WATCH_WRITE = "watch write";
char const * etcdv3::WATCH_WRITES_DONE = "watch writes done"; char const * etcdv3::WATCH_WRITES_DONE = "watch writes done";
char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
const int etcdv3::ERROR_KEY_NOT_FOUND = 100; const int etcdv3::ERROR_KEY_NOT_FOUND = 100;
const int etcdv3::ERROR_COMPARE_FAILED = 101; const int etcdv3::ERROR_COMPARE_FAILED = 101;
const int etcdv3::ERROR_KEY_ALREADY_EXISTS = 105; const int etcdv3::ERROR_KEY_ALREADY_EXISTS = 105;

61
tst/ElectionTest.cpp Normal file
View File

@ -0,0 +1,61 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <chrono>
#include <iostream>
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
TEST_CASE("setup")
{
etcd::Client etcd("http://127.0.0.1:2379");
etcd.rmdir("/test", true).wait();
}
TEST_CASE("campaign and resign")
{
etcd::Client etcd("http://127.0.0.1:2379");
auto keepalive = etcd.leasekeepalive(60).get();
auto lease_id = keepalive->Lease();
// campaign
auto resp1 = etcd.campaign("test", lease_id, "xxxx").get();
REQUIRE(0 == resp1.error_code());
// leader
{
auto resp2 = etcd.leader("test").get();
REQUIRE(0 == resp2.error_code());
REQUIRE(resp1.value().key() == resp2.value().key());
REQUIRE("xxxx" == resp2.value().as_string());
}
// proclaim
auto resp3 = etcd.proclaim("test", lease_id,
resp1.value().key(), resp1.value().created_index(),
"tttt").get();
REQUIRE(0 == resp3.error_code());
// leader
{
auto resp4 = etcd.leader("test").get();
REQUIRE(0 == resp4.error_code());
REQUIRE(resp1.value().key() == resp4.value().key());
REQUIRE("tttt" == resp4.value().as_string());
}
// resign
auto resp5 = etcd.resign("test", lease_id,
resp1.value().key(), resp1.value().created_index()).get();
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("cleanup")
{
etcd::Client etcd("http://127.0.0.1:2379");
etcd.rmdir("/test", true).get();
}