From 87ca961f7e889de9a4fb9e51be64bba948cd893f Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 15 Sep 2021 21:32:14 +0800 Subject: [PATCH] Implements "v3election.proto" APIs. Resolves #81. Signed-off-by: Tao He --- .github/workflows/build-test.yml | 1 + README.md | 25 +++ etcd/Client.hpp | 81 +++++++++- etcd/Response.hpp | 14 +- etcd/SyncClient.hpp | 11 ++ etcd/Value.hpp | 4 + etcd/v3/Action.hpp | 4 + etcd/v3/AsyncElectionAction.hpp | 82 ++++++++++ etcd/v3/AsyncElectionResponse.hpp | 51 ++++++ etcd/v3/AsyncLockAction.hpp | 1 - etcd/v3/AsyncLockResponse.hpp | 1 - etcd/v3/AsyncWatchAction.hpp | 1 - etcd/v3/V3Response.hpp | 4 + etcd/v3/action_constants.hpp | 8 + proto/CMakeLists.txt | 1 + proto/v3election.proto | 119 ++++++++++++++ src/Client.cpp | 78 +++++++++- src/Response.cpp | 7 +- src/SyncClient.cpp | 31 ++++ src/v3/AsyncElectionAction.cpp | 247 ++++++++++++++++++++++++++++++ src/v3/AsyncElectionResponse.cpp | 35 +++++ src/v3/AsyncLockResponse.cpp | 1 - src/v3/V3Response.cpp | 8 + src/v3/action_constants.cpp | 8 + tst/ElectionTest.cpp | 61 ++++++++ 25 files changed, 874 insertions(+), 10 deletions(-) create mode 100644 etcd/v3/AsyncElectionAction.hpp create mode 100644 etcd/v3/AsyncElectionResponse.hpp create mode 100644 proto/v3election.proto create mode 100644 src/v3/AsyncElectionAction.cpp create mode 100644 src/v3/AsyncElectionResponse.cpp create mode 100644 tst/ElectionTest.cpp diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 4611b28..3b38887 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -161,6 +161,7 @@ jobs: ./build/bin/EtcdTest ./build/bin/LockTest ./build/bin/WatcherTest + ./build/bin/ElectionTest killall -TERM etcd sleep 5 diff --git a/README.md b/README.md index dc42ee8..5cf79e4 100644 --- a/README.md +++ b/README.md @@ -603,6 +603,31 @@ is constructed. Without handler, the internal state can be checked via `KeepAlive::Check()` and it will rethrow the async exception when there are errors during keeping the lease alive. +### Election API + +Etcd v3's [election APIs](https://github.com/etcd-io/etcd/blob/main/server/etcdserver/api/v3election/v3electionpb/v3election.proto) +are supported via the following interfaces, + +```c++ +pplx::task campaign(std::string const &name, int64_t lease_id, + std::string const &value); + +pplx::task proclaim(std::string const &name, int64_t lease_id, + std::string const &key, , int revision, + std::string const &value); + +pplx::task leader(std::string const &name); + +std::unique_ptr observe(std::string const &name, + std::function callback, + const bool once = false); + +pplx::task resign(std::string const &name, int64_t lease_id, + std::string const &key, int revision); +``` + +for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp). + ### TODO 1. Cancellation of asynchronous calls(except for watch) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 0418120..c0d6f2a 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -2,6 +2,7 @@ #define __ETCD_CLIENT_HPP__ #include +#include #include #include @@ -12,6 +13,7 @@ namespace etcdv3 { class Transaction; + class AsyncObserveAction; namespace detail { std::string string_plus_one(std::string const &value); @@ -398,6 +400,83 @@ namespace etcd */ pplx::task txn(etcdv3::Transaction const &txn); + /** + * Campaign for the election @name@. + * + * @param name is the name of election that will campaign for. + * @param lease_id is a user-managed (usually with a `KeepAlive`) lease id. + * @param value is the value for campaign. + * + * @returns a leader key if succeed, consist of + * + * - name: the name of the election + * - key: a generated election key + * - created rev: the revision of the generated key + * - lease: the lease id of the election leader + */ + pplx::task campaign(std::string const &name, int64_t lease_id, + std::string const &value); + + /** + * Updates the value of election with a new value, with leader key returns by + * @campaign@. + * + * @param name is the name of election + * @param lease_id is the user-provided lease id for the proclamation + * @param key is the generated associated key returned by @campaign@ + * @param revision is the created revision of key-value returned by @campaign@ + * @param value is the new value to set. + */ + pplx::task proclaim(std::string const &name, int64_t lease_id, + std::string const &key, int revision, std::string const &value); + + /** + * Get the current leader proclamation. + * + * @param name is the names of election. + * + * @returns current election key and value. + */ + pplx::task leader(std::string const &name); + + /** + * An observer that will cancel the associated election::observe request + * when being destruct. + */ + class Observer { + public: + ~Observer(); + private: + std::shared_ptr action = nullptr; + pplx::task resp; + + friend class Client; + }; + + /** + * Observe the leader change. + * + * @param name is the names of election to watch. + * @param callback is the names of election to watch. + * + * @returns an observer that holds that action and will cancel the request when being destructed. + */ + std::unique_ptr observe(std::string const &name, + std::function callback, + const bool once = false); + + /** + * Updates the value of election with a new value, with leader key returns by + * @campaign@. + * + * @param name is the name of election + * @param lease_id is the user-provided lease id for the proclamation + * @param key is the generated associated key returned by @campaign@ + * @param revision is the created revision of key-value returned by @campaign@ + */ + pplx::task resign(std::string const &name, int64_t lease_id, + std::string const &key, int revision); + private: #if defined(WITH_GRPC_CHANNEL_CLASS) std::shared_ptr channel; @@ -420,8 +499,6 @@ namespace etcd friend class Watcher; }; - - } #endif diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 6095bed..2501acd 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -12,6 +12,7 @@ namespace etcdv3 { class AsyncWatchAction; class AsyncLeaseKeepAliveAction; + class AsyncObserveAction; class V3Response; } @@ -134,6 +135,11 @@ namespace etcd */ std::string const & lock_key() const; + /** + * Return the "name" in response. + */ + std::string const & name() const; + /** * Returns the watched events. */ @@ -157,12 +163,16 @@ namespace etcd Values _values; Keys _keys; std::string _lock_key; // for lock + std::string _name; // for campaign (in v3election) std::vector _events; // for watch - std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed + // execute duration (in microseconds), during the action created and response parsed + std::chrono::microseconds _duration; + + friend class Client; friend class SyncClient; friend class etcdv3::AsyncWatchAction; friend class etcdv3::AsyncLeaseKeepAliveAction; - friend class Client; + friend class etcdv3::AsyncObserveAction; }; } diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index b033a25..d2ff556 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -67,6 +67,17 @@ namespace etcd Response leaserevoke(int64_t lease_id); Response leasetimetolive(int64_t lease_id); + Response campaign(std::string const &name, int64_t lease_id, + std::string const &value); + Response proclaim(std::string const &name, int64_t lease_id, + std::string const &key, int revision, std::string const &value); + Response leader(std::string const &name); + std::unique_ptr observe(std::string const &name, + std::function callback, + const bool once = false); + Response resign(std::string const &name, int64_t lease_id, + std::string const &key, int revision); + /** * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * a new key is created, like "/testdir/newkey" then no change happened in the value of diff --git a/etcd/Value.hpp b/etcd/Value.hpp index 6437b5a..a6ff6d8 100644 --- a/etcd/Value.hpp +++ b/etcd/Value.hpp @@ -13,6 +13,10 @@ namespace mvccpb { class Event; } +namespace electionpb { + class LeaderKey; +} + namespace etcd { class Value; diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index f57fae3..498b135 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -6,6 +6,7 @@ #include #include "proto/rpc.grpc.pb.h" #include "proto/v3lock.grpc.pb.h" +#include "proto/v3election.grpc.pb.h" using grpc::ClientContext; using grpc::CompletionQueue; @@ -15,6 +16,7 @@ using etcdserverpb::KV; using etcdserverpb::Watch; using etcdserverpb::Lease; using v3lockpb::Lock; +using v3electionpb::Election; namespace etcdv3 { @@ -33,6 +35,7 @@ namespace etcdv3 int64_t lease_id; int ttl; int limit; + std::string name; // for campaign (in v3election) std::string key; std::string range_end; std::string value; @@ -42,6 +45,7 @@ namespace etcdv3 Watch::Stub* watch_stub; Lease::Stub* lease_stub; Lock::Stub* lock_stub; + Election::Stub* election_stub; }; class Action diff --git a/etcd/v3/AsyncElectionAction.hpp b/etcd/v3/AsyncElectionAction.hpp new file mode 100644 index 0000000..a9f2b47 --- /dev/null +++ b/etcd/v3/AsyncElectionAction.hpp @@ -0,0 +1,82 @@ +#ifndef __ASYNC_ELECTIONACTION_HPP__ +#define __ASYNC_ELECTIONACTION_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "proto/v3election.grpc.pb.h" +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncElectionResponse.hpp" +#include "etcd/Response.hpp" + +using grpc::ClientAsyncResponseReader; +using grpc::ClientAsyncReader; +using v3electionpb::CampaignRequest; +using v3electionpb::CampaignResponse; +using v3electionpb::ProclaimRequest; +using v3electionpb::ProclaimResponse; +using v3electionpb::LeaderRequest; +using v3electionpb::LeaderResponse; +using v3electionpb::ResignRequest; +using v3electionpb::ResignResponse; + +namespace etcdv3 +{ + class AsyncCampaignAction : public etcdv3::Action + { + public: + AsyncCampaignAction(etcdv3::ActionParameters const ¶m); + AsyncCampaignResponse ParseResponse(); + private: + CampaignResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncProclaimAction : public etcdv3::Action + { + public: + AsyncProclaimAction(etcdv3::ActionParameters const ¶m); + AsyncProclaimResponse ParseResponse(); + private: + ProclaimResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaderAction : public etcdv3::Action + { + public: + AsyncLeaderAction(etcdv3::ActionParameters const ¶m); + AsyncLeaderResponse ParseResponse(); + private: + LeaderResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncObserveAction : public etcdv3::Action + { + public: + AsyncObserveAction(etcdv3::ActionParameters const ¶m, const bool once=false); + AsyncObserveResponse ParseResponse(); + void waitForResponse(); + void waitForResponse(std::function callback); + void CancelObserve(); + bool Cancelled() const; + private: + bool once; + LeaderResponse reply; + std::unique_ptr> response_reader; + std::atomic_bool isCancelled; + std::mutex protect_is_cancalled; + }; + + class AsyncResignAction : public etcdv3::Action + { + public: + AsyncResignAction(etcdv3::ActionParameters const ¶m); + AsyncResignResponse ParseResponse(); + private: + ResignResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/etcd/v3/AsyncElectionResponse.hpp b/etcd/v3/AsyncElectionResponse.hpp new file mode 100644 index 0000000..08923f0 --- /dev/null +++ b/etcd/v3/AsyncElectionResponse.hpp @@ -0,0 +1,51 @@ +#ifndef __ASYNC_ELECTIONRESPONSE_HPP__ +#define __ASYNC_ELECTIONRESPONSE_HPP__ + +#include "proto/rpc.pb.h" +#include "proto/v3election.pb.h" +#include "etcd/v3/V3Response.hpp" + +using v3electionpb::CampaignResponse; +using v3electionpb::ProclaimResponse; +using v3electionpb::LeaderResponse; +using v3electionpb::ResignResponse; + +namespace etcdv3 +{ + class AsyncCampaignResponse : public etcdv3::V3Response + { + public: + AsyncCampaignResponse(){}; + void ParseResponse(CampaignResponse& resp); + }; + + class AsyncProclaimResponse : public etcdv3::V3Response + { + public: + AsyncProclaimResponse(){}; + void ParseResponse(ProclaimResponse& resp); + }; + + class AsyncLeaderResponse : public etcdv3::V3Response + { + public: + AsyncLeaderResponse(){}; + void ParseResponse(LeaderResponse& resp); + }; + + class AsyncObserveResponse : public etcdv3::V3Response + { + public: + AsyncObserveResponse(){}; + void ParseResponse(LeaderResponse& resp); + }; + + class AsyncResignResponse : public etcdv3::V3Response + { + public: + AsyncResignResponse(){}; + void ParseResponse(ResignResponse& resp); + }; +} + +#endif diff --git a/etcd/v3/AsyncLockAction.hpp b/etcd/v3/AsyncLockAction.hpp index 8f4ea70..ec8951f 100644 --- a/etcd/v3/AsyncLockAction.hpp +++ b/etcd/v3/AsyncLockAction.hpp @@ -14,7 +14,6 @@ using v3lockpb::LockResponse; using v3lockpb::UnlockRequest; using v3lockpb::UnlockResponse; - namespace etcdv3 { class AsyncLockAction : public etcdv3::Action diff --git a/etcd/v3/AsyncLockResponse.hpp b/etcd/v3/AsyncLockResponse.hpp index 2d8c032..21bc05b 100644 --- a/etcd/v3/AsyncLockResponse.hpp +++ b/etcd/v3/AsyncLockResponse.hpp @@ -5,7 +5,6 @@ #include "proto/v3lock.grpc.pb.h" #include "etcd/v3/V3Response.hpp" - using grpc::ClientAsyncResponseReader; using v3lockpb::LockRequest; using v3lockpb::LockResponse; diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index 3b73cd8..5f4abc7 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -24,7 +24,6 @@ namespace etcdv3 void waitForResponse(); void waitForResponse(std::function callback); void CancelWatch(); - void WatchReq(std::string const & key); bool Cancelled() const; private: WatchResponse reply; diff --git a/etcd/v3/V3Response.hpp b/etcd/v3/V3Response.hpp index 0c97789..0bd0f17 100644 --- a/etcd/v3/V3Response.hpp +++ b/etcd/v3/V3Response.hpp @@ -3,6 +3,7 @@ #include #include "proto/kv.pb.h" +#include "proto/v3election.pb.h" #include "etcd/v3/KeyValue.hpp" @@ -26,6 +27,8 @@ namespace etcdv3 bool has_values() const; void set_lock_key(std::string const &key); std::string const &get_lock_key() const; + void set_name(std::string const &name); + std::string const &get_name() const; std::vector const & get_events() const; protected: int error_code; @@ -37,6 +40,7 @@ namespace etcdv3 std::vector values; std::vector prev_values; std::string lock_key; // for lock + std::string name; // for campaign (in v3election) std::vector events; // for watch }; } diff --git a/etcd/v3/action_constants.hpp b/etcd/v3/action_constants.hpp index ae10cd7..5ff09e8 100644 --- a/etcd/v3/action_constants.hpp +++ b/etcd/v3/action_constants.hpp @@ -21,6 +21,12 @@ namespace etcdv3 extern char const * LEASETIMETOLIVE; extern char const * LEASELEASES; + extern char const * CAMPAIGN_ACTION; + extern char const * PROCLAIM_ACTION; + extern char const * LEADER_ACTION; + extern char const * OBSERVE_ACTION; + extern char const * RESIGN_ACTION; + extern char const * NUL; extern char const * KEEPALIVE_CREATE; @@ -32,6 +38,8 @@ namespace etcdv3 extern char const * WATCH_WRITE; extern char const * WATCH_WRITES_DONE; + extern char const * ELECTION_OBSERVE_CREATE; + extern const int ERROR_KEY_NOT_FOUND; extern const int ERROR_COMPARE_FAILED; extern const int ERROR_KEY_ALREADY_EXISTS; diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt index 49e73c9..de6a88b 100644 --- a/proto/CMakeLists.txt +++ b/proto/CMakeLists.txt @@ -14,6 +14,7 @@ protobuf_generate_latest( compute_generated_srcs(PROTO_GENERATES_SRCS "${PROTO_GEN_OUT_DIR}" false ${PROTO_SRCS}) set(PROTO_GRPC_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/rpc.proto" + "${CMAKE_CURRENT_SOURCE_DIR}/v3election.proto" "${CMAKE_CURRENT_SOURCE_DIR}/v3lock.proto") grpc_generate_cpp(PROTO_GRPC_GENERATES PROTO_GRPC_GENERATES_HDRS "${PROTO_GEN_OUT_DIR}" diff --git a/proto/v3election.proto b/proto/v3election.proto new file mode 100644 index 0000000..3061a10 --- /dev/null +++ b/proto/v3election.proto @@ -0,0 +1,119 @@ +syntax = "proto3"; +package v3electionpb; + +import "gogoproto/gogo.proto"; +import "rpc.proto"; +import "kv.proto"; + +// for grpc-gateway +import "google/api/annotations.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +// The election service exposes client-side election facilities as a gRPC interface. +service Election { + // Campaign waits to acquire leadership in an election, returning a LeaderKey + // representing the leadership if successful. The LeaderKey can then be used + // to issue new values on the election, transactionally guard API requests on + // leadership still being held, and resign from the election. + rpc Campaign(CampaignRequest) returns (CampaignResponse) { + option (google.api.http) = { + post: "/v3/election/campaign" + body: "*" + }; + } + // Proclaim updates the leader's posted value with a new value. + rpc Proclaim(ProclaimRequest) returns (ProclaimResponse) { + option (google.api.http) = { + post: "/v3/election/proclaim" + body: "*" + }; + } + // Leader returns the current election proclamation, if any. + rpc Leader(LeaderRequest) returns (LeaderResponse) { + option (google.api.http) = { + post: "/v3/election/leader" + body: "*" + }; + } + // Observe streams election proclamations in-order as made by the election's + // elected leaders. + rpc Observe(LeaderRequest) returns (stream LeaderResponse) { + option (google.api.http) = { + post: "/v3/election/observe" + body: "*" + }; + } + // Resign releases election leadership so other campaigners may acquire + // leadership on the election. + rpc Resign(ResignRequest) returns (ResignResponse) { + option (google.api.http) = { + post: "/v3/election/resign" + body: "*" + }; + } +} + +message CampaignRequest { + // name is the election's identifier for the campaign. + bytes name = 1; + // lease is the ID of the lease attached to leadership of the election. If the + // lease expires or is revoked before resigning leadership, then the + // leadership is transferred to the next campaigner, if any. + int64 lease = 2; + // value is the initial proclaimed value set when the campaigner wins the + // election. + bytes value = 3; +} + +message CampaignResponse { + etcdserverpb.ResponseHeader header = 1; + // leader describes the resources used for holding leadereship of the election. + LeaderKey leader = 2; +} + +message LeaderKey { + // name is the election identifier that correponds to the leadership key. + bytes name = 1; + // key is an opaque key representing the ownership of the election. If the key + // is deleted, then leadership is lost. + bytes key = 2; + // rev is the creation revision of the key. It can be used to test for ownership + // of an election during transactions by testing the key's creation revision + // matches rev. + int64 rev = 3; + // lease is the lease ID of the election leader. + int64 lease = 4; +} + +message LeaderRequest { + // name is the election identifier for the leadership information. + bytes name = 1; +} + +message LeaderResponse { + etcdserverpb.ResponseHeader header = 1; + // kv is the key-value pair representing the latest leader update. + mvccpb.KeyValue kv = 2; +} + +message ResignRequest { + // leader is the leadership to relinquish by resignation. + LeaderKey leader = 1; +} + +message ResignResponse { + etcdserverpb.ResponseHeader header = 1; +} + +message ProclaimRequest { + // leader is the leadership hold on the election. + LeaderKey leader = 1; + // value is an update meant to overwrite the leader's current value. + bytes value = 2; +} + +message ProclaimResponse { + etcdserverpb.ResponseHeader header = 1; +} diff --git a/src/Client.cpp b/src/Client.cpp index 40812cc..5e50617 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -24,16 +24,18 @@ #include #include "proto/rpc.grpc.pb.h" #include "proto/v3lock.grpc.pb.h" +#include "proto/v3election.grpc.pb.h" #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" #include "etcd/v3/AsyncRangeResponse.hpp" #include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp" +#include "etcd/v3/AsyncElectionResponse.hpp" +#include "etcd/v3/AsyncTxnResponse.hpp" #include "etcd/v3/Transaction.hpp" #include "etcd/v3/AsyncSetAction.hpp" @@ -46,6 +48,7 @@ #include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLockAction.hpp" +#include "etcd/v3/AsyncElectionAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp" using grpc::Channel; @@ -165,6 +168,7 @@ struct etcd::Client::EtcdServerStubs { std::unique_ptr watchServiceStub; std::unique_ptr leaseServiceStub; std::unique_ptr lockServiceStub; + std::unique_ptr electionServiceStub; }; void etcd::Client::EtcdServerStubsDeleter::operator()(etcd::Client::EtcdServerStubs *stubs) { @@ -191,6 +195,7 @@ etcd::Client::Client(std::string const & address, stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel); + stubs->electionServiceStub = Election::NewStub(this->channel); } etcd::Client::Client(std::string const & address, @@ -220,6 +225,7 @@ etcd::Client::Client(std::string const & address, stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel); + stubs->electionServiceStub = Election::NewStub(this->channel); } etcd::Client *etcd::Client::WithUser(std::string const & etcd_url, @@ -253,6 +259,7 @@ etcd::Client::Client(std::string const & address, stubs->watchServiceStub= Watch::NewStub(this->channel); stubs->leaseServiceStub= Lease::NewStub(this->channel); stubs->lockServiceStub = Lock::NewStub(this->channel); + stubs->electionServiceStub = Election::NewStub(this->channel); } etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url, @@ -795,3 +802,72 @@ pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); return Response::create(call); } + +pplx::task etcd::Client::campaign( + std::string const &name, int64_t lease_id, std::string const &value) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name = name; + params.lease_id = lease_id; + params.value = value; + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncCampaignAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::proclaim( + std::string const &name, int64_t lease_id, + std::string const &key, int revision, std::string const &value) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name = name; + params.lease_id = lease_id; + params.key = key; + params.revision = revision; + params.value = value; + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncProclaimAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leader(std::string const &name) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name = name; + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLeaderAction(params)); + return Response::create(call); +} + +std::unique_ptr etcd::Client::observe( + std::string const &name, std::function callback, const bool once) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name.assign(name); + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncObserveAction(params, once)); + std::unique_ptr observer(new Observer()); + observer->action = call; + observer->resp = Response::create(call); + return observer; +} + +pplx::task etcd::Client::resign( + std::string const &name, int64_t lease_id, std::string const &key, int revision) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name = name; + params.lease_id = lease_id; + params.key = key; + params.revision = revision; + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncResignAction(params)); + return Response::create(call); +} + +etcd::Client::Observer::~Observer() { + if (action != nullptr) { + action->CancelObserve(); + resp.wait(); + } +} diff --git a/src/Response.cpp b/src/Response.cpp index 3052449..93a0494 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -22,10 +22,11 @@ etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseco { _value = Value(reply.get_value()); } - _prev_value = Value(reply.get_prev_value()); _lock_key = reply.get_lock_key(); + _name = reply.get_name(); + for (auto const &ev: reply.get_events()) { _events.emplace_back(etcd::Event(ev)); } @@ -112,6 +113,10 @@ std::string const & etcd::Response::lock_key() const { return _lock_key; } +std::string const & etcd::Response::name() const { + return _name; +} + std::vector const & etcd::Response::events() const { return this->_events; } diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 8715879..e64001e 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -149,6 +149,37 @@ etcd::Response etcd::SyncClient::leasetimetolive(int64_t lease_id) CHECK_EXCEPTIONS(client.leasetimetolive(lease_id).get()); } +etcd::Response etcd::SyncClient::campaign(std::string const &name, int64_t lease_id, + std::string const &value) +{ + CHECK_EXCEPTIONS(client.campaign(name, lease_id, value).get()); +} + +etcd::Response etcd::SyncClient::proclaim(std::string const &name, int64_t lease_id, + std::string const &key, int revision, + std::string const &value) +{ + CHECK_EXCEPTIONS(client.proclaim(name, lease_id, key, revision, value).get()); +} + +etcd::Response etcd::SyncClient::leader(std::string const &name) +{ + CHECK_EXCEPTIONS(client.leader(name).get()); +} + +std::unique_ptr etcd::SyncClient::observe( + std::string const &name, std::function callback, + const bool once) +{ + return client.observe(name, callback, once); +} + +etcd::Response etcd::SyncClient::resign(std::string const &name, int64_t lease_id, + std::string const &key, int revision) +{ + CHECK_EXCEPTIONS(client.resign(name, lease_id, key, revision).get()); +} + etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) { CHECK_EXCEPTIONS(client.watch(key, recursive).get()); diff --git a/src/v3/AsyncElectionAction.cpp b/src/v3/AsyncElectionAction.cpp new file mode 100644 index 0000000..d1387de --- /dev/null +++ b/src/v3/AsyncElectionAction.cpp @@ -0,0 +1,247 @@ +#include "etcd/v3/AsyncElectionAction.hpp" + +#include "etcd/v3/action_constants.hpp" + + +using v3electionpb::LeaderKey; +using v3electionpb::CampaignRequest; +using v3electionpb::CampaignResponse; +using v3electionpb::ProclaimRequest; +using v3electionpb::ProclaimResponse; +using v3electionpb::LeaderRequest; +using v3electionpb::LeaderResponse; +using v3electionpb::ResignRequest; +using v3electionpb::ResignResponse; + +etcdv3::AsyncCampaignAction::AsyncCampaignAction( + etcdv3::ActionParameters const ¶m) + : etcdv3::Action(param) +{ + CampaignRequest campaign_request; + campaign_request.set_name(param.name); + campaign_request.set_lease(param.lease_id); + campaign_request.set_value(param.value); + + response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse() +{ + AsyncCampaignResponse campaign_resp; + campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION); + + if(!status.ok()) { + campaign_resp.set_error_code(status.error_code()); + campaign_resp.set_error_message(status.error_message()); + } + else { + campaign_resp.ParseResponse(reply); + } + return campaign_resp; +} + +etcdv3::AsyncProclaimAction::AsyncProclaimAction( + etcdv3::ActionParameters const ¶m) + : etcdv3::Action(param) +{ + auto leader = new LeaderKey(); + leader->set_name(param.name); + leader->set_key(param.key); + leader->set_rev(param.revision); + leader->set_lease(param.lease_id); + + ProclaimRequest proclaim_request; + proclaim_request.set_allocated_leader(leader); + proclaim_request.set_value(param.value); + + response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncProclaimResponse etcdv3::AsyncProclaimAction::ParseResponse() +{ + AsyncProclaimResponse proclaim_resp; + proclaim_resp.set_action(etcdv3::PROCLAIM_ACTION); + + if(!status.ok()) { + proclaim_resp.set_error_code(status.error_code()); + proclaim_resp.set_error_message(status.error_message()); + } + else { + proclaim_resp.ParseResponse(reply); + } + return proclaim_resp; +} + +etcdv3::AsyncLeaderAction::AsyncLeaderAction( + etcdv3::ActionParameters const ¶m) + : etcdv3::Action(param) +{ + LeaderRequest leader_request; + leader_request.set_name(param.name); + + response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncLeaderResponse etcdv3::AsyncLeaderAction::ParseResponse() +{ + AsyncLeaderResponse leader_resp; + leader_resp.set_action(etcdv3::LEADER_ACTION); + + if(!status.ok()) { + leader_resp.set_error_code(status.error_code()); + leader_resp.set_error_message(status.error_message()); + } + else { + leader_resp.ParseResponse(reply); + } + return leader_resp; +} + +etcdv3::AsyncObserveAction::AsyncObserveAction( + etcdv3::ActionParameters const ¶m, const bool once) + : etcdv3::Action(param), once(once) +{ + LeaderRequest leader_request; + leader_request.set_name(param.name); + + response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE); + + void *got_tag; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) { + response_reader->Read(&reply, (void *)this); + } else { + throw std::runtime_error("failed to create a observe connection"); + } +} + +void etcdv3::AsyncObserveAction::waitForResponse() +{ + void* got_tag; + bool ok = false; + + while(cq_.Next(&got_tag, &ok)) + { + if (isCancelled.load()) { + break; + } + if(ok == false) + { + break; + } + if(got_tag == (void*)this) // read tag + { + auto resp = ParseResponse(); + if (resp.get_error_code() != 0) { + CancelObserve(); + break; + } + } + if(isCancelled.load()) { + break; + } + if (once) { + break; + } + response_reader->Read(&reply, (void *)this); + } +} + +void etcdv3::AsyncObserveAction::waitForResponse(std::function callback) +{ + void* got_tag; + bool ok = false; + + while(cq_.Next(&got_tag, &ok)) + { + if(ok == false) + { + break; + } + if (isCancelled.load()) { + break; + } + if(got_tag == (void*)this) // read tag + { + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + callback(etcd::Response(resp, duration)); + if (resp.get_error_code() != 0) { + CancelObserve(); + break; + } + start_timepoint = std::chrono::high_resolution_clock::now(); + } + if(isCancelled.load()) { + break; + } + if (once) { + break; + } + response_reader->Read(&reply, (void *)this); + } +} + +void etcdv3::AsyncObserveAction::CancelObserve() +{ + std::lock_guard scope_lock(this->protect_is_cancalled); + if (!isCancelled.exchange(true)) { + cq_.Shutdown(); + } + response_reader->Finish(&status, (void *)this); +} + +bool etcdv3::AsyncObserveAction::Cancelled() const { + return isCancelled.load(); +} + +etcdv3::AsyncObserveResponse etcdv3::AsyncObserveAction::ParseResponse() +{ + AsyncObserveResponse leader_resp; + leader_resp.set_action(etcdv3::OBSERVE_ACTION); + + if(!status.ok()) { + leader_resp.set_error_code(status.error_code()); + leader_resp.set_error_message(status.error_message()); + } + else { + leader_resp.ParseResponse(reply); + } + return leader_resp; +} + +etcdv3::AsyncResignAction::AsyncResignAction( + etcdv3::ActionParameters const ¶m) + : etcdv3::Action(param) +{ + auto leader = new LeaderKey(); + leader->set_name(param.name); + leader->set_key(param.key); + leader->set_rev(param.revision); + leader->set_lease(param.lease_id); + + ResignRequest resign_request; + resign_request.set_allocated_leader(leader); + + response_reader = parameters.election_stub->AsyncResign(&context, resign_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse() +{ + AsyncResignResponse resign_resp; + resign_resp.set_action(etcdv3::RESIGN_ACTION); + + if(!status.ok()) { + resign_resp.set_error_code(status.error_code()); + resign_resp.set_error_message(status.error_message()); + } + else { + resign_resp.ParseResponse(reply); + } + return resign_resp; +} diff --git a/src/v3/AsyncElectionResponse.cpp b/src/v3/AsyncElectionResponse.cpp new file mode 100644 index 0000000..a3fdb05 --- /dev/null +++ b/src/v3/AsyncElectionResponse.cpp @@ -0,0 +1,35 @@ +#include "etcd/v3/AsyncElectionResponse.hpp" + +#include "etcd/v3/action_constants.hpp" + +using etcdserverpb::ResponseOp; + +void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) { + index = reply.header().revision(); + + auto const &leader = reply.leader(); + name = leader.name(); + value.kvs.set_key(leader.key()); + value.kvs.set_create_revision(leader.rev()); + value.kvs.set_lease(leader.lease()); +} + +void etcdv3::AsyncProclaimResponse::ParseResponse(ProclaimResponse& reply) { + index = reply.header().revision(); +} + +void etcdv3::AsyncLeaderResponse::ParseResponse(LeaderResponse& reply) { + index = reply.header().revision(); + + value.kvs = reply.kv(); +} + +void etcdv3::AsyncObserveResponse::ParseResponse(LeaderResponse& reply) { + index = reply.header().revision(); + + value.kvs = reply.kv(); +} + +void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) { + index = reply.header().revision(); +} diff --git a/src/v3/AsyncLockResponse.cpp b/src/v3/AsyncLockResponse.cpp index 83e163b..ac3f4be 100644 --- a/src/v3/AsyncLockResponse.cpp +++ b/src/v3/AsyncLockResponse.cpp @@ -1,7 +1,6 @@ #include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/action_constants.hpp" - void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp) { index = resp.header().revision(); diff --git a/src/v3/V3Response.cpp b/src/v3/V3Response.cpp index 3e76730..7177c13 100644 --- a/src/v3/V3Response.cpp +++ b/src/v3/V3Response.cpp @@ -69,6 +69,14 @@ std::string const & etcdv3::V3Response::get_lock_key() const { return this->lock_key; } +void etcdv3::V3Response::set_name(std::string const &name) { + this->name = name; +} + +std::string const & etcdv3::V3Response::get_name() const { + return this->name; +} + std::vector const & etcdv3::V3Response::get_events() const { return this->events; } diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index b0ee9e8..efa14bc 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -18,6 +18,12 @@ char const * etcdv3::LEASEKEEPALIVE = "leasekeepalive"; char const * etcdv3::LEASETIMETOLIVE = "leasetimetolive"; char const * etcdv3::LEASELEASES = "leaseleases"; +char const * etcdv3::CAMPAIGN_ACTION = "campaign"; +char const * etcdv3::PROCLAIM_ACTION = "preclaim"; +char const * etcdv3::LEADER_ACTION = "leader"; +char const * etcdv3::OBSERVE_ACTION = "obverse"; +char const * etcdv3::RESIGN_ACTION = "resign"; + // see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use // the UTF-8 char U+0000 (i.e., "\xC0\x80"). char const * etcdv3::NUL = "\xC0\x80"; @@ -31,6 +37,8 @@ char const * etcdv3::WATCH_CREATE = "watch create"; char const * etcdv3::WATCH_WRITE = "watch write"; char const * etcdv3::WATCH_WRITES_DONE = "watch writes done"; +char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create"; + const int etcdv3::ERROR_KEY_NOT_FOUND = 100; const int etcdv3::ERROR_COMPARE_FAILED = 101; const int etcdv3::ERROR_KEY_ALREADY_EXISTS = 105; diff --git a/tst/ElectionTest.cpp b/tst/ElectionTest.cpp new file mode 100644 index 0000000..fdf5293 --- /dev/null +++ b/tst/ElectionTest.cpp @@ -0,0 +1,61 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include +#include + +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" + + +TEST_CASE("setup") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + etcd.rmdir("/test", true).wait(); +} + +TEST_CASE("campaign and resign") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + auto keepalive = etcd.leasekeepalive(60).get(); + auto lease_id = keepalive->Lease(); + + // campaign + auto resp1 = etcd.campaign("test", lease_id, "xxxx").get(); + REQUIRE(0 == resp1.error_code()); + + // leader + { + auto resp2 = etcd.leader("test").get(); + REQUIRE(0 == resp2.error_code()); + REQUIRE(resp1.value().key() == resp2.value().key()); + REQUIRE("xxxx" == resp2.value().as_string()); + } + + // proclaim + auto resp3 = etcd.proclaim("test", lease_id, + resp1.value().key(), resp1.value().created_index(), + "tttt").get(); + REQUIRE(0 == resp3.error_code()); + + // leader + { + auto resp4 = etcd.leader("test").get(); + REQUIRE(0 == resp4.error_code()); + REQUIRE(resp1.value().key() == resp4.value().key()); + REQUIRE("tttt" == resp4.value().as_string()); + } + + // resign + auto resp5 = etcd.resign("test", lease_id, + resp1.value().key(), resp1.value().created_index()).get(); + REQUIRE(0 == resp5.error_code()); +} + +TEST_CASE("cleanup") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + etcd.rmdir("/test", true).get(); +}