From d29e05545d0614a5db2c147cce1736bde1da03e1 Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 12 Oct 2021 10:17:25 +0800 Subject: [PATCH] Implements the put action. Signed-off-by: Tao He --- etcd/Client.hpp | 17 +++++++++ etcd/SyncClient.hpp | 1 + etcd/v3/Action.hpp | 2 +- etcd/v3/AsyncDeleteAction.hpp | 4 +-- ...geResponse.hpp => AsyncDeleteResponse.hpp} | 4 +-- etcd/v3/AsyncPutAction.hpp | 26 ++++++++++++++ etcd/v3/AsyncPutResponse.hpp | 23 ++++++++++++ etcd/v3/action_constants.hpp | 1 + src/Client.cpp | 17 ++++++++- src/SyncClient.cpp | 5 +++ src/v3/AsyncDeleteAction.cpp | 14 ++++---- ...geResponse.cpp => AsyncDeleteResponse.cpp} | 6 ++-- src/v3/AsyncPutAction.cpp | 36 +++++++++++++++++++ src/v3/AsyncPutResponse.cpp | 13 +++++++ src/v3/AsyncTxnResponse.cpp | 4 +-- src/v3/action_constants.cpp | 1 + 16 files changed, 156 insertions(+), 18 deletions(-) rename etcd/v3/{AsyncDeleteRangeResponse.hpp => AsyncDeleteResponse.hpp} (81%) create mode 100644 etcd/v3/AsyncPutAction.hpp create mode 100644 etcd/v3/AsyncPutResponse.hpp rename src/v3/{AsyncDeleteRangeResponse.cpp => AsyncDeleteResponse.cpp} (73%) create mode 100644 src/v3/AsyncPutAction.cpp create mode 100644 src/v3/AsyncPutResponse.cpp diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 8fe2548..eb5f143 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -56,6 +56,16 @@ namespace etcd Client(std::string const & etcd_url, std::string const & load_balancer = "round_robin"); + /** + * Constructs an etcd client object. + * + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", + * or multiple url, seperated by ',' or ';'. + * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. + */ + static etcd::Client *WithUrl(std::string const & etcd_url, + std::string const & load_balancer = "round_robin"); + /** * Constructs an etcd client object. * @@ -165,6 +175,13 @@ namespace etcd */ pplx::task add(std::string const & key, std::string const & value, int64_t leaseId); + /** + * Put a new key-value pair. + * @param key is the key to be put + * @param value is the value to be put + */ + pplx::task put(std::string const & key, std::string const & value); + /** * Modifies an existing key. Fails if the key does not exists. * @param key is the key to be modified diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index d2ff556..e978464 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -46,6 +46,7 @@ namespace etcd Response set(std::string const & key, std::string const & value, int64_t leaseId); Response add(std::string const & key, std::string const & value, int ttl = 0); Response add(std::string const & key, std::string const & value, int64_t leaseId); + Response put(std::string const & key, std::string const & value); Response modify(std::string const & key, std::string const & value, int ttl = 0); Response modify(std::string const & key, std::string const & value, int64_t leaseId); Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 498b135..46c1785 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -32,7 +32,7 @@ namespace etcdv3 bool withPrefix; int revision; int old_revision; - int64_t lease_id; + int64_t lease_id = 0; // no lease int ttl; int limit; std::string name; // for campaign (in v3election) diff --git a/etcd/v3/AsyncDeleteAction.hpp b/etcd/v3/AsyncDeleteAction.hpp index 71e18e2..8cb105d 100644 --- a/etcd/v3/AsyncDeleteAction.hpp +++ b/etcd/v3/AsyncDeleteAction.hpp @@ -4,7 +4,7 @@ #include #include "proto/rpc.grpc.pb.h" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncDeleteRangeResponse.hpp" +#include "etcd/v3/AsyncDeleteResponse.hpp" using grpc::ClientAsyncResponseReader; @@ -16,7 +16,7 @@ namespace etcdv3 { public: AsyncDeleteAction(etcdv3::ActionParameters const ¶m); - AsyncDeleteRangeResponse ParseResponse(); + AsyncDeleteResponse ParseResponse(); private: DeleteRangeResponse reply; std::unique_ptr> response_reader; diff --git a/etcd/v3/AsyncDeleteRangeResponse.hpp b/etcd/v3/AsyncDeleteResponse.hpp similarity index 81% rename from etcd/v3/AsyncDeleteRangeResponse.hpp rename to etcd/v3/AsyncDeleteResponse.hpp index 2b37c52..9818690 100644 --- a/etcd/v3/AsyncDeleteRangeResponse.hpp +++ b/etcd/v3/AsyncDeleteResponse.hpp @@ -12,10 +12,10 @@ using etcdserverpb::DeleteRangeResponse; namespace etcdv3 { - class AsyncDeleteRangeResponse : public etcdv3::V3Response + class AsyncDeleteResponse : public etcdv3::V3Response { public: - AsyncDeleteRangeResponse(){}; + AsyncDeleteResponse(){}; void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp); }; } diff --git a/etcd/v3/AsyncPutAction.hpp b/etcd/v3/AsyncPutAction.hpp new file mode 100644 index 0000000..08c6c4d --- /dev/null +++ b/etcd/v3/AsyncPutAction.hpp @@ -0,0 +1,26 @@ +#ifndef __ASYNC_PUT_HPP__ +#define __ASYNC_PUT_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncPutResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::PutResponse; + +namespace etcdv3 +{ + class AsyncPutAction : public etcdv3::Action + { + public: + AsyncPutAction(etcdv3::ActionParameters const ¶m); + AsyncPutResponse ParseResponse(); + private: + PutResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/etcd/v3/AsyncPutResponse.hpp b/etcd/v3/AsyncPutResponse.hpp new file mode 100644 index 0000000..ea94e2b --- /dev/null +++ b/etcd/v3/AsyncPutResponse.hpp @@ -0,0 +1,23 @@ +#ifndef __ASYNC_PUTRESPONSE_HPP__ +#define __ASYNC_PUTRESPONSE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "etcd/v3/V3Response.hpp" +#include "etcd/v3/Action.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::PutResponse; + +namespace etcdv3 +{ + class AsyncPutResponse : public etcdv3::V3Response + { + public: + AsyncPutResponse(){}; + void ParseResponse(PutResponse& resp); + }; +} + +#endif diff --git a/etcd/v3/action_constants.hpp b/etcd/v3/action_constants.hpp index 5ff09e8..fa8cb5f 100644 --- a/etcd/v3/action_constants.hpp +++ b/etcd/v3/action_constants.hpp @@ -7,6 +7,7 @@ namespace etcdv3 extern char const * UPDATE_ACTION; extern char const * SET_ACTION; extern char const * GET_ACTION; + extern char const * PUT_ACTION; extern char const * DELETE_ACTION; extern char const * COMPARESWAP_ACTION; extern char const * COMPAREDELETE_ACTION; diff --git a/src/Client.cpp b/src/Client.cpp index 4bb042a..7f5a700 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -32,7 +32,8 @@ #include "etcd/v3/Action.hpp" #include "etcd/v3/AsyncRangeResponse.hpp" #include "etcd/v3/AsyncWatchResponse.hpp" -#include "etcd/v3/AsyncDeleteRangeResponse.hpp" +#include "etcd/v3/AsyncDeleteResponse.hpp" +#include "etcd/v3/AsyncPutResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/AsyncElectionResponse.hpp" #include "etcd/v3/AsyncTxnResponse.hpp" @@ -45,6 +46,7 @@ #include "etcd/v3/AsyncHeadAction.hpp" #include "etcd/v3/AsyncRangeAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp" +#include "etcd/v3/AsyncPutAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLockAction.hpp" @@ -198,6 +200,11 @@ etcd::Client::Client(std::string const & address, stubs->electionServiceStub = Election::NewStub(this->channel); } +etcd::Client *etcd::Client::WithUrl(std::string const & etcd_url, + std::string const & load_balancer) { + return new etcd::Client(etcd_url, load_balancer); +} + etcd::Client::Client(std::string const & address, std::string const & username, std::string const & password, @@ -373,6 +380,14 @@ pplx::task etcd::Client::add(std::string const & key, std::strin return Response::create(call); } +pplx::task etcd::Client::put(std::string const & key, std::string const & value) { + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.kv_stub = stubs->kvServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncPutAction(params)); + return Response::create(call); +} pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) { diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index e64001e..55c60e3 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -53,6 +53,11 @@ etcd::Response etcd::SyncClient::add(std::string const & key, std::string const CHECK_EXCEPTIONS(client.add(key, value, leaseId).get()); } +etcd::Response etcd::SyncClient::put(std::string const & key, std::string const & value) +{ + CHECK_EXCEPTIONS(client.put(key, value).get()); +} + etcd::Response etcd::SyncClient::modify(std::string const & key, std::string const & value, int ttl) { CHECK_EXCEPTIONS(client.modify(key, value, ttl).get()); diff --git a/src/v3/AsyncDeleteAction.cpp b/src/v3/AsyncDeleteAction.cpp index 87f6115..60e733c 100644 --- a/src/v3/AsyncDeleteAction.cpp +++ b/src/v3/AsyncDeleteAction.cpp @@ -5,7 +5,7 @@ using etcdserverpb::DeleteRangeRequest; etcdv3::AsyncDeleteAction::AsyncDeleteAction( ActionParameters const ¶m) - : etcdv3::Action(param) + : etcdv3::Action(param) { DeleteRangeRequest del_request; del_request.set_key(parameters.key); @@ -26,20 +26,20 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction( response_reader->Finish(&reply, &status, (void*)this); } -etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse() +etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse() { - AsyncDeleteRangeResponse del_resp; + AsyncDeleteResponse del_resp; del_resp.set_action(etcdv3::DELETE_ACTION); - + if(!status.ok()) { del_resp.set_error_code(status.error_code()); del_resp.set_error_message(status.error_message()); } else - { - del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply); + { + del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply); } - + return del_resp; } diff --git a/src/v3/AsyncDeleteRangeResponse.cpp b/src/v3/AsyncDeleteResponse.cpp similarity index 73% rename from src/v3/AsyncDeleteRangeResponse.cpp rename to src/v3/AsyncDeleteResponse.cpp index 1309298..fa4b3d3 100644 --- a/src/v3/AsyncDeleteRangeResponse.cpp +++ b/src/v3/AsyncDeleteResponse.cpp @@ -1,8 +1,8 @@ -#include "etcd/v3/AsyncDeleteRangeResponse.hpp" +#include "etcd/v3/AsyncDeleteResponse.hpp" #include "etcd/v3/action_constants.hpp" -void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) +void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) { index = resp.header().revision(); @@ -16,7 +16,7 @@ void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, boo //get all previous values for(int cnt=0; cnt < resp.prev_kvs_size(); cnt++) { - etcdv3::KeyValue kv; + etcdv3::KeyValue kv; kv.kvs.CopyFrom(resp.prev_kvs(cnt)); values.push_back(kv); } diff --git a/src/v3/AsyncPutAction.cpp b/src/v3/AsyncPutAction.cpp new file mode 100644 index 0000000..4bcadce --- /dev/null +++ b/src/v3/AsyncPutAction.cpp @@ -0,0 +1,36 @@ +#include "etcd/v3/AsyncPutAction.hpp" +#include "etcd/v3/action_constants.hpp" + +using etcdserverpb::PutRequest; + +etcdv3::AsyncPutAction::AsyncPutAction( + ActionParameters const ¶m) + : etcdv3::Action(param) +{ + PutRequest put_request; + put_request.set_key(parameters.key); + put_request.set_value(parameters.value); + put_request.set_lease(parameters.lease_id); + put_request.set_prev_kv(true); + + response_reader = parameters.kv_stub->AsyncPut(&context, put_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse() +{ + AsyncPutResponse put_resp; + put_resp.set_action(etcdv3::PUT_ACTION); + + if(!status.ok()) + { + put_resp.set_error_code(status.error_code()); + put_resp.set_error_message(status.error_message()); + } + else + { + put_resp.ParseResponse(reply); + } + + return put_resp; +} diff --git a/src/v3/AsyncPutResponse.cpp b/src/v3/AsyncPutResponse.cpp new file mode 100644 index 0000000..6a116a5 --- /dev/null +++ b/src/v3/AsyncPutResponse.cpp @@ -0,0 +1,13 @@ +#include "etcd/v3/AsyncPutResponse.hpp" +#include "etcd/v3/action_constants.hpp" + + +void etcdv3::AsyncPutResponse::ParseResponse(PutResponse& resp) +{ + index = resp.header().revision(); + + //get all previous values + etcdv3::KeyValue kv; + kv.kvs.CopyFrom(resp.prev_kv()); + prev_value = kv; +} diff --git a/src/v3/AsyncTxnResponse.cpp b/src/v3/AsyncTxnResponse.cpp index 8816206..b713094 100644 --- a/src/v3/AsyncTxnResponse.cpp +++ b/src/v3/AsyncTxnResponse.cpp @@ -1,6 +1,6 @@ #include "etcd/v3/AsyncTxnResponse.hpp" #include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncDeleteRangeResponse.hpp" +#include "etcd/v3/AsyncDeleteResponse.hpp" #include "etcd/v3/action_constants.hpp" using etcdserverpb::ResponseOp; @@ -36,7 +36,7 @@ void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix } else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) { - AsyncDeleteRangeResponse response; + AsyncDeleteResponse response; response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); prev_value.kvs.CopyFrom(response.get_prev_value().kvs); diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index efa14bc..cfc56ae 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -5,6 +5,7 @@ char const * etcdv3::COMPARESWAP_ACTION = "compareAndSwap"; char const * etcdv3::UPDATE_ACTION = "update"; char const * etcdv3::SET_ACTION = "set"; char const * etcdv3::GET_ACTION = "get"; +char const * etcdv3::PUT_ACTION = "put"; char const * etcdv3::DELETE_ACTION = "delete"; char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete"; char const * etcdv3::LOCK_ACTION = "lock";