Implements the put action.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-10-12 10:17:25 +08:00
parent e9db91b335
commit d29e05545d
16 changed files with 156 additions and 18 deletions

View File

@ -56,6 +56,16 @@ namespace etcd
Client(std::string const & etcd_url, Client(std::string const & etcd_url,
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
static etcd::Client *WithUrl(std::string const & etcd_url,
std::string const & load_balancer = "round_robin");
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
@ -165,6 +175,13 @@ namespace etcd
*/ */
pplx::task<Response> add(std::string const & key, std::string const & value, int64_t leaseId); pplx::task<Response> add(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Put a new key-value pair.
* @param key is the key to be put
* @param value is the value to be put
*/
pplx::task<Response> put(std::string const & key, std::string const & value);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified * @param key is the key to be modified

View File

@ -46,6 +46,7 @@ namespace etcd
Response set(std::string const & key, std::string const & value, int64_t leaseId); Response set(std::string const & key, std::string const & value, int64_t leaseId);
Response add(std::string const & key, std::string const & value, int ttl = 0); Response add(std::string const & key, std::string const & value, int ttl = 0);
Response add(std::string const & key, std::string const & value, int64_t leaseId); Response add(std::string const & key, std::string const & value, int64_t leaseId);
Response put(std::string const & key, std::string const & value);
Response modify(std::string const & key, std::string const & value, int ttl = 0); Response modify(std::string const & key, std::string const & value, int ttl = 0);
Response modify(std::string const & key, std::string const & value, int64_t leaseId); Response modify(std::string const & key, std::string const & value, int64_t leaseId);
Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0);

View File

@ -32,7 +32,7 @@ namespace etcdv3
bool withPrefix; bool withPrefix;
int revision; int revision;
int old_revision; int old_revision;
int64_t lease_id; int64_t lease_id = 0; // no lease
int ttl; int ttl;
int limit; int limit;
std::string name; // for campaign (in v3election) std::string name; // for campaign (in v3election)

View File

@ -4,7 +4,7 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "etcd/v3/Action.hpp" #include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteResponse.hpp"
using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncResponseReader;
@ -16,7 +16,7 @@ namespace etcdv3
{ {
public: public:
AsyncDeleteAction(etcdv3::ActionParameters const &param); AsyncDeleteAction(etcdv3::ActionParameters const &param);
AsyncDeleteRangeResponse ParseResponse(); AsyncDeleteResponse ParseResponse();
private: private:
DeleteRangeResponse reply; DeleteRangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<DeleteRangeResponse>> response_reader; std::unique_ptr<ClientAsyncResponseReader<DeleteRangeResponse>> response_reader;

View File

@ -12,10 +12,10 @@ using etcdserverpb::DeleteRangeResponse;
namespace etcdv3 namespace etcdv3
{ {
class AsyncDeleteRangeResponse : public etcdv3::V3Response class AsyncDeleteResponse : public etcdv3::V3Response
{ {
public: public:
AsyncDeleteRangeResponse(){}; AsyncDeleteResponse(){};
void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp); void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp);
}; };
} }

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_PUT_HPP__
#define __ASYNC_PUT_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncPutResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::PutResponse;
namespace etcdv3
{
class AsyncPutAction : public etcdv3::Action
{
public:
AsyncPutAction(etcdv3::ActionParameters const &param);
AsyncPutResponse ParseResponse();
private:
PutResponse reply;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,23 @@
#ifndef __ASYNC_PUTRESPONSE_HPP__
#define __ASYNC_PUTRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/V3Response.hpp"
#include "etcd/v3/Action.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::PutResponse;
namespace etcdv3
{
class AsyncPutResponse : public etcdv3::V3Response
{
public:
AsyncPutResponse(){};
void ParseResponse(PutResponse& resp);
};
}
#endif

View File

@ -7,6 +7,7 @@ namespace etcdv3
extern char const * UPDATE_ACTION; extern char const * UPDATE_ACTION;
extern char const * SET_ACTION; extern char const * SET_ACTION;
extern char const * GET_ACTION; extern char const * GET_ACTION;
extern char const * PUT_ACTION;
extern char const * DELETE_ACTION; extern char const * DELETE_ACTION;
extern char const * COMPARESWAP_ACTION; extern char const * COMPARESWAP_ACTION;
extern char const * COMPAREDELETE_ACTION; extern char const * COMPAREDELETE_ACTION;

View File

@ -32,7 +32,8 @@
#include "etcd/v3/Action.hpp" #include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp" #include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteResponse.hpp"
#include "etcd/v3/AsyncPutResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/AsyncElectionResponse.hpp" #include "etcd/v3/AsyncElectionResponse.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp" #include "etcd/v3/AsyncTxnResponse.hpp"
@ -45,6 +46,7 @@
#include "etcd/v3/AsyncHeadAction.hpp" #include "etcd/v3/AsyncHeadAction.hpp"
#include "etcd/v3/AsyncRangeAction.hpp" #include "etcd/v3/AsyncRangeAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncPutAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLeaseAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp" #include "etcd/v3/AsyncLockAction.hpp"
@ -198,6 +200,11 @@ etcd::Client::Client(std::string const & address,
stubs->electionServiceStub = Election::NewStub(this->channel); stubs->electionServiceStub = Election::NewStub(this->channel);
} }
etcd::Client *etcd::Client::WithUrl(std::string const & etcd_url,
std::string const & load_balancer) {
return new etcd::Client(etcd_url, load_balancer);
}
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const & address,
std::string const & username, std::string const & username,
std::string const & password, std::string const & password,
@ -373,6 +380,14 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::put(std::string const & key, std::string const & value) {
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncPutAction> call(new etcdv3::AsyncPutAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl) pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl)
{ {

View File

@ -53,6 +53,11 @@ etcd::Response etcd::SyncClient::add(std::string const & key, std::string const
CHECK_EXCEPTIONS(client.add(key, value, leaseId).get()); CHECK_EXCEPTIONS(client.add(key, value, leaseId).get());
} }
etcd::Response etcd::SyncClient::put(std::string const & key, std::string const & value)
{
CHECK_EXCEPTIONS(client.put(key, value).get());
}
etcd::Response etcd::SyncClient::modify(std::string const & key, std::string const & value, int ttl) etcd::Response etcd::SyncClient::modify(std::string const & key, std::string const & value, int ttl)
{ {
CHECK_EXCEPTIONS(client.modify(key, value, ttl).get()); CHECK_EXCEPTIONS(client.modify(key, value, ttl).get());

View File

@ -26,9 +26,9 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);
} }
etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse() etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse()
{ {
AsyncDeleteRangeResponse del_resp; AsyncDeleteResponse del_resp;
del_resp.set_action(etcdv3::DELETE_ACTION); del_resp.set_action(etcdv3::DELETE_ACTION);
if(!status.ok()) if(!status.ok())

View File

@ -1,8 +1,8 @@
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteResponse.hpp"
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp)
{ {
index = resp.header().revision(); index = resp.header().revision();

36
src/v3/AsyncPutAction.cpp Normal file
View File

@ -0,0 +1,36 @@
#include "etcd/v3/AsyncPutAction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::PutRequest;
etcdv3::AsyncPutAction::AsyncPutAction(
ActionParameters const &param)
: etcdv3::Action(param)
{
PutRequest put_request;
put_request.set_key(parameters.key);
put_request.set_value(parameters.value);
put_request.set_lease(parameters.lease_id);
put_request.set_prev_kv(true);
response_reader = parameters.kv_stub->AsyncPut(&context, put_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse()
{
AsyncPutResponse put_resp;
put_resp.set_action(etcdv3::PUT_ACTION);
if(!status.ok())
{
put_resp.set_error_code(status.error_code());
put_resp.set_error_message(status.error_message());
}
else
{
put_resp.ParseResponse(reply);
}
return put_resp;
}

View File

@ -0,0 +1,13 @@
#include "etcd/v3/AsyncPutResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncPutResponse::ParseResponse(PutResponse& resp)
{
index = resp.header().revision();
//get all previous values
etcdv3::KeyValue kv;
kv.kvs.CopyFrom(resp.prev_kv());
prev_value = kv;
}

View File

@ -1,6 +1,6 @@
#include "etcd/v3/AsyncTxnResponse.hpp" #include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp" #include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteResponse.hpp"
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
using etcdserverpb::ResponseOp; using etcdserverpb::ResponseOp;
@ -36,7 +36,7 @@ void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix
} }
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
{ {
AsyncDeleteRangeResponse response; AsyncDeleteResponse response;
response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range()));
prev_value.kvs.CopyFrom(response.get_prev_value().kvs); prev_value.kvs.CopyFrom(response.get_prev_value().kvs);

View File

@ -5,6 +5,7 @@ char const * etcdv3::COMPARESWAP_ACTION = "compareAndSwap";
char const * etcdv3::UPDATE_ACTION = "update"; char const * etcdv3::UPDATE_ACTION = "update";
char const * etcdv3::SET_ACTION = "set"; char const * etcdv3::SET_ACTION = "set";
char const * etcdv3::GET_ACTION = "get"; char const * etcdv3::GET_ACTION = "get";
char const * etcdv3::PUT_ACTION = "put";
char const * etcdv3::DELETE_ACTION = "delete"; char const * etcdv3::DELETE_ACTION = "delete";
char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete"; char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete";
char const * etcdv3::LOCK_ACTION = "lock"; char const * etcdv3::LOCK_ACTION = "lock";