diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index ae303a1..6ace002 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -15,7 +15,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-20.04, ubuntu-22.04, macos-10.15, macos-11, macos-12] + os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12] etcd: [v3.2.26, v3.3.11, v3.4.13, v3.5.7] exclude: - os: ubuntu-20.04 @@ -32,13 +32,6 @@ jobs: - os: ubuntu-22.04 etcd: v3.5.7 - - os: macos-10.15 - etcd: v3.2.26 - - os: macos-10.15 - etcd: v3.3.11 - - os: macos-10.15 - etcd: v3.4.13 - - os: macos-11 etcd: v3.2.26 - os: macos-11 diff --git a/etcd/v3/AsyncCompareAndDeleteAction.hpp b/etcd/v3/AsyncCompareAndDeleteAction.hpp deleted file mode 100644 index 2ce8ab9..0000000 --- a/etcd/v3/AsyncCompareAndDeleteAction.hpp +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef __ASYNC_COMPAREANDDELETE_HPP__ -#define __ASYNC_COMPAREANDDELETE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncCompareAndDeleteAction : public etcdv3::Action - { - public: - AsyncCompareAndDeleteAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type); - AsyncTxnResponse ParseResponse(); - private: - TxnResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncCompareAndSwapAction.hpp b/etcd/v3/AsyncCompareAndSwapAction.hpp deleted file mode 100644 index 2cf2aca..0000000 --- a/etcd/v3/AsyncCompareAndSwapAction.hpp +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef __ASYNC_COMPAREANDSWAP_HPP__ -#define __ASYNC_COMPAREANDSWAP_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncCompareAndSwapAction : public etcdv3::Action - { - public: - AsyncCompareAndSwapAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type); - AsyncTxnResponse ParseResponse(); - private: - TxnResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncDeleteAction.hpp b/etcd/v3/AsyncDeleteAction.hpp deleted file mode 100644 index 67e3f83..0000000 --- a/etcd/v3/AsyncDeleteAction.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __ASYNC_DELETE_HPP__ -#define __ASYNC_DELETE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncDeleteResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::DeleteRangeResponse; - -namespace etcdv3 -{ - class AsyncDeleteAction : public etcdv3::Action - { - public: - AsyncDeleteAction(etcdv3::ActionParameters && params); - AsyncDeleteResponse ParseResponse(); - private: - DeleteRangeResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncDeleteResponse.hpp b/etcd/v3/AsyncDeleteResponse.hpp deleted file mode 100644 index 9818690..0000000 --- a/etcd/v3/AsyncDeleteResponse.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef __ASYNC_DELETERESPONSE_HPP__ -#define __ASYNC_DELETERESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" -#include "etcd/v3/Action.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::DeleteRangeResponse; - -namespace etcdv3 -{ - class AsyncDeleteResponse : public etcdv3::V3Response - { - public: - AsyncDeleteResponse(){}; - void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncElectionAction.hpp b/etcd/v3/AsyncElectionAction.hpp deleted file mode 100644 index ca25eb6..0000000 --- a/etcd/v3/AsyncElectionAction.hpp +++ /dev/null @@ -1,80 +0,0 @@ -#ifndef __ASYNC_ELECTIONACTION_HPP__ -#define __ASYNC_ELECTIONACTION_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "proto/v3election.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncElectionResponse.hpp" -#include "etcd/Response.hpp" - -using grpc::ClientAsyncResponseReader; -using grpc::ClientAsyncReader; -using v3electionpb::CampaignRequest; -using v3electionpb::CampaignResponse; -using v3electionpb::ProclaimRequest; -using v3electionpb::ProclaimResponse; -using v3electionpb::LeaderRequest; -using v3electionpb::LeaderResponse; -using v3electionpb::ResignRequest; -using v3electionpb::ResignResponse; - -namespace etcdv3 -{ - class AsyncCampaignAction : public etcdv3::Action - { - public: - AsyncCampaignAction(etcdv3::ActionParameters && params); - AsyncCampaignResponse ParseResponse(); - private: - CampaignResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncProclaimAction : public etcdv3::Action - { - public: - AsyncProclaimAction(etcdv3::ActionParameters && params); - AsyncProclaimResponse ParseResponse(); - private: - ProclaimResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncLeaderAction : public etcdv3::Action - { - public: - AsyncLeaderAction(etcdv3::ActionParameters && params); - AsyncLeaderResponse ParseResponse(); - private: - LeaderResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncObserveAction : public etcdv3::Action - { - public: - AsyncObserveAction(etcdv3::ActionParameters && params); - AsyncObserveResponse ParseResponse(); - void waitForResponse(); - void CancelObserve(); - bool Cancelled() const; - private: - LeaderResponse reply; - std::unique_ptr> response_reader; - std::atomic_bool isCancelled; - std::mutex protect_is_cancalled; - }; - - class AsyncResignAction : public etcdv3::Action - { - public: - AsyncResignAction(etcdv3::ActionParameters && params); - AsyncResignResponse ParseResponse(); - private: - ResignResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncElectionResponse.hpp b/etcd/v3/AsyncElectionResponse.hpp deleted file mode 100644 index 08923f0..0000000 --- a/etcd/v3/AsyncElectionResponse.hpp +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef __ASYNC_ELECTIONRESPONSE_HPP__ -#define __ASYNC_ELECTIONRESPONSE_HPP__ - -#include "proto/rpc.pb.h" -#include "proto/v3election.pb.h" -#include "etcd/v3/V3Response.hpp" - -using v3electionpb::CampaignResponse; -using v3electionpb::ProclaimResponse; -using v3electionpb::LeaderResponse; -using v3electionpb::ResignResponse; - -namespace etcdv3 -{ - class AsyncCampaignResponse : public etcdv3::V3Response - { - public: - AsyncCampaignResponse(){}; - void ParseResponse(CampaignResponse& resp); - }; - - class AsyncProclaimResponse : public etcdv3::V3Response - { - public: - AsyncProclaimResponse(){}; - void ParseResponse(ProclaimResponse& resp); - }; - - class AsyncLeaderResponse : public etcdv3::V3Response - { - public: - AsyncLeaderResponse(){}; - void ParseResponse(LeaderResponse& resp); - }; - - class AsyncObserveResponse : public etcdv3::V3Response - { - public: - AsyncObserveResponse(){}; - void ParseResponse(LeaderResponse& resp); - }; - - class AsyncResignResponse : public etcdv3::V3Response - { - public: - AsyncResignResponse(){}; - void ParseResponse(ResignResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncGRPC.hpp b/etcd/v3/AsyncGRPC.hpp new file mode 100644 index 0000000..8533c28 --- /dev/null +++ b/etcd/v3/AsyncGRPC.hpp @@ -0,0 +1,445 @@ +#ifndef __ASYNC_GRPC_HPP__ +#define __ASYNC_GRPC_HPP__ + +#include +#include + +#include + +#include "proto/rpc.pb.h" +#include "proto/rpc.grpc.pb.h" +#include "proto/v3election.pb.h" +#include "proto/v3election.grpc.pb.h" +#include "proto/v3lock.pb.h" +#include "proto/v3lock.grpc.pb.h" + +#include "etcd/v3/Action.hpp" +#include "etcd/v3/V3Response.hpp" +#include "etcd/Response.hpp" + +using grpc::ClientAsyncReader; +using grpc::ClientAsyncReaderWriter; +using grpc::ClientAsyncResponseReader; + +using etcdserverpb::KV; + +using v3electionpb::CampaignRequest; +using v3electionpb::CampaignResponse; +using etcdserverpb::DeleteRangeRequest; +using etcdserverpb::DeleteRangeResponse; +using etcdserverpb::LeaseCheckpointRequest; +using etcdserverpb::LeaseCheckpointResponse; +using etcdserverpb::LeaseGrantRequest; +using etcdserverpb::LeaseGrantResponse; +using etcdserverpb::LeaseRevokeRequest; +using etcdserverpb::LeaseRevokeResponse; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseKeepAliveResponse; +using etcdserverpb::LeaseLeasesRequest; +using etcdserverpb::LeaseLeasesResponse; +using etcdserverpb::LeaseTimeToLiveRequest; +using etcdserverpb::LeaseTimeToLiveResponse; +using etcdserverpb::TxnRequest; +using etcdserverpb::TxnResponse; +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; +using v3electionpb::ResignRequest; +using v3electionpb::ResignResponse; +using etcdserverpb::PutRequest; +using etcdserverpb::PutResponse; +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; +using etcdserverpb::TxnRequest; +using etcdserverpb::TxnResponse; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using v3electionpb::LeaderRequest; +using v3electionpb::LeaderResponse; +using v3electionpb::ProclaimRequest; +using v3electionpb::ProclaimResponse; +using v3lockpb::LockRequest; +using v3lockpb::LockResponse; +using v3lockpb::UnlockRequest; +using v3lockpb::UnlockResponse; + +namespace etcd { + class KeepAlive; +} + +namespace etcdv3 { + class Transaction; +} + +namespace etcdv3 { + class AsyncCampaignResponse : public etcdv3::V3Response + { + public: + AsyncCampaignResponse(){}; + void ParseResponse(CampaignResponse& resp); + }; + + class AsyncDeleteResponse : public etcdv3::V3Response + { + public: + AsyncDeleteResponse(){}; + void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp); + }; + + class AsyncHeadResponse : public etcdv3::V3Response + { + public: + AsyncHeadResponse(){}; + void ParseResponse(RangeResponse& resp); + }; + + class AsyncLeaderResponse : public etcdv3::V3Response + { + public: + AsyncLeaderResponse(){}; + void ParseResponse(LeaderResponse& resp); + }; + + class AsyncLeaseGrantResponse : public etcdv3::V3Response + { + public: + AsyncLeaseGrantResponse(){}; + void ParseResponse(LeaseGrantResponse& resp); + }; + + class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response + { + public: + AsyncLeaseKeepAliveResponse(){}; + void ParseResponse(LeaseKeepAliveResponse& resp); + }; + + class AsyncLeaseLeasesResponse : public etcdv3::V3Response + { + public: + AsyncLeaseLeasesResponse(){}; + void ParseResponse(LeaseLeasesResponse& resp); + }; + + class AsyncLeaseRevokeResponse : public etcdv3::V3Response + { + public: + AsyncLeaseRevokeResponse(){}; + void ParseResponse(LeaseRevokeResponse& resp); + }; + + class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response + { + public: + AsyncLeaseTimeToLiveResponse(){}; + void ParseResponse(LeaseTimeToLiveResponse& resp); + }; + + class AsyncLockResponse : public etcdv3::V3Response + { + public: + AsyncLockResponse(){}; + void ParseResponse(LockResponse& resp); + }; + + class AsyncObserveResponse : public etcdv3::V3Response + { + public: + AsyncObserveResponse(){}; + void ParseResponse(LeaderResponse& resp); + }; + + class AsyncProclaimResponse : public etcdv3::V3Response + { + public: + AsyncProclaimResponse(){}; + void ParseResponse(ProclaimResponse& resp); + }; + + class AsyncPutResponse : public etcdv3::V3Response + { + public: + AsyncPutResponse(){}; + void ParseResponse(PutResponse& resp); + }; + + class AsyncRangeResponse : public etcdv3::V3Response + { + public: + AsyncRangeResponse(){}; + void ParseResponse(RangeResponse& resp, bool prefix=false); + }; + + class AsyncResignResponse : public etcdv3::V3Response + { + public: + AsyncResignResponse(){}; + void ParseResponse(ResignResponse& resp); + }; + + class AsyncTxnResponse : public etcdv3::V3Response + { + public: + AsyncTxnResponse(){}; + void ParseResponse(TxnResponse& resp); + void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp); + }; + + class AsyncUnlockResponse : public etcdv3::V3Response + { + public: + AsyncUnlockResponse(){}; + void ParseResponse(UnlockResponse& resp); + }; + + class AsyncWatchResponse : public etcdv3::V3Response + { + public: + AsyncWatchResponse(){}; + void ParseResponse(WatchResponse& resp); + }; +} + +namespace etcdv3 +{ + class AsyncCampaignAction : public etcdv3::Action + { + public: + AsyncCampaignAction(etcdv3::ActionParameters && params); + AsyncCampaignResponse ParseResponse(); + private: + CampaignResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncCompareAndDeleteAction : public etcdv3::Action + { + public: + AsyncCompareAndDeleteAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncCompareAndSwapAction : public etcdv3::Action + { + public: + AsyncCompareAndSwapAction(etcdv3::ActionParameters && params, etcdv3::AtomicityType type); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncDeleteAction : public etcdv3::Action + { + public: + AsyncDeleteAction(etcdv3::ActionParameters && params); + AsyncDeleteResponse ParseResponse(); + private: + DeleteRangeResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncHeadAction : public etcdv3::Action + { + public: + AsyncHeadAction(etcdv3::ActionParameters && params); + AsyncHeadResponse ParseResponse(); + private: + RangeResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaderAction : public etcdv3::Action + { + public: + AsyncLeaderAction(etcdv3::ActionParameters && params); + AsyncLeaderResponse ParseResponse(); + private: + LeaderResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseGrantAction : public etcdv3::Action { + public: + AsyncLeaseGrantAction(etcdv3::ActionParameters && params); + AsyncLeaseGrantResponse ParseResponse(); + private: + LeaseGrantResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseKeepAliveAction: public etcdv3::Action { + public: + AsyncLeaseKeepAliveAction(etcdv3::ActionParameters && params); + AsyncLeaseKeepAliveResponse ParseResponse(); + + etcd::Response Refresh(); + void CancelKeepAlive(); + bool Cancelled() const; + + private: + etcdv3::ActionParameters& mutable_parameters(); + + LeaseKeepAliveResponse reply; + std::unique_ptr> stream; + + LeaseKeepAliveRequest req; + bool isCancelled; + std::recursive_mutex protect_is_cancelled; + + friend class etcd::KeepAlive; + }; + + class AsyncLeaseLeasesAction: public etcdv3::Action { + public: + AsyncLeaseLeasesAction(etcdv3::ActionParameters && params); + AsyncLeaseLeasesResponse ParseResponse(); + private: + LeaseLeasesResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseRevokeAction: public etcdv3::Action { + public: + AsyncLeaseRevokeAction(etcdv3::ActionParameters && params); + AsyncLeaseRevokeResponse ParseResponse(); + private: + LeaseRevokeResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseTimeToLiveAction: public etcdv3::Action { + public: + AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters && params); + AsyncLeaseTimeToLiveResponse ParseResponse(); + private: + LeaseTimeToLiveResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLockAction : public etcdv3::Action + { + public: + AsyncLockAction(etcdv3::ActionParameters && params); + AsyncLockResponse ParseResponse(); + private: + LockResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncObserveAction : public etcdv3::Action + { + public: + AsyncObserveAction(etcdv3::ActionParameters && params); + AsyncObserveResponse ParseResponse(); + void waitForResponse(); + void CancelObserve(); + bool Cancelled() const; + private: + LeaderResponse reply; + std::unique_ptr> response_reader; + std::atomic_bool isCancelled; + std::mutex protect_is_cancalled; + }; + + class AsyncProclaimAction : public etcdv3::Action + { + public: + AsyncProclaimAction(etcdv3::ActionParameters && params); + AsyncProclaimResponse ParseResponse(); + private: + ProclaimResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncPutAction : public etcdv3::Action + { + public: + AsyncPutAction(etcdv3::ActionParameters && params); + AsyncPutResponse ParseResponse(); + private: + PutResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncRangeAction : public etcdv3::Action + { + public: + AsyncRangeAction(etcdv3::ActionParameters && params); + AsyncRangeResponse ParseResponse(); + private: + RangeResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncResignAction : public etcdv3::Action + { + public: + AsyncResignAction(etcdv3::ActionParameters && params); + AsyncResignResponse ParseResponse(); + private: + ResignResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncSetAction : public etcdv3::Action + { + public: + AsyncSetAction(etcdv3::ActionParameters && params, bool create=false); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + bool isCreate; + }; + + class AsyncTxnAction : public etcdv3::Action + { + public: + AsyncTxnAction(etcdv3::ActionParameters && params, etcdv3::Transaction const &tx); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncUnlockAction : public etcdv3::Action + { + public: + AsyncUnlockAction(etcdv3::ActionParameters && params); + AsyncUnlockResponse ParseResponse(); + private: + UnlockResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncUpdateAction : public etcdv3::Action + { + public: + AsyncUpdateAction(etcdv3::ActionParameters && params); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncWatchAction : public etcdv3::Action + { + public: + AsyncWatchAction(etcdv3::ActionParameters && params); + AsyncWatchResponse ParseResponse(); + void waitForResponse(); + void waitForResponse(std::function callback); + void CancelWatch(); + bool Cancelled() const; + private: + int64_t watch_id = -1; + WatchResponse reply; + std::unique_ptr> stream; + std::atomic_bool isCancelled; + }; +} + +#endif diff --git a/etcd/v3/AsyncHeadAction.hpp b/etcd/v3/AsyncHeadAction.hpp deleted file mode 100644 index 7f3a9a8..0000000 --- a/etcd/v3/AsyncHeadAction.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __ASYNC_HEAD_HPP__ -#define __ASYNC_HEAD_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncHeadResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::RangeResponse; - -namespace etcdv3 -{ - class AsyncHeadAction : public etcdv3::Action - { - public: - AsyncHeadAction(etcdv3::ActionParameters && params); - AsyncHeadResponse ParseResponse(); - private: - RangeResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncHeadResponse.hpp b/etcd/v3/AsyncHeadResponse.hpp deleted file mode 100644 index e54c1e4..0000000 --- a/etcd/v3/AsyncHeadResponse.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef __ASYNC_HEADRESPONSE_HPP__ -#define __ASYNC_HEADRESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::RangeResponse; - -namespace etcdv3 -{ - class AsyncHeadResponse : public etcdv3::V3Response - { - public: - AsyncHeadResponse(){}; - void ParseResponse(RangeResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp deleted file mode 100644 index f8fc972..0000000 --- a/etcd/v3/AsyncLeaseAction.hpp +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef __ASYNC_LEASEACTION_HPP__ -#define __ASYNC_LEASEACTION_HPP__ - -#include - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncLeaseResponse.hpp" -#include "etcd/Response.hpp" - -using grpc::ClientAsyncResponseReader; -using grpc::ClientAsyncReaderWriter; -using etcdserverpb::LeaseGrantResponse; -using etcdserverpb::LeaseRevokeResponse; -using etcdserverpb::LeaseCheckpoint; -using etcdserverpb::LeaseCheckpointResponse; -using etcdserverpb::LeaseKeepAliveRequest; -using etcdserverpb::LeaseKeepAliveResponse; -using etcdserverpb::LeaseTimeToLiveResponse; -using etcdserverpb::LeaseStatus; -using etcdserverpb::LeaseLeasesResponse; - -namespace etcd { - class KeepAlive; -} - -namespace etcdv3 -{ - class AsyncLeaseGrantAction : public etcdv3::Action { - public: - AsyncLeaseGrantAction(etcdv3::ActionParameters && params); - AsyncLeaseGrantResponse ParseResponse(); - private: - LeaseGrantResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncLeaseRevokeAction: public etcdv3::Action { - public: - AsyncLeaseRevokeAction(etcdv3::ActionParameters && params); - AsyncLeaseRevokeResponse ParseResponse(); - private: - LeaseRevokeResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncLeaseKeepAliveAction: public etcdv3::Action { - public: - AsyncLeaseKeepAliveAction(etcdv3::ActionParameters && params); - AsyncLeaseKeepAliveResponse ParseResponse(); - - etcd::Response Refresh(); - void CancelKeepAlive(); - bool Cancelled() const; - - private: - etcdv3::ActionParameters& mutable_parameters(); - - LeaseKeepAliveResponse reply; - std::unique_ptr> stream; - - LeaseKeepAliveRequest req; - bool isCancelled; - std::recursive_mutex protect_is_cancelled; - - friend class etcd::KeepAlive; - }; - - class AsyncLeaseTimeToLiveAction: public etcdv3::Action { - public: - AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters && params); - AsyncLeaseTimeToLiveResponse ParseResponse(); - private: - LeaseTimeToLiveResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncLeaseLeasesAction: public etcdv3::Action { - public: - AsyncLeaseLeasesAction(etcdv3::ActionParameters && params); - AsyncLeaseLeasesResponse ParseResponse(); - private: - LeaseLeasesResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncLeaseResponse.hpp b/etcd/v3/AsyncLeaseResponse.hpp deleted file mode 100644 index 4800c9a..0000000 --- a/etcd/v3/AsyncLeaseResponse.hpp +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef __ASYNC_LEASERESPONSE_HPP__ -#define __ASYNC_LEASERESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" - -using etcdserverpb::LeaseGrantResponse; -using etcdserverpb::LeaseRevokeResponse; -using etcdserverpb::LeaseCheckpoint; -using etcdserverpb::LeaseCheckpointResponse; -using etcdserverpb::LeaseKeepAliveResponse; -using etcdserverpb::LeaseTimeToLiveResponse; -using etcdserverpb::LeaseStatus; -using etcdserverpb::LeaseLeasesResponse; - -namespace etcdv3 -{ - class AsyncLeaseGrantResponse : public etcdv3::V3Response - { - public: - AsyncLeaseGrantResponse(){}; - void ParseResponse(LeaseGrantResponse& resp); - }; - - class AsyncLeaseRevokeResponse : public etcdv3::V3Response - { - public: - AsyncLeaseRevokeResponse(){}; - void ParseResponse(LeaseRevokeResponse& resp); - }; - - class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response - { - public: - AsyncLeaseKeepAliveResponse(){}; - void ParseResponse(LeaseKeepAliveResponse& resp); - }; - - class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response - { - public: - AsyncLeaseTimeToLiveResponse(){}; - void ParseResponse(LeaseTimeToLiveResponse& resp); - }; - - class AsyncLeaseLeasesResponse : public etcdv3::V3Response - { - public: - AsyncLeaseLeasesResponse(){}; - void ParseResponse(LeaseLeasesResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncLockAction.hpp b/etcd/v3/AsyncLockAction.hpp deleted file mode 100644 index ea0bbcd..0000000 --- a/etcd/v3/AsyncLockAction.hpp +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef __ASYNC_LOCKACTION_HPP__ -#define __ASYNC_LOCKACTION_HPP__ - -#include -#include "proto/v3lock.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncLockResponse.hpp" -#include "etcd/Response.hpp" - - -using grpc::ClientAsyncResponseReader; -using v3lockpb::LockRequest; -using v3lockpb::LockResponse; -using v3lockpb::UnlockRequest; -using v3lockpb::UnlockResponse; - -namespace etcdv3 -{ - class AsyncLockAction : public etcdv3::Action - { - public: - AsyncLockAction(etcdv3::ActionParameters && params); - AsyncLockResponse ParseResponse(); - private: - LockResponse reply; - std::unique_ptr> response_reader; - }; - - class AsyncUnlockAction : public etcdv3::Action - { - public: - AsyncUnlockAction(etcdv3::ActionParameters && params); - AsyncUnlockResponse ParseResponse(); - private: - UnlockResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncLockResponse.hpp b/etcd/v3/AsyncLockResponse.hpp deleted file mode 100644 index 21bc05b..0000000 --- a/etcd/v3/AsyncLockResponse.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef __ASYNC_LOCK_HPP__ -#define __ASYNC_LOCK_HPP__ - -#include -#include "proto/v3lock.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" - -using grpc::ClientAsyncResponseReader; -using v3lockpb::LockRequest; -using v3lockpb::LockResponse; -using v3lockpb::UnlockRequest; -using v3lockpb::UnlockResponse; - -namespace etcdv3 -{ - class AsyncLockResponse : public etcdv3::V3Response - { - public: - AsyncLockResponse(){}; - void ParseResponse(LockResponse& resp); - }; - - class AsyncUnlockResponse : public etcdv3::V3Response - { - public: - AsyncUnlockResponse(){}; - void ParseResponse(UnlockResponse& resp); - }; -} - -#endif - diff --git a/etcd/v3/AsyncPutAction.hpp b/etcd/v3/AsyncPutAction.hpp deleted file mode 100644 index f13cd77..0000000 --- a/etcd/v3/AsyncPutAction.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __ASYNC_PUT_HPP__ -#define __ASYNC_PUT_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncPutResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::PutResponse; - -namespace etcdv3 -{ - class AsyncPutAction : public etcdv3::Action - { - public: - AsyncPutAction(etcdv3::ActionParameters && params); - AsyncPutResponse ParseResponse(); - private: - PutResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncPutResponse.hpp b/etcd/v3/AsyncPutResponse.hpp deleted file mode 100644 index ea94e2b..0000000 --- a/etcd/v3/AsyncPutResponse.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef __ASYNC_PUTRESPONSE_HPP__ -#define __ASYNC_PUTRESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" -#include "etcd/v3/Action.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::PutResponse; - -namespace etcdv3 -{ - class AsyncPutResponse : public etcdv3::V3Response - { - public: - AsyncPutResponse(){}; - void ParseResponse(PutResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncRangeAction.hpp b/etcd/v3/AsyncRangeAction.hpp deleted file mode 100644 index e082c3a..0000000 --- a/etcd/v3/AsyncRangeAction.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __ASYNC_RANGE_HPP__ -#define __ASYNC_RANGE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::RangeResponse; - -namespace etcdv3 -{ - class AsyncRangeAction : public etcdv3::Action - { - public: - AsyncRangeAction(etcdv3::ActionParameters && params); - AsyncRangeResponse ParseResponse(); - private: - RangeResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncRangeResponse.hpp b/etcd/v3/AsyncRangeResponse.hpp deleted file mode 100644 index 1eb4122..0000000 --- a/etcd/v3/AsyncRangeResponse.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef __ASYNC_RANGERESPONSE_HPP__ -#define __ASYNC_RANGERESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::RangeResponse; - -namespace etcdv3 -{ - class AsyncRangeResponse : public etcdv3::V3Response - { - public: - AsyncRangeResponse(){}; - void ParseResponse(RangeResponse& resp, bool prefix=false); - }; -} - -#endif diff --git a/etcd/v3/AsyncSetAction.hpp b/etcd/v3/AsyncSetAction.hpp deleted file mode 100644 index 0c0990c..0000000 --- a/etcd/v3/AsyncSetAction.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef __ASYNC_SET_HPP__ -#define __ASYNC_SET_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncSetAction : public etcdv3::Action - { - public: - AsyncSetAction(etcdv3::ActionParameters && params, bool create=false); - AsyncTxnResponse ParseResponse(); - private: - TxnResponse reply; - std::unique_ptr> response_reader; - bool isCreate; - }; -} - -#endif diff --git a/etcd/v3/AsyncTxnAction.hpp b/etcd/v3/AsyncTxnAction.hpp deleted file mode 100644 index 1880842..0000000 --- a/etcd/v3/AsyncTxnAction.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef __ASYNC_TXNACTION_HPP__ -#define __ASYNC_TXNACTION_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" -#include "etcd/v3/Transaction.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnRequest; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncTxnAction : public etcdv3::Action - { - public: - AsyncTxnAction(etcdv3::ActionParameters && params, etcdv3::Transaction const &tx); - AsyncTxnResponse ParseResponse(); - private: - TxnResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncTxnResponse.hpp b/etcd/v3/AsyncTxnResponse.hpp deleted file mode 100644 index 7f65319..0000000 --- a/etcd/v3/AsyncTxnResponse.hpp +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef __ASYNC_TXNRESPONSE_HPP__ -#define __ASYNC_TXNRESPONSE_HPP__ - -#include "proto/rpc.pb.h" -#include "etcd/v3/V3Response.hpp" - -using etcdserverpb::TxnResponse; - -namespace etcdv3 -{ - class AsyncTxnResponse : public etcdv3::V3Response - { - public: - AsyncTxnResponse(){}; - void ParseResponse(TxnResponse& resp); - void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncUpdateAction.hpp b/etcd/v3/AsyncUpdateAction.hpp deleted file mode 100644 index c57eb87..0000000 --- a/etcd/v3/AsyncUpdateAction.hpp +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef __ASYNC_UPDATE_HPP__ -#define __ASYNC_UPDATE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" - - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncUpdateAction : public etcdv3::Action - { - public: - AsyncUpdateAction(etcdv3::ActionParameters && params); - AsyncTxnResponse ParseResponse(); - private: - TxnResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp deleted file mode 100644 index 9186bee..0000000 --- a/etcd/v3/AsyncWatchAction.hpp +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef __ASYNC_WATCHACTION_HPP__ -#define __ASYNC_WATCHACTION_HPP__ - -#include -#include - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/Response.hpp" - -using grpc::ClientAsyncReaderWriter; -using etcdserverpb::WatchRequest; -using etcdserverpb::WatchResponse; - -namespace etcdv3 -{ - class AsyncWatchAction : public etcdv3::Action - { - public: - AsyncWatchAction(etcdv3::ActionParameters && params); - AsyncWatchResponse ParseResponse(); - void waitForResponse(); - void waitForResponse(std::function callback); - void CancelWatch(); - bool Cancelled() const; - private: - int64_t watch_id = -1; - WatchResponse reply; - std::unique_ptr> stream; - std::atomic_bool isCancelled; - }; -} - -#endif diff --git a/etcd/v3/AsyncWatchResponse.hpp b/etcd/v3/AsyncWatchResponse.hpp deleted file mode 100644 index 5ce6b57..0000000 --- a/etcd/v3/AsyncWatchResponse.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef __ASYNC_WATCH_HPP__ -#define __ASYNC_WATCH_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "proto/rpc.pb.h" -#include "etcd/v3/V3Response.hpp" - - -using etcdserverpb::WatchRequest; -using etcdserverpb::WatchResponse; -using etcdserverpb::KV; - -namespace etcdv3 -{ - class AsyncWatchResponse : public etcdv3::V3Response - { - public: - AsyncWatchResponse(){}; - void ParseResponse(WatchResponse& resp); - }; -} - -#endif - diff --git a/src/Client.cpp b/src/Client.cpp index c176e0b..f7dd9d1 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -39,29 +39,9 @@ #include "etcd/Watcher.hpp" #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/v3/AsyncDeleteResponse.hpp" -#include "etcd/v3/AsyncPutResponse.hpp" -#include "etcd/v3/AsyncLockResponse.hpp" -#include "etcd/v3/AsyncElectionResponse.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" +#include "etcd/v3/AsyncGRPC.hpp" #include "etcd/v3/Transaction.hpp" -#include "etcd/v3/AsyncSetAction.hpp" -#include "etcd/v3/AsyncCompareAndSwapAction.hpp" -#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" -#include "etcd/v3/AsyncUpdateAction.hpp" -#include "etcd/v3/AsyncHeadAction.hpp" -#include "etcd/v3/AsyncRangeAction.hpp" -#include "etcd/v3/AsyncDeleteAction.hpp" -#include "etcd/v3/AsyncPutAction.hpp" -#include "etcd/v3/AsyncWatchAction.hpp" -#include "etcd/v3/AsyncLeaseAction.hpp" -#include "etcd/v3/AsyncLockAction.hpp" -#include "etcd/v3/AsyncElectionAction.hpp" -#include "etcd/v3/AsyncTxnAction.hpp" - etcd::Client::Client(etcd::SyncClient *client): client(client) { this->own_client = false; diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 1a936cb..b1d21f1 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -2,7 +2,7 @@ #include #include "etcd/KeepAlive.hpp" -#include "etcd/v3/AsyncLeaseAction.hpp" +#include "etcd/v3/AsyncGRPC.hpp" #include #include "proto/rpc.grpc.pb.h" diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index ec93b44..495ed8c 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -41,29 +41,9 @@ #include "etcd/KeepAlive.hpp" #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/v3/AsyncDeleteResponse.hpp" -#include "etcd/v3/AsyncPutResponse.hpp" -#include "etcd/v3/AsyncLockResponse.hpp" -#include "etcd/v3/AsyncElectionResponse.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" +#include "etcd/v3/AsyncGRPC.hpp" #include "etcd/v3/Transaction.hpp" -#include "etcd/v3/AsyncSetAction.hpp" -#include "etcd/v3/AsyncCompareAndSwapAction.hpp" -#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" -#include "etcd/v3/AsyncUpdateAction.hpp" -#include "etcd/v3/AsyncHeadAction.hpp" -#include "etcd/v3/AsyncRangeAction.hpp" -#include "etcd/v3/AsyncDeleteAction.hpp" -#include "etcd/v3/AsyncPutAction.hpp" -#include "etcd/v3/AsyncWatchAction.hpp" -#include "etcd/v3/AsyncLeaseAction.hpp" -#include "etcd/v3/AsyncLockAction.hpp" -#include "etcd/v3/AsyncElectionAction.hpp" -#include "etcd/v3/AsyncTxnAction.hpp" - namespace etcd { namespace detail { diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 530d165..c69ccf1 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -2,7 +2,7 @@ #include "etcd/SyncClient.hpp" -#include "etcd/v3/AsyncWatchAction.hpp" +#include "etcd/v3/AsyncGRPC.hpp" struct etcd::Watcher::EtcdServerStubs { std::unique_ptr watchServiceStub; diff --git a/src/v3/AsyncCompareAndDeleteAction.cpp b/src/v3/AsyncCompareAndDeleteAction.cpp deleted file mode 100644 index f4253d4..0000000 --- a/src/v3/AsyncCompareAndDeleteAction.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" - -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Transaction.hpp" - -using etcdserverpb::RangeRequest; -using etcdserverpb::PutRequest; -using etcdserverpb::RequestOp; -using etcdserverpb::ResponseOp; -using etcdserverpb::TxnRequest; - -etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction( - etcdv3::ActionParameters && params, etcdv3::AtomicityType type) - :etcdv3::Action(std::move(params)) -{ - etcdv3::Transaction transaction(parameters.key); - if(type == etcdv3::AtomicityType::PREV_VALUE) - { - transaction.init_compare(parameters.old_value, - CompareResult::EQUAL, - CompareTarget::VALUE); - } - else if (type == etcdv3::AtomicityType::PREV_INDEX) - { - transaction.init_compare(parameters.old_revision, - CompareResult::EQUAL, - CompareTarget::MOD); - } - - transaction.setup_compare_and_delete_operation(parameters.key); - transaction.setup_basic_failure_operation(parameters.key); - - response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() -{ - AsyncTxnResponse txn_resp; - txn_resp.set_action(etcdv3::COMPAREDELETE_ACTION); - - if(!status.ok()) - { - txn_resp.set_error_code(status.error_code()); - txn_resp.set_error_message(status.error_message()); - } - else - { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); - - if(!reply.succeeded()) - { - txn_resp.set_error_code(ERROR_COMPARE_FAILED); - txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); - } - } - - return txn_resp; -} diff --git a/src/v3/AsyncCompareAndSwapAction.cpp b/src/v3/AsyncCompareAndSwapAction.cpp deleted file mode 100644 index 82d003d..0000000 --- a/src/v3/AsyncCompareAndSwapAction.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include "etcd/v3/AsyncCompareAndSwapAction.hpp" - -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Transaction.hpp" - -using etcdserverpb::RangeRequest; -using etcdserverpb::PutRequest; -using etcdserverpb::RequestOp; -using etcdserverpb::ResponseOp; -using etcdserverpb::TxnRequest; - -etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction( - etcdv3::ActionParameters && params, etcdv3::AtomicityType type) - : etcdv3::Action(std::move(params)) -{ - etcdv3::Transaction transaction(parameters.key); - if(type == etcdv3::AtomicityType::PREV_VALUE) - { - transaction.init_compare(parameters.old_value, - CompareResult::EQUAL, - CompareTarget::VALUE); - } - else if (type == etcdv3::AtomicityType::PREV_INDEX) - { - transaction.init_compare(parameters.old_revision, - CompareResult::EQUAL, - CompareTarget::MOD); - } - - transaction.setup_basic_failure_operation(parameters.key); - transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); - - response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() -{ - AsyncTxnResponse txn_resp; - txn_resp.set_action(etcdv3::COMPARESWAP_ACTION); - - if(!status.ok()) - { - txn_resp.set_error_code(status.error_code()); - txn_resp.set_error_message(status.error_message()); - } - else - { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); - - //if there is an error code returned by parseResponse, we must - //not overwrite it. - if(!reply.succeeded() && !txn_resp.get_error_code()) - { - txn_resp.set_error_code(ERROR_COMPARE_FAILED); - txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); - } - } - - return txn_resp; -} diff --git a/src/v3/AsyncDeleteAction.cpp b/src/v3/AsyncDeleteAction.cpp deleted file mode 100644 index 7fad717..0000000 --- a/src/v3/AsyncDeleteAction.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include "etcd/v3/AsyncDeleteAction.hpp" -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::DeleteRangeRequest; - -etcdv3::AsyncDeleteAction::AsyncDeleteAction( - ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - DeleteRangeRequest del_request; - if (!parameters.withPrefix) { - del_request.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - // see: WithFromKey in etcdv3/client - del_request.set_key(etcdv3::NUL); - del_request.set_range_end(etcdv3::NUL); - } else { - del_request.set_key(parameters.key); - del_request.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if(!parameters.range_end.empty()) { - del_request.set_range_end(parameters.range_end); - } - - del_request.set_prev_kv(true); - - response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse() -{ - AsyncDeleteResponse del_resp; - del_resp.set_action(etcdv3::DELETE_ACTION); - - if(!status.ok()) - { - del_resp.set_error_code(status.error_code()); - del_resp.set_error_message(status.error_message()); - } - else - { - del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply); - } - - return del_resp; -} diff --git a/src/v3/AsyncDeleteResponse.cpp b/src/v3/AsyncDeleteResponse.cpp deleted file mode 100644 index f504e3a..0000000 --- a/src/v3/AsyncDeleteResponse.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "etcd/v3/AsyncDeleteResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) -{ - index = resp.header().revision(); - - if(resp.prev_kvs_size() == 0) - { - error_code = etcdv3::ERROR_KEY_NOT_FOUND; - error_message = "etcd-cpp-apiv3: key not found"; - } - else - { - //get all previous values - for(int cnt=0; cnt < resp.prev_kvs_size(); cnt++) - { - etcdv3::KeyValue kv; - kv.kvs.CopyFrom(resp.prev_kvs(cnt)); - values.push_back(kv); - } - - if(!prefix) - { - prev_value = values[0]; - value = values[0]; - value.kvs.clear_value(); - values.clear(); - } - } -} diff --git a/src/v3/AsyncElectionAction.cpp b/src/v3/AsyncElectionAction.cpp deleted file mode 100644 index 2428ff4..0000000 --- a/src/v3/AsyncElectionAction.cpp +++ /dev/null @@ -1,214 +0,0 @@ -#include "etcd/v3/AsyncElectionAction.hpp" -#include - -#include "etcd/v3/action_constants.hpp" - - -using v3electionpb::LeaderKey; -using v3electionpb::CampaignRequest; -using v3electionpb::CampaignResponse; -using v3electionpb::ProclaimRequest; -using v3electionpb::ProclaimResponse; -using v3electionpb::LeaderRequest; -using v3electionpb::LeaderResponse; -using v3electionpb::ResignRequest; -using v3electionpb::ResignResponse; - -etcdv3::AsyncCampaignAction::AsyncCampaignAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - CampaignRequest campaign_request; - campaign_request.set_name(parameters.name); - campaign_request.set_lease(parameters.lease_id); - campaign_request.set_value(parameters.value); - - response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_); - response_reader->Finish(&reply, &status, (void *)this); -} - -etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse() -{ - AsyncCampaignResponse campaign_resp; - campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION); - - if(!status.ok()) { - campaign_resp.set_error_code(status.error_code()); - campaign_resp.set_error_message(status.error_message()); - } - else { - campaign_resp.ParseResponse(reply); - } - return campaign_resp; -} - -etcdv3::AsyncProclaimAction::AsyncProclaimAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - auto leader = new LeaderKey(); - leader->set_name(parameters.name); - leader->set_key(parameters.key); - leader->set_rev(parameters.revision); - leader->set_lease(parameters.lease_id); - - ProclaimRequest proclaim_request; - proclaim_request.set_allocated_leader(leader); - proclaim_request.set_value(parameters.value); - - response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_); - response_reader->Finish(&reply, &status, (void *)this); -} - -etcdv3::AsyncProclaimResponse etcdv3::AsyncProclaimAction::ParseResponse() -{ - AsyncProclaimResponse proclaim_resp; - proclaim_resp.set_action(etcdv3::PROCLAIM_ACTION); - - if(!status.ok()) { - proclaim_resp.set_error_code(status.error_code()); - proclaim_resp.set_error_message(status.error_message()); - } - else { - proclaim_resp.ParseResponse(reply); - } - return proclaim_resp; -} - -etcdv3::AsyncLeaderAction::AsyncLeaderAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaderRequest leader_request; - leader_request.set_name(parameters.name); - - response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_); - response_reader->Finish(&reply, &status, (void *)this); -} - -etcdv3::AsyncLeaderResponse etcdv3::AsyncLeaderAction::ParseResponse() -{ - AsyncLeaderResponse leader_resp; - leader_resp.set_action(etcdv3::LEADER_ACTION); - - if(!status.ok()) { - leader_resp.set_error_code(status.error_code()); - leader_resp.set_error_message(status.error_message()); - } - else { - leader_resp.ParseResponse(reply); - } - return leader_resp; -} - -etcdv3::AsyncObserveAction::AsyncObserveAction(etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaderRequest leader_request; - leader_request.set_name(parameters.name); - - response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE); - - void *got_tag; - bool ok = false; - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) { - // n.b.: leave the issue of `Read` to the `waitForResponse` - } else { - throw std::runtime_error("failed to create a observe connection"); - } -} - -void etcdv3::AsyncObserveAction::waitForResponse() -{ - void* got_tag; - bool ok = false; - - if (isCancelled.load()) { - status = grpc::Status::CANCELLED; - } - if (!status.ok()) { - return; - } - - response_reader->Read(&reply, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) { - auto response = ParseResponse(); - if (response.get_error_code() == 0) { - // issue the next read - response_reader->Read(&reply, (void *)this); - } else { - this->CancelObserve(); - } - } else { - this->CancelObserve(); - status = grpc::Status::CANCELLED; - } -} - -void etcdv3::AsyncObserveAction::CancelObserve() -{ - std::lock_guard scope_lock(this->protect_is_cancalled); - if (!isCancelled.exchange(true)) { - void* got_tag; - bool ok = false; - response_reader->Finish(&status, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { - // ok - } else { - std::cerr << "Failed to finish a election observing connection" << std::endl; - } - - cq_.Shutdown(); - } -} - -bool etcdv3::AsyncObserveAction::Cancelled() const { - return isCancelled.load(); -} - -etcdv3::AsyncObserveResponse etcdv3::AsyncObserveAction::ParseResponse() -{ - AsyncObserveResponse leader_resp; - leader_resp.set_action(etcdv3::OBSERVE_ACTION); - - if(!status.ok()) { - leader_resp.set_error_code(status.error_code()); - leader_resp.set_error_message(status.error_message()); - } - else { - leader_resp.ParseResponse(reply); - } - return leader_resp; -} - -etcdv3::AsyncResignAction::AsyncResignAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - auto leader = new LeaderKey(); - leader->set_name(parameters.name); - leader->set_key(parameters.key); - leader->set_rev(parameters.revision); - leader->set_lease(parameters.lease_id); - - ResignRequest resign_request; - resign_request.set_allocated_leader(leader); - - response_reader = parameters.election_stub->AsyncResign(&context, resign_request, &cq_); - response_reader->Finish(&reply, &status, (void *)this); -} - -etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse() -{ - AsyncResignResponse resign_resp; - resign_resp.set_action(etcdv3::RESIGN_ACTION); - - if(!status.ok()) { - resign_resp.set_error_code(status.error_code()); - resign_resp.set_error_message(status.error_message()); - } - else { - resign_resp.ParseResponse(reply); - } - return resign_resp; -} diff --git a/src/v3/AsyncElectionResponse.cpp b/src/v3/AsyncElectionResponse.cpp deleted file mode 100644 index a3fdb05..0000000 --- a/src/v3/AsyncElectionResponse.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "etcd/v3/AsyncElectionResponse.hpp" - -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::ResponseOp; - -void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) { - index = reply.header().revision(); - - auto const &leader = reply.leader(); - name = leader.name(); - value.kvs.set_key(leader.key()); - value.kvs.set_create_revision(leader.rev()); - value.kvs.set_lease(leader.lease()); -} - -void etcdv3::AsyncProclaimResponse::ParseResponse(ProclaimResponse& reply) { - index = reply.header().revision(); -} - -void etcdv3::AsyncLeaderResponse::ParseResponse(LeaderResponse& reply) { - index = reply.header().revision(); - - value.kvs = reply.kv(); -} - -void etcdv3::AsyncObserveResponse::ParseResponse(LeaderResponse& reply) { - index = reply.header().revision(); - - value.kvs = reply.kv(); -} - -void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) { - index = reply.header().revision(); -} diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp new file mode 100644 index 0000000..f8d0461 --- /dev/null +++ b/src/v3/AsyncGRPC.cpp @@ -0,0 +1,1369 @@ +#include "etcd/v3/AsyncGRPC.hpp" + +#include + +#include + +#include "etcd/v3/action_constants.hpp" +#include "etcd/v3/Transaction.hpp" +#include "etcd/Response.hpp" + +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; +using etcdserverpb::DeleteRangeRequest; +using v3electionpb::LeaderKey; +using v3electionpb::CampaignRequest; +using v3electionpb::CampaignResponse; +using v3electionpb::ProclaimRequest; +using v3electionpb::ProclaimResponse; +using v3electionpb::LeaderRequest; +using v3electionpb::LeaderResponse; +using v3electionpb::ResignRequest; +using v3electionpb::ResignResponse; +using etcdserverpb::ResponseOp; +using etcdserverpb::RangeRequest; +using etcdserverpb::LeaseGrantRequest; +using etcdserverpb::LeaseRevokeRequest; +using etcdserverpb::LeaseCheckpointRequest; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseTimeToLiveRequest; +using etcdserverpb::LeaseLeasesRequest; +using v3lockpb::LockRequest; +using v3lockpb::UnlockRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RangeRequest; +using etcdserverpb::ResponseOp; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; +using etcdserverpb::WatchCreateRequest; + +void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) { + index = reply.header().revision(); + + auto const &leader = reply.leader(); + name = leader.name(); + value.kvs.set_key(leader.key()); + value.kvs.set_create_revision(leader.rev()); + value.kvs.set_lease(leader.lease()); +} + +void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) +{ + index = resp.header().revision(); + + if(resp.prev_kvs_size() == 0) + { + error_code = etcdv3::ERROR_KEY_NOT_FOUND; + error_message = "etcd-cpp-apiv3: key not found"; + } + else + { + //get all previous values + for(int cnt=0; cnt < resp.prev_kvs_size(); cnt++) + { + etcdv3::KeyValue kv; + kv.kvs.CopyFrom(resp.prev_kvs(cnt)); + values.push_back(kv); + } + + if(!prefix) + { + prev_value = values[0]; + value = values[0]; + value.kvs.clear_value(); + values.clear(); + } + } +} + +void etcdv3::AsyncHeadResponse::ParseResponse(RangeResponse& resp) +{ + cluster_id = resp.header().cluster_id(); + member_id = resp.header().member_id(); + index = resp.header().revision(); + raft_term = resp.header().raft_term(); +} + +void etcdv3::AsyncLeaderResponse::ParseResponse(LeaderResponse& reply) { + index = reply.header().revision(); + + value.kvs = reply.kv(); +} + +void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); + error_message = resp.error(); +} + +void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); +} + +void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(LeaseLeasesResponse& resp) { + index = resp.header().revision(); + for (auto lease : resp.leases()) { + leases.emplace_back(lease.id()); + } +} + +void etcdv3::AsyncLeaseRevokeResponse::ParseResponse(LeaseRevokeResponse& resp) { + index = resp.header().revision(); +} + +void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse(LeaseTimeToLiveResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); + // FIXME: unsupported: fields "grantedTTL" and "keys" +} + +void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp) +{ + index = resp.header().revision(); + lock_key = resp.key(); +} + +void etcdv3::AsyncObserveResponse::ParseResponse(LeaderResponse& reply) { + index = reply.header().revision(); + + value.kvs = reply.kv(); +} + +void etcdv3::AsyncProclaimResponse::ParseResponse(ProclaimResponse& reply) { + index = reply.header().revision(); +} + +void etcdv3::AsyncPutResponse::ParseResponse(PutResponse& resp) +{ + index = resp.header().revision(); + + //get all previous values + etcdv3::KeyValue kv; + kv.kvs.CopyFrom(resp.prev_kv()); + prev_value = kv; +} + +void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix) +{ + index = resp.header().revision(); + if(resp.kvs_size() == 0 && !prefix) + { + error_code = etcdv3::ERROR_KEY_NOT_FOUND; + error_message = "etcd-cpp-apiv3: key not found"; + return; + } + else + { + for(int index=0; index < resp.kvs_size(); index++) + { + etcdv3::KeyValue kv; + kv.kvs.CopyFrom(resp.kvs(index)); + values.push_back(kv); + } + + if(!prefix) + { + value = values[0]; + values.clear(); + } + } +} + +void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) { + index = reply.header().revision(); +} + +void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { + index = reply.header().revision(); +} + +void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply) +{ + index = reply.header().revision(); + for(int index=0; index < reply.responses_size(); index++) + { + auto resp = reply.responses(index); + if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) + { + AsyncRangeResponse response; + response.ParseResponse(*(resp.mutable_response_range()),prefix); + + error_code = response.get_error_code(); + error_message = response.get_error_message(); + + values = response.get_values(); + value = response.get_value(); + } + else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) + { + auto put_resp = resp.response_put(); + if(put_resp.has_prev_kv()) + { + prev_value.kvs.CopyFrom(put_resp.prev_kv()); + } + } + else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) + { + AsyncDeleteResponse response; + response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); + + prev_value.kvs.CopyFrom(response.get_prev_value().kvs); + + values = response.get_values(); + value = response.get_value(); + } + } +} + +void etcdv3::AsyncUnlockResponse::ParseResponse(UnlockResponse& resp) +{ + index = resp.header().revision(); +} + +void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply) +{ + if (reply.canceled() && reply.compact_revision() != 0) { + error_code = grpc::StatusCode::OUT_OF_RANGE; + error_message = "required revision has been compacted"; + compact_revision = reply.compact_revision(); + return; + } + index = reply.header().revision(); + for (auto const &e: reply.events()) { + events.emplace_back(e); + } + for(int cnt =0; cnt < reply.events_size(); cnt++) + { + auto event = reply.events(cnt); + if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) + { + if(event.kv().version() == 1) + { + action = etcdv3::CREATE_ACTION; + } + else + { + action = etcdv3::SET_ACTION; + } + value.kvs = event.kv(); + + } + else if(mvccpb::Event::EventType::Event_EventType_DELETE_ == event.type()) + { + action = etcdv3::DELETE_ACTION; + value.kvs = event.kv(); + } + if(event.has_prev_kv()) + { + prev_value.kvs = event.prev_kv(); + } + // just store the first occurence of the key in values. + // this is done so tas client will not need to change their behaviour. + // break immediately + break; + } +} + +etcdv3::AsyncCampaignAction::AsyncCampaignAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + CampaignRequest campaign_request; + campaign_request.set_name(parameters.name); + campaign_request.set_lease(parameters.lease_id); + campaign_request.set_value(parameters.value); + + response_reader = parameters.election_stub->AsyncCampaign(&context, campaign_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse() +{ + AsyncCampaignResponse campaign_resp; + campaign_resp.set_action(etcdv3::CAMPAIGN_ACTION); + + if(!status.ok()) { + campaign_resp.set_error_code(status.error_code()); + campaign_resp.set_error_message(status.error_message()); + } + else { + campaign_resp.ParseResponse(reply); + } + return campaign_resp; +} + +etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction( + etcdv3::ActionParameters && params, etcdv3::AtomicityType type) + :etcdv3::Action(std::move(params)) +{ + etcdv3::Transaction transaction(parameters.key); + if(type == etcdv3::AtomicityType::PREV_VALUE) + { + transaction.init_compare(parameters.old_value, + CompareResult::EQUAL, + CompareTarget::VALUE); + } + else if (type == etcdv3::AtomicityType::PREV_INDEX) + { + transaction.init_compare(parameters.old_revision, + CompareResult::EQUAL, + CompareTarget::MOD); + } + + transaction.setup_compare_and_delete_operation(parameters.key); + transaction.setup_basic_failure_operation(parameters.key); + + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() +{ + AsyncTxnResponse txn_resp; + txn_resp.set_action(etcdv3::COMPAREDELETE_ACTION); + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + + if(!reply.succeeded()) + { + txn_resp.set_error_code(ERROR_COMPARE_FAILED); + txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); + } + } + + return txn_resp; +} + +etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction( + etcdv3::ActionParameters && params, etcdv3::AtomicityType type) + : etcdv3::Action(std::move(params)) +{ + etcdv3::Transaction transaction(parameters.key); + if(type == etcdv3::AtomicityType::PREV_VALUE) + { + transaction.init_compare(parameters.old_value, + CompareResult::EQUAL, + CompareTarget::VALUE); + } + else if (type == etcdv3::AtomicityType::PREV_INDEX) + { + transaction.init_compare(parameters.old_revision, + CompareResult::EQUAL, + CompareTarget::MOD); + } + + transaction.setup_basic_failure_operation(parameters.key); + transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); + + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() +{ + AsyncTxnResponse txn_resp; + txn_resp.set_action(etcdv3::COMPARESWAP_ACTION); + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + + //if there is an error code returned by parseResponse, we must + //not overwrite it. + if(!reply.succeeded() && !txn_resp.get_error_code()) + { + txn_resp.set_error_code(ERROR_COMPARE_FAILED); + txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); + } + } + + return txn_resp; +} + +etcdv3::AsyncDeleteAction::AsyncDeleteAction( + ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + DeleteRangeRequest del_request; + if (!parameters.withPrefix) { + del_request.set_key(parameters.key); + } else { + if (parameters.key.empty()) { + // see: WithFromKey in etcdv3/client + del_request.set_key(etcdv3::NUL); + del_request.set_range_end(etcdv3::NUL); + } else { + del_request.set_key(parameters.key); + del_request.set_range_end(detail::string_plus_one(parameters.key)); + } + } + if(!parameters.range_end.empty()) { + del_request.set_range_end(parameters.range_end); + } + + del_request.set_prev_kv(true); + + response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse() +{ + AsyncDeleteResponse del_resp; + del_resp.set_action(etcdv3::DELETE_ACTION); + + if(!status.ok()) + { + del_resp.set_error_code(status.error_code()); + del_resp.set_error_message(status.error_message()); + } + else + { + del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply); + } + + return del_resp; +} + +etcdv3::AsyncHeadAction::AsyncHeadAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + RangeRequest get_request; + get_request.set_key(etcdv3::NUL); + get_request.set_limit(1); + response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncHeadResponse etcdv3::AsyncHeadAction::ParseResponse() +{ + AsyncHeadResponse head_resp; + head_resp.set_action(etcdv3::GET_ACTION); + + if(!status.ok()) + { + head_resp.set_error_code(status.error_code()); + head_resp.set_error_message(status.error_message()); + } + else + { + head_resp.ParseResponse(reply); + } + return head_resp; +} + +etcdv3::AsyncLeaderAction::AsyncLeaderAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaderRequest leader_request; + leader_request.set_name(parameters.name); + + response_reader = parameters.election_stub->AsyncLeader(&context, leader_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncLeaderResponse etcdv3::AsyncLeaderAction::ParseResponse() +{ + AsyncLeaderResponse leader_resp; + leader_resp.set_action(etcdv3::LEADER_ACTION); + + if(!status.ok()) { + leader_resp.set_error_code(status.error_code()); + leader_resp.set_error_message(status.error_message()); + } + else { + leader_resp.ParseResponse(reply); + } + return leader_resp; +} + +etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaseGrantRequest leasegrant_request; + leasegrant_request.set_ttl(parameters.ttl); + // If ID is set to 0, etcd will choose an ID. + leasegrant_request.set_id(parameters.lease_id); + + response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, leasegrant_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse() +{ + AsyncLeaseGrantResponse lease_resp; + lease_resp.set_action(etcdv3::LEASEGRANT); + + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + isCancelled = false; + stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE); + + void *got_tag = nullptr; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_CREATE) { + // ok + } else { + throw std::runtime_error("Failed to create a lease keep-alive connection"); + } +} + +etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse() +{ + AsyncLeaseKeepAliveResponse lease_resp; + lease_resp.set_action(etcdv3::LEASEKEEPALIVE); + + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() +{ + std::lock_guard scope_lock(this->protect_is_cancelled); + + auto start_timepoint = std::chrono::high_resolution_clock::now(); + if (isCancelled) { + status = grpc::Status::CANCELLED; + return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); + } + + LeaseKeepAliveRequest leasekeepalive_request; + leasekeepalive_request.set_id(parameters.lease_id); + + void *got_tag = nullptr; + bool ok = false; + + if (parameters.has_grpc_timeout()) { + stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); + // wait write finish + switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) { + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag"); + } + } + } + if (!status.ok()) { + this->CancelKeepAlive(); + return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); + } + + stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); + // wait read finish + switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { + case CompletionQueue::NextStatus::TIMEOUT: { + status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read"); + break; + } + case CompletionQueue::NextStatus::SHUTDOWN: { + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read"); + break; + } + case CompletionQueue::NextStatus::GOT_EVENT: { + if (ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { + return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); + } + break; + } + } + this->CancelKeepAlive(); + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); + } else { + stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); + // wait write finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { + stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); + // wait read finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { + return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); + } + } + this->CancelKeepAlive(); + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); + } +} + +void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() +{ + std::lock_guard scope_lock(this->protect_is_cancelled); + if(isCancelled == false) + { + isCancelled = true; + + void *got_tag = nullptr; + bool ok = false; + + stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) { + // ok + } else { + std::cerr << "Failed to mark a lease keep-alive connection as DONE: " + << context.debug_error_string() << std::endl; + } + + stream->Finish(&status, (void *)KEEPALIVE_FINISH); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)KEEPALIVE_FINISH) { + // ok + } else { + std::cerr << "Failed to finish a lease keep-alive connection: " + << status.error_message() + << ", " << context.debug_error_string() << std::endl; + } + + // cancel on-the-fly calls + context.TryCancel(); + + cq_.Shutdown(); + } +} + +bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const +{ + return isCancelled; +} + +etcdv3::ActionParameters& etcdv3::AsyncLeaseKeepAliveAction::mutable_parameters() { + return this->parameters; +} + +etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaseLeasesRequest leaseleases_request; + + response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, leaseleases_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse() +{ + AsyncLeaseLeasesResponse lease_resp; + lease_resp.set_action(etcdv3::LEASELEASES); + + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaseRevokeRequest leaserevoke_request; + leaserevoke_request.set_id(parameters.lease_id); + + response_reader = parameters.lease_stub->AsyncLeaseRevoke(&context, leaserevoke_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse() +{ + AsyncLeaseRevokeResponse lease_resp; + lease_resp.set_action(etcdv3::LEASEREVOKE); + + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaseTimeToLiveRequest leasetimetolive_request; + leasetimetolive_request.set_id(parameters.lease_id); + // FIXME: unsupported parameters: "keys" + // leasetimetolive_request.set_keys(parameters.keys); + + response_reader = parameters.lease_stub->AsyncLeaseTimeToLive(&context, leasetimetolive_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseTimeToLiveResponse etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse() +{ + AsyncLeaseTimeToLiveResponse lease_resp; + lease_resp.set_action(etcdv3::LEASETIMETOLIVE); + + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLockAction::AsyncLockAction( + ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LockRequest lock_request; + lock_request.set_name(parameters.key); + lock_request.set_lease(parameters.lease_id); + + response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse() +{ + AsyncLockResponse lock_resp; + lock_resp.set_action(etcdv3::LOCK_ACTION); + + if(!status.ok()) + { + lock_resp.set_error_code(status.error_code()); + lock_resp.set_error_message(status.error_message()); + } + else + { + lock_resp.ParseResponse(reply); + } + + return lock_resp; +} + +etcdv3::AsyncObserveAction::AsyncObserveAction(etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + LeaderRequest leader_request; + leader_request.set_name(parameters.name); + + response_reader = parameters.election_stub->AsyncObserve(&context, leader_request, &cq_, (void *)etcdv3::ELECTION_OBSERVE_CREATE); + + void *got_tag; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::ELECTION_OBSERVE_CREATE) { + // n.b.: leave the issue of `Read` to the `waitForResponse` + } else { + throw std::runtime_error("failed to create a observe connection"); + } +} + +void etcdv3::AsyncObserveAction::waitForResponse() +{ + void* got_tag; + bool ok = false; + + if (isCancelled.load()) { + status = grpc::Status::CANCELLED; + } + if (!status.ok()) { + return; + } + + response_reader->Read(&reply, (void *)this); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) { + auto response = ParseResponse(); + if (response.get_error_code() == 0) { + // issue the next read + response_reader->Read(&reply, (void *)this); + } else { + this->CancelObserve(); + } + } else { + this->CancelObserve(); + status = grpc::Status::CANCELLED; + } +} + +void etcdv3::AsyncObserveAction::CancelObserve() +{ + std::lock_guard scope_lock(this->protect_is_cancalled); + if (!isCancelled.exchange(true)) { + void* got_tag; + bool ok = false; + response_reader->Finish(&status, (void *)this); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + // ok + } else { + std::cerr << "Failed to finish a election observing connection" << std::endl; + } + + cq_.Shutdown(); + } +} + +bool etcdv3::AsyncObserveAction::Cancelled() const { + return isCancelled.load(); +} + +etcdv3::AsyncObserveResponse etcdv3::AsyncObserveAction::ParseResponse() +{ + AsyncObserveResponse leader_resp; + leader_resp.set_action(etcdv3::OBSERVE_ACTION); + + if(!status.ok()) { + leader_resp.set_error_code(status.error_code()); + leader_resp.set_error_message(status.error_message()); + } + else { + leader_resp.ParseResponse(reply); + } + return leader_resp; +} + +etcdv3::AsyncProclaimAction::AsyncProclaimAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + auto leader = new LeaderKey(); + leader->set_name(parameters.name); + leader->set_key(parameters.key); + leader->set_rev(parameters.revision); + leader->set_lease(parameters.lease_id); + + ProclaimRequest proclaim_request; + proclaim_request.set_allocated_leader(leader); + proclaim_request.set_value(parameters.value); + + response_reader = parameters.election_stub->AsyncProclaim(&context, proclaim_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncProclaimResponse etcdv3::AsyncProclaimAction::ParseResponse() +{ + AsyncProclaimResponse proclaim_resp; + proclaim_resp.set_action(etcdv3::PROCLAIM_ACTION); + + if(!status.ok()) { + proclaim_resp.set_error_code(status.error_code()); + proclaim_resp.set_error_message(status.error_message()); + } + else { + proclaim_resp.ParseResponse(reply); + } + return proclaim_resp; +} + +etcdv3::AsyncPutAction::AsyncPutAction( + ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + PutRequest put_request; + put_request.set_key(parameters.key); + put_request.set_value(parameters.value); + put_request.set_lease(parameters.lease_id); + put_request.set_prev_kv(true); + + response_reader = parameters.kv_stub->AsyncPut(&context, put_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse() +{ + AsyncPutResponse put_resp; + put_resp.set_action(etcdv3::PUT_ACTION); + + if(!status.ok()) + { + put_resp.set_error_code(status.error_code()); + put_resp.set_error_message(status.error_message()); + } + else + { + put_resp.ParseResponse(reply); + } + + return put_resp; +} + +etcdv3::AsyncRangeAction::AsyncRangeAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + RangeRequest get_request; + if (!parameters.withPrefix) { + get_request.set_key(parameters.key); + } else { + if (parameters.key.empty()) { + // see: WithFromKey in etcdv3/client + get_request.set_key(etcdv3::NUL); + get_request.set_range_end(etcdv3::NUL); + } else { + get_request.set_key(parameters.key); + get_request.set_range_end(detail::string_plus_one(parameters.key)); + } + } + if(!parameters.range_end.empty()) { + get_request.set_range_end(parameters.range_end); + } + if(parameters.revision > 0) { + get_request.set_revision(parameters.revision); + } + + get_request.set_limit(parameters.limit); + get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); + + // set keys_only and count_only + get_request.set_keys_only(params.keys_only); + get_request.set_count_only(params.count_only); + + response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncRangeResponse etcdv3::AsyncRangeAction::ParseResponse() +{ + AsyncRangeResponse range_resp; + range_resp.set_action(etcdv3::GET_ACTION); + + if(!status.ok()) + { + range_resp.set_error_code(status.error_code()); + range_resp.set_error_message(status.error_message()); + } + else + { + range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty()); + } + return range_resp; +} + +etcdv3::AsyncResignAction::AsyncResignAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + auto leader = new LeaderKey(); + leader->set_name(parameters.name); + leader->set_key(parameters.key); + leader->set_rev(parameters.revision); + leader->set_lease(parameters.lease_id); + + ResignRequest resign_request; + resign_request.set_allocated_leader(leader); + + response_reader = parameters.election_stub->AsyncResign(&context, resign_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse() +{ + AsyncResignResponse resign_resp; + resign_resp.set_action(etcdv3::RESIGN_ACTION); + + if(!status.ok()) { + resign_resp.set_error_code(status.error_code()); + resign_resp.set_error_message(status.error_message()); + } + else { + resign_resp.ParseResponse(reply); + } + return resign_resp; +} + +etcdv3::AsyncSetAction::AsyncSetAction( + etcdv3::ActionParameters && params, bool create) + : etcdv3::Action(std::move(params)) +{ + etcdv3::Transaction transaction(parameters.key); + isCreate = create; + transaction.init_compare(CompareResult::EQUAL, + CompareTarget::VERSION); + + transaction.setup_basic_create_sequence(parameters.key, parameters.value, parameters.lease_id); + + if(isCreate) + { + transaction.setup_basic_failure_operation(parameters.key); + } + else + { + transaction.setup_set_failure_operation(parameters.key, parameters.value, parameters.lease_id); + } + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() +{ + + AsyncTxnResponse txn_resp; + txn_resp.set_action(isCreate? etcdv3::CREATE_ACTION : etcdv3::SET_ACTION); + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + + if(!reply.succeeded() && isCreate) + { + txn_resp.set_error_code(etcdv3::ERROR_KEY_ALREADY_EXISTS); + txn_resp.set_error_message("etcd-cpp-apiv3: key already exists"); + } + } + return txn_resp; +} + +etcdv3::AsyncTxnAction::AsyncTxnAction( + etcdv3::ActionParameters && params, etcdv3::Transaction const &tx) + : etcdv3::Action(std::move(params)) +{ + response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse() +{ + AsyncTxnResponse txn_resp; + txn_resp.set_action(etcdv3::TXN_ACTION); + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + + //if there is an error code returned by parseResponse, we must + //not overwrite it. + if(!reply.succeeded() && !txn_resp.get_error_code()) + { + txn_resp.set_error_code(ERROR_COMPARE_FAILED); + txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); + } + } + + return txn_resp; +} + +etcdv3::AsyncUnlockAction::AsyncUnlockAction( + ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + UnlockRequest unlock_request; + unlock_request.set_key(parameters.key); + + response_reader = parameters.lock_stub->AsyncUnlock(&context, unlock_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() +{ + AsyncUnlockResponse unlock_resp; + unlock_resp.set_action(etcdv3::UNLOCK_ACTION); + + if(!status.ok()) + { + unlock_resp.set_error_code(status.error_code()); + unlock_resp.set_error_message(status.error_message()); + } + else + { + unlock_resp.ParseResponse(reply); + } + + return unlock_resp; +} + +etcdv3::AsyncUpdateAction::AsyncUpdateAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + etcdv3::Transaction transaction(parameters.key); + transaction.init_compare(CompareResult::GREATER, + CompareTarget::VERSION); + + transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); + + response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() +{ + AsyncTxnResponse txn_resp; + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + if(reply.succeeded()) + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.set_action(etcdv3::UPDATE_ACTION); + } + else + { + txn_resp.set_error_code(etcdv3::ERROR_KEY_NOT_FOUND); + txn_resp.set_error_message("etcd-cpp-apiv3: key not found"); + } + } + return txn_resp; +} + +etcdv3::AsyncWatchAction::AsyncWatchAction( + etcdv3::ActionParameters && params) + : etcdv3::Action(std::move(params)) +{ + isCancelled.store(false); + stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + + if(!parameters.withPrefix) { + watch_create_req.set_key(parameters.key); + } else { + if (parameters.key.empty()) { + watch_create_req.set_key(etcdv3::NUL); + watch_create_req.set_range_end(etcdv3::NUL); + } else { + watch_create_req.set_key(parameters.key); + watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); + } + } + if(!parameters.range_end.empty()) { + watch_create_req.set_range_end(parameters.range_end); + } + + watch_create_req.set_prev_kv(true); + watch_create_req.set_start_revision(parameters.revision); + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + + // wait "create" success (the stream becomes ready) + void *got_tag; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_CREATE) { + stream->Write(watch_req, (void *)etcdv3::WATCH_WRITE); + } else { + throw std::runtime_error("failed to create a watch connection"); + } + + // wait "write" (WatchCreateRequest) success, and start to read the first reply + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITE) { + stream->Read(&reply, (void*)this); + this->watch_id = reply.watch_id(); + } else { + throw std::runtime_error("failed to write WatchCreateRequest to server"); + } +} + +void etcdv3::AsyncWatchAction::waitForResponse() +{ + void* got_tag; + bool ok = false; + + while(cq_.Next(&got_tag, &ok)) + { + if(ok == false) + { + break; + } + if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) { + stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); + continue; + } + if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) + { + stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); + continue; + } + if (got_tag == (void *)etcdv3::WATCH_FINISH) { + // shutdown + cq_.Shutdown(); + break; + } + if(got_tag == (void*)this) // read tag + { + if (reply.canceled()) { + // cancel on-the-fly calls, but don't shutdown the completion queue as there + // are still a inflight call to finish + context.TryCancel(); + continue; + } + + // we stop watch under two conditions: + // + // 1. watch for a future revision, return immediately with empty events set + // 2. receive any effective events. + if ((reply.created() && reply.header().revision() < parameters.revision) || + reply.events_size() > 0) { + // leave a warning if the response is too large and been fragmented + if (reply.fragment()) { + std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; + } + + std::cout << "issue a watch cancel" << std::endl; + // cancel the watcher after receiving the good response + this->CancelWatch(); + + // start the next round to read finish messages, read into "&dummy" + // (use nullptr, as it won't be touched). + stream->Read(nullptr, (void*)etcdv3::WATCH_FINISH); + } else { + // start the next round to read reply, read into "&reply" + stream->Read(&reply, (void*)this); + } + continue; + } + if(isCancelled.load()) { + // invalid tag, and is cancelled + break; + } + } +} + +void etcdv3::AsyncWatchAction::CancelWatch() +{ + if (!isCancelled.exchange(true)) { + WatchRequest cancel_req; + cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id); + stream->Write(cancel_req, (void *)etcdv3::WATCH_WRITE_CANCEL); + isCancelled.store(true); + } +} + +bool etcdv3::AsyncWatchAction::Cancelled() const { + return isCancelled.load(); +} + +void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) +{ + void* got_tag; + bool ok = false; + + while(cq_.Next(&got_tag, &ok)) + { + if(ok == false) + { + break; + } + if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) { + stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); + continue; + } + if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) + { + stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); + continue; + } + if (got_tag == (void *)etcdv3::WATCH_FINISH) { + // shutdown + cq_.Shutdown(); + break; + } + if(got_tag == (void*)this) // read tag + { + if (reply.canceled()) { + if (reply.compact_revision() != 0) { + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + callback(etcd::Response(resp, duration)); + } + // cancel on-the-fly calls, but don't shutdown the completion queue as there + // are still a inflight call to finish + context.TryCancel(); + continue; + } + + // for the callback case, we don't invoke callback immediately if watching + // for a future revision, we wait until there are some effective events. + if(reply.events_size()) + { + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + callback(etcd::Response(resp, duration)); + start_timepoint = std::chrono::high_resolution_clock::now(); + } + stream->Read(&reply, (void*)this); + continue; + } + if(isCancelled.load()) { + // invalid tag, and is cancelled + break; + } + } +} + +etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() +{ + AsyncWatchResponse watch_resp; + watch_resp.set_action(etcdv3::WATCH_ACTION); + + if(!status.ok()) + { + watch_resp.set_error_code(status.error_code()); + watch_resp.set_error_message(status.error_message()); + } + else + { + watch_resp.ParseResponse(reply); + } + return watch_resp; +} diff --git a/src/v3/AsyncHeadAction.cpp b/src/v3/AsyncHeadAction.cpp deleted file mode 100644 index df99024..0000000 --- a/src/v3/AsyncHeadAction.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "etcd/v3/AsyncHeadAction.hpp" - -#include - -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::RangeRequest; - -etcdv3::AsyncHeadAction::AsyncHeadAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - RangeRequest get_request; - get_request.set_key(etcdv3::NUL); - get_request.set_limit(1); - response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncHeadResponse etcdv3::AsyncHeadAction::ParseResponse() -{ - AsyncHeadResponse head_resp; - head_resp.set_action(etcdv3::GET_ACTION); - - if(!status.ok()) - { - head_resp.set_error_code(status.error_code()); - head_resp.set_error_message(status.error_message()); - } - else - { - head_resp.ParseResponse(reply); - } - return head_resp; -} diff --git a/src/v3/AsyncHeadResponse.cpp b/src/v3/AsyncHeadResponse.cpp deleted file mode 100644 index 5169a63..0000000 --- a/src/v3/AsyncHeadResponse.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "etcd/v3/AsyncHeadResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncHeadResponse::ParseResponse(RangeResponse& resp) -{ - cluster_id = resp.header().cluster_id(); - member_id = resp.header().member_id(); - index = resp.header().revision(); - raft_term = resp.header().raft_term(); -} diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp deleted file mode 100644 index 95e7943..0000000 --- a/src/v3/AsyncLeaseAction.cpp +++ /dev/null @@ -1,264 +0,0 @@ -#include "etcd/v3/AsyncLeaseAction.hpp" - -#include "etcd/Response.hpp" -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Transaction.hpp" - -#include - -using etcdserverpb::LeaseGrantRequest; -using etcdserverpb::LeaseRevokeRequest; -using etcdserverpb::LeaseCheckpointRequest; -using etcdserverpb::LeaseKeepAliveRequest; -using etcdserverpb::LeaseTimeToLiveRequest; -using etcdserverpb::LeaseLeasesRequest; - -etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaseGrantRequest leasegrant_request; - leasegrant_request.set_ttl(parameters.ttl); - // If ID is set to 0, etcd will choose an ID. - leasegrant_request.set_id(parameters.lease_id); - - response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, leasegrant_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse() -{ - AsyncLeaseGrantResponse lease_resp; - lease_resp.set_action(etcdv3::LEASEGRANT); - - if (!status.ok()) { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } else { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} - -etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaseRevokeRequest leaserevoke_request; - leaserevoke_request.set_id(parameters.lease_id); - - response_reader = parameters.lease_stub->AsyncLeaseRevoke(&context, leaserevoke_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse() -{ - AsyncLeaseRevokeResponse lease_resp; - lease_resp.set_action(etcdv3::LEASEREVOKE); - - if (!status.ok()) { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } else { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} - -etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - isCancelled = false; - stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)etcdv3::KEEPALIVE_CREATE); - - void *got_tag = nullptr; - bool ok = false; - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_CREATE) { - // ok - } else { - throw std::runtime_error("Failed to create a lease keep-alive connection"); - } -} - -etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse() -{ - AsyncLeaseKeepAliveResponse lease_resp; - lease_resp.set_action(etcdv3::LEASEKEEPALIVE); - - if (!status.ok()) { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } else { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} - -etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() -{ - std::lock_guard scope_lock(this->protect_is_cancelled); - - auto start_timepoint = std::chrono::high_resolution_clock::now(); - if (isCancelled) { - status = grpc::Status::CANCELLED; - return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); - } - - LeaseKeepAliveRequest leasekeepalive_request; - leasekeepalive_request.set_id(parameters.lease_id); - - void *got_tag = nullptr; - bool ok = false; - - if (parameters.has_grpc_timeout()) { - stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); - // wait write finish - switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { - case CompletionQueue::NextStatus::TIMEOUT: { - status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive write"); - break; - } - case CompletionQueue::NextStatus::SHUTDOWN: { - status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive write"); - break; - } - case CompletionQueue::NextStatus::GOT_EVENT: { - if (!ok || got_tag != (void *)etcdv3::KEEPALIVE_WRITE) { - return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: write not ok or invalid tag"); - } - } - } - if (!status.ok()) { - this->CancelKeepAlive(); - return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); - } - - stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); - // wait read finish - switch (cq_.AsyncNext(&got_tag, &ok, parameters.grpc_deadline())) { - case CompletionQueue::NextStatus::TIMEOUT: { - status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "gRPC timeout during keep alive read"); - break; - } - case CompletionQueue::NextStatus::SHUTDOWN: { - status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "gRPC already shutdown during keep alive read"); - break; - } - case CompletionQueue::NextStatus::GOT_EVENT: { - if (ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { - return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); - } - break; - } - } - this->CancelKeepAlive(); - return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); - } else { - stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); - // wait write finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { - stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); - // wait read finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { - return etcd::Response(ParseResponse(), etcd::detail::duration_till_now(start_timepoint)); - } - } - this->CancelKeepAlive(); - return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection: read not ok or invalid tag"); - } -} - -void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() -{ - std::lock_guard scope_lock(this->protect_is_cancelled); - if(isCancelled == false) - { - isCancelled = true; - - void *got_tag = nullptr; - bool ok = false; - - stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) { - // ok - } else { - std::cerr << "Failed to mark a lease keep-alive connection as DONE: " - << context.debug_error_string() << std::endl; - } - - stream->Finish(&status, (void *)KEEPALIVE_FINISH); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)KEEPALIVE_FINISH) { - // ok - } else { - std::cerr << "Failed to finish a lease keep-alive connection: " - << status.error_message() - << ", " << context.debug_error_string() << std::endl; - } - - // cancel on-the-fly calls - context.TryCancel(); - - cq_.Shutdown(); - } -} - -bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const -{ - return isCancelled; -} - -etcdv3::ActionParameters& etcdv3::AsyncLeaseKeepAliveAction::mutable_parameters() { - return this->parameters; -} - -etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaseTimeToLiveRequest leasetimetolive_request; - leasetimetolive_request.set_id(parameters.lease_id); - // FIXME: unsupported parameters: "keys" - // leasetimetolive_request.set_keys(parameters.keys); - - response_reader = parameters.lease_stub->AsyncLeaseTimeToLive(&context, leasetimetolive_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncLeaseTimeToLiveResponse etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse() -{ - AsyncLeaseTimeToLiveResponse lease_resp; - lease_resp.set_action(etcdv3::LEASETIMETOLIVE); - - if (!status.ok()) { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } else { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} - -etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LeaseLeasesRequest leaseleases_request; - - response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, leaseleases_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse() -{ - AsyncLeaseLeasesResponse lease_resp; - lease_resp.set_action(etcdv3::LEASELEASES); - - if (!status.ok()) { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } else { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} diff --git a/src/v3/AsyncLeaseResponse.cpp b/src/v3/AsyncLeaseResponse.cpp deleted file mode 100644 index 5c80ab3..0000000 --- a/src/v3/AsyncLeaseResponse.cpp +++ /dev/null @@ -1,34 +0,0 @@ -#include "etcd/v3/AsyncLeaseResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp) { - index = resp.header().revision(); - value.kvs.set_lease(resp.id()); - value.set_ttl(resp.ttl()); - error_message = resp.error(); -} - -void etcdv3::AsyncLeaseRevokeResponse::ParseResponse(LeaseRevokeResponse& resp) { - index = resp.header().revision(); -} - -void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& resp) { - index = resp.header().revision(); - value.kvs.set_lease(resp.id()); - value.set_ttl(resp.ttl()); -} - -void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse(LeaseTimeToLiveResponse& resp) { - index = resp.header().revision(); - value.kvs.set_lease(resp.id()); - value.set_ttl(resp.ttl()); - // FIXME: unsupported: fields "grantedTTL" and "keys" -} - -void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(LeaseLeasesResponse& resp) { - index = resp.header().revision(); - for (auto lease : resp.leases()) { - leases.emplace_back(lease.id()); - } -} diff --git a/src/v3/AsyncLockAction.cpp b/src/v3/AsyncLockAction.cpp deleted file mode 100644 index cc3f56a..0000000 --- a/src/v3/AsyncLockAction.cpp +++ /dev/null @@ -1,64 +0,0 @@ -#include "etcd/v3/AsyncLockAction.hpp" -#include "etcd/v3/action_constants.hpp" - -using v3lockpb::LockRequest; -using v3lockpb::UnlockRequest; - -etcdv3::AsyncLockAction::AsyncLockAction( - ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - LockRequest lock_request; - lock_request.set_name(parameters.key); - lock_request.set_lease(parameters.lease_id); - - response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse() -{ - AsyncLockResponse lock_resp; - lock_resp.set_action(etcdv3::LOCK_ACTION); - - if(!status.ok()) - { - lock_resp.set_error_code(status.error_code()); - lock_resp.set_error_message(status.error_message()); - } - else - { - lock_resp.ParseResponse(reply); - } - - return lock_resp; -} - -etcdv3::AsyncUnlockAction::AsyncUnlockAction( - ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - UnlockRequest unlock_request; - unlock_request.set_key(parameters.key); - - response_reader = parameters.lock_stub->AsyncUnlock(&context, unlock_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() -{ - AsyncUnlockResponse unlock_resp; - unlock_resp.set_action(etcdv3::UNLOCK_ACTION); - - if(!status.ok()) - { - unlock_resp.set_error_code(status.error_code()); - unlock_resp.set_error_message(status.error_message()); - } - else - { - unlock_resp.ParseResponse(reply); - } - - return unlock_resp; -} diff --git a/src/v3/AsyncLockResponse.cpp b/src/v3/AsyncLockResponse.cpp deleted file mode 100644 index ac3f4be..0000000 --- a/src/v3/AsyncLockResponse.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "etcd/v3/AsyncLockResponse.hpp" -#include "etcd/v3/action_constants.hpp" - -void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp) -{ - index = resp.header().revision(); - lock_key = resp.key(); -} - -void etcdv3::AsyncUnlockResponse::ParseResponse(UnlockResponse& resp) -{ - index = resp.header().revision(); -} diff --git a/src/v3/AsyncPutAction.cpp b/src/v3/AsyncPutAction.cpp deleted file mode 100644 index 1e91dce..0000000 --- a/src/v3/AsyncPutAction.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "etcd/v3/AsyncPutAction.hpp" -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::PutRequest; - -etcdv3::AsyncPutAction::AsyncPutAction( - ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - PutRequest put_request; - put_request.set_key(parameters.key); - put_request.set_value(parameters.value); - put_request.set_lease(parameters.lease_id); - put_request.set_prev_kv(true); - - response_reader = parameters.kv_stub->AsyncPut(&context, put_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse() -{ - AsyncPutResponse put_resp; - put_resp.set_action(etcdv3::PUT_ACTION); - - if(!status.ok()) - { - put_resp.set_error_code(status.error_code()); - put_resp.set_error_message(status.error_message()); - } - else - { - put_resp.ParseResponse(reply); - } - - return put_resp; -} diff --git a/src/v3/AsyncPutResponse.cpp b/src/v3/AsyncPutResponse.cpp deleted file mode 100644 index 6a116a5..0000000 --- a/src/v3/AsyncPutResponse.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "etcd/v3/AsyncPutResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncPutResponse::ParseResponse(PutResponse& resp) -{ - index = resp.header().revision(); - - //get all previous values - etcdv3::KeyValue kv; - kv.kvs.CopyFrom(resp.prev_kv()); - prev_value = kv; -} diff --git a/src/v3/AsyncRangeAction.cpp b/src/v3/AsyncRangeAction.cpp deleted file mode 100644 index 54117cc..0000000 --- a/src/v3/AsyncRangeAction.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "etcd/v3/AsyncRangeAction.hpp" - -#include - -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::RangeRequest; - -etcdv3::AsyncRangeAction::AsyncRangeAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - RangeRequest get_request; - if (!parameters.withPrefix) { - get_request.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - // see: WithFromKey in etcdv3/client - get_request.set_key(etcdv3::NUL); - get_request.set_range_end(etcdv3::NUL); - } else { - get_request.set_key(parameters.key); - get_request.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if(!parameters.range_end.empty()) { - get_request.set_range_end(parameters.range_end); - } - if(parameters.revision > 0) { - get_request.set_revision(parameters.revision); - } - - get_request.set_limit(parameters.limit); - get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); - - // set keys_only and count_only - get_request.set_keys_only(params.keys_only); - get_request.set_count_only(params.count_only); - - response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncRangeResponse etcdv3::AsyncRangeAction::ParseResponse() -{ - AsyncRangeResponse range_resp; - range_resp.set_action(etcdv3::GET_ACTION); - - if(!status.ok()) - { - range_resp.set_error_code(status.error_code()); - range_resp.set_error_message(status.error_message()); - } - else - { - range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty()); - } - return range_resp; -} diff --git a/src/v3/AsyncRangeResponse.cpp b/src/v3/AsyncRangeResponse.cpp deleted file mode 100644 index 2dfa73b..0000000 --- a/src/v3/AsyncRangeResponse.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix) -{ - index = resp.header().revision(); - if(resp.kvs_size() == 0 && !prefix) - { - error_code = etcdv3::ERROR_KEY_NOT_FOUND; - error_message = "etcd-cpp-apiv3: key not found"; - return; - } - else - { - for(int index=0; index < resp.kvs_size(); index++) - { - etcdv3::KeyValue kv; - kv.kvs.CopyFrom(resp.kvs(index)); - values.push_back(kv); - } - - if(!prefix) - { - value = values[0]; - values.clear(); - } - } -} diff --git a/src/v3/AsyncSetAction.cpp b/src/v3/AsyncSetAction.cpp deleted file mode 100644 index e09c023..0000000 --- a/src/v3/AsyncSetAction.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include "etcd/v3/AsyncSetAction.hpp" - -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Transaction.hpp" - -etcdv3::AsyncSetAction::AsyncSetAction( - etcdv3::ActionParameters && params, bool create) - : etcdv3::Action(std::move(params)) -{ - etcdv3::Transaction transaction(parameters.key); - isCreate = create; - transaction.init_compare(CompareResult::EQUAL, - CompareTarget::VERSION); - - transaction.setup_basic_create_sequence(parameters.key, parameters.value, parameters.lease_id); - - if(isCreate) - { - transaction.setup_basic_failure_operation(parameters.key); - } - else - { - transaction.setup_set_failure_operation(parameters.key, parameters.value, parameters.lease_id); - } - response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() -{ - - AsyncTxnResponse txn_resp; - txn_resp.set_action(isCreate? etcdv3::CREATE_ACTION : etcdv3::SET_ACTION); - - if(!status.ok()) - { - txn_resp.set_error_code(status.error_code()); - txn_resp.set_error_message(status.error_message()); - } - else - { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); - - if(!reply.succeeded() && isCreate) - { - txn_resp.set_error_code(etcdv3::ERROR_KEY_ALREADY_EXISTS); - txn_resp.set_error_message("etcd-cpp-apiv3: key already exists"); - } - } - return txn_resp; -} diff --git a/src/v3/AsyncTxnAction.cpp b/src/v3/AsyncTxnAction.cpp deleted file mode 100644 index 6d2d07c..0000000 --- a/src/v3/AsyncTxnAction.cpp +++ /dev/null @@ -1,38 +0,0 @@ -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/AsyncTxnAction.hpp" -#include "etcd/v3/Transaction.hpp" - - -etcdv3::AsyncTxnAction::AsyncTxnAction( - etcdv3::ActionParameters && params, etcdv3::Transaction const &tx) - : etcdv3::Action(std::move(params)) -{ - response_reader = parameters.kv_stub->AsyncTxn(&context, *tx.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void *)this); -} - -etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse() -{ - AsyncTxnResponse txn_resp; - txn_resp.set_action(etcdv3::TXN_ACTION); - - if(!status.ok()) - { - txn_resp.set_error_code(status.error_code()); - txn_resp.set_error_message(status.error_message()); - } - else - { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); - - //if there is an error code returned by parseResponse, we must - //not overwrite it. - if(!reply.succeeded() && !txn_resp.get_error_code()) - { - txn_resp.set_error_code(ERROR_COMPARE_FAILED); - txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); - } - } - - return txn_resp; -} diff --git a/src/v3/AsyncTxnResponse.cpp b/src/v3/AsyncTxnResponse.cpp deleted file mode 100644 index b713094..0000000 --- a/src/v3/AsyncTxnResponse.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include "etcd/v3/AsyncTxnResponse.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncDeleteResponse.hpp" -#include "etcd/v3/action_constants.hpp" - -using etcdserverpb::ResponseOp; - -void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { - index = reply.header().revision(); -} - -void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply) -{ - index = reply.header().revision(); - for(int index=0; index < reply.responses_size(); index++) - { - auto resp = reply.responses(index); - if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) - { - AsyncRangeResponse response; - response.ParseResponse(*(resp.mutable_response_range()),prefix); - - error_code = response.get_error_code(); - error_message = response.get_error_message(); - - values = response.get_values(); - value = response.get_value(); - } - else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) - { - auto put_resp = resp.response_put(); - if(put_resp.has_prev_kv()) - { - prev_value.kvs.CopyFrom(put_resp.prev_kv()); - } - } - else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) - { - AsyncDeleteResponse response; - response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); - - prev_value.kvs.CopyFrom(response.get_prev_value().kvs); - - values = response.get_values(); - value = response.get_value(); - } - } -} diff --git a/src/v3/AsyncUpdateAction.cpp b/src/v3/AsyncUpdateAction.cpp deleted file mode 100644 index 00ff7bc..0000000 --- a/src/v3/AsyncUpdateAction.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include "etcd/v3/AsyncUpdateAction.hpp" - -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/Transaction.hpp" - -using etcdserverpb::RangeRequest; -using etcdserverpb::PutRequest; -using etcdserverpb::RequestOp; -using etcdserverpb::ResponseOp; -using etcdserverpb::TxnRequest; - -etcdv3::AsyncUpdateAction::AsyncUpdateAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - etcdv3::Transaction transaction(parameters.key); - transaction.init_compare(CompareResult::GREATER, - CompareTarget::VERSION); - - transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); - - response_reader = parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() -{ - AsyncTxnResponse txn_resp; - - if(!status.ok()) - { - txn_resp.set_error_code(status.error_code()); - txn_resp.set_error_message(status.error_message()); - } - else - { - if(reply.succeeded()) - { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); - txn_resp.set_action(etcdv3::UPDATE_ACTION); - } - else - { - txn_resp.set_error_code(etcdv3::ERROR_KEY_NOT_FOUND); - txn_resp.set_error_message("etcd-cpp-apiv3: key not found"); - } - } - return txn_resp; -} diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp deleted file mode 100644 index 505f4f2..0000000 --- a/src/v3/AsyncWatchAction.cpp +++ /dev/null @@ -1,209 +0,0 @@ -#include "etcd/v3/AsyncWatchAction.hpp" -#include "etcd/v3/action_constants.hpp" - - -using etcdserverpb::WatchCreateRequest; - -etcdv3::AsyncWatchAction::AsyncWatchAction( - etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) -{ - isCancelled.store(false); - stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE); - - WatchRequest watch_req; - WatchCreateRequest watch_create_req; - - if(!parameters.withPrefix) { - watch_create_req.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - watch_create_req.set_key(etcdv3::NUL); - watch_create_req.set_range_end(etcdv3::NUL); - } else { - watch_create_req.set_key(parameters.key); - watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if(!parameters.range_end.empty()) { - watch_create_req.set_range_end(parameters.range_end); - } - - watch_create_req.set_prev_kv(true); - watch_create_req.set_start_revision(parameters.revision); - - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - - // wait "create" success (the stream becomes ready) - void *got_tag; - bool ok = false; - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_CREATE) { - stream->Write(watch_req, (void *)etcdv3::WATCH_WRITE); - } else { - throw std::runtime_error("failed to create a watch connection"); - } - - // wait "write" (WatchCreateRequest) success, and start to read the first reply - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITE) { - stream->Read(&reply, (void*)this); - this->watch_id = reply.watch_id(); - } else { - throw std::runtime_error("failed to write WatchCreateRequest to server"); - } -} - -void etcdv3::AsyncWatchAction::waitForResponse() -{ - void* got_tag; - bool ok = false; - - while(cq_.Next(&got_tag, &ok)) - { - if(ok == false) - { - break; - } - if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) { - stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); - continue; - } - if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) - { - stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); - continue; - } - if (got_tag == (void *)etcdv3::WATCH_FINISH) { - // shutdown - cq_.Shutdown(); - break; - } - if(got_tag == (void*)this) // read tag - { - if (reply.canceled()) { - // cancel on-the-fly calls, but don't shutdown the completion queue as there - // are still a inflight call to finish - context.TryCancel(); - continue; - } - - // we stop watch under two conditions: - // - // 1. watch for a future revision, return immediately with empty events set - // 2. receive any effective events. - if ((reply.created() && reply.header().revision() < parameters.revision) || - reply.events_size() > 0) { - // leave a warning if the response is too large and been fragmented - if (reply.fragment()) { - std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; - } - - std::cout << "issue a watch cancel" << std::endl; - // cancel the watcher after receiving the good response - this->CancelWatch(); - - // start the next round to read finish messages, read into "&dummy" - // (use nullptr, as it won't be touched). - stream->Read(nullptr, (void*)etcdv3::WATCH_FINISH); - } else { - // start the next round to read reply, read into "&reply" - stream->Read(&reply, (void*)this); - } - continue; - } - if(isCancelled.load()) { - // invalid tag, and is cancelled - break; - } - } -} - -void etcdv3::AsyncWatchAction::CancelWatch() -{ - if (!isCancelled.exchange(true)) { - WatchRequest cancel_req; - cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id); - stream->Write(cancel_req, (void *)etcdv3::WATCH_WRITE_CANCEL); - isCancelled.store(true); - } -} - -bool etcdv3::AsyncWatchAction::Cancelled() const { - return isCancelled.load(); -} - -void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) -{ - void* got_tag; - bool ok = false; - - while(cq_.Next(&got_tag, &ok)) - { - if(ok == false) - { - break; - } - if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) { - stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); - continue; - } - if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) - { - stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); - continue; - } - if (got_tag == (void *)etcdv3::WATCH_FINISH) { - // shutdown - cq_.Shutdown(); - break; - } - if(got_tag == (void*)this) // read tag - { - if (reply.canceled()) { - if (reply.compact_revision() != 0) { - auto resp = ParseResponse(); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); - callback(etcd::Response(resp, duration)); - } - // cancel on-the-fly calls, but don't shutdown the completion queue as there - // are still a inflight call to finish - context.TryCancel(); - continue; - } - - // for the callback case, we don't invoke callback immediately if watching - // for a future revision, we wait until there are some effective events. - if(reply.events_size()) - { - auto resp = ParseResponse(); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); - callback(etcd::Response(resp, duration)); - start_timepoint = std::chrono::high_resolution_clock::now(); - } - stream->Read(&reply, (void*)this); - continue; - } - if(isCancelled.load()) { - // invalid tag, and is cancelled - break; - } - } -} - -etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() -{ - AsyncWatchResponse watch_resp; - watch_resp.set_action(etcdv3::WATCH_ACTION); - - if(!status.ok()) - { - watch_resp.set_error_code(status.error_code()); - watch_resp.set_error_message(status.error_message()); - } - else - { - watch_resp.ParseResponse(reply); - } - return watch_resp; -} diff --git a/src/v3/AsyncWatchResponse.cpp b/src/v3/AsyncWatchResponse.cpp deleted file mode 100644 index 409821a..0000000 --- a/src/v3/AsyncWatchResponse.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/v3/action_constants.hpp" - -void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply) -{ - if (reply.canceled() && reply.compact_revision() != 0) { - error_code = grpc::StatusCode::OUT_OF_RANGE; - error_message = "required revision has been compacted"; - compact_revision = reply.compact_revision(); - return; - } - index = reply.header().revision(); - for (auto const &e: reply.events()) { - events.emplace_back(e); - } - for(int cnt =0; cnt < reply.events_size(); cnt++) - { - auto event = reply.events(cnt); - if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) - { - if(event.kv().version() == 1) - { - action = etcdv3::CREATE_ACTION; - } - else - { - action = etcdv3::SET_ACTION; - } - value.kvs = event.kv(); - - } - else if(mvccpb::Event::EventType::Event_EventType_DELETE_ == event.type()) - { - action = etcdv3::DELETE_ACTION; - value.kvs = event.kv(); - } - if(event.has_prev_kv()) - { - prev_value.kvs = event.prev_kv(); - } - // just store the first occurence of the key in values. - // this is done so tas client will not need to change their behaviour. - // break immediately - break; - } -}