Add a "head" method on the client the retrieve the latest revision.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-06-16 17:10:32 +08:00
parent 68c5626d2c
commit efcecb7731
11 changed files with 133 additions and 14 deletions

View File

@ -110,7 +110,12 @@ namespace etcd
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
/** /**
* Sends a get request to the etcd server * Get the HEAD revision of the connected etcd server.
*/
pplx::task<Response> head();
/**
* Get the value of specified key from the etcd server
* @param key is the key to be read * @param key is the key to be read
*/ */
pplx::task<Response> get(std::string const & key); pplx::task<Response> get(std::string const & key);

View File

@ -40,6 +40,7 @@ namespace etcd
std::string const & password, std::string const & password,
std::string const & load_balancer = "round_robin"); std::string const & load_balancer = "round_robin");
Response head();
Response get(std::string const & key); Response get(std::string const & key);
Response set(std::string const & key, std::string const & value, int ttl = 0); Response set(std::string const & key, std::string const & value, int ttl = 0);
Response set(std::string const & key, std::string const & value, int64_t leaseId); Response set(std::string const & key, std::string const & value, int64_t leaseId);

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_HEAD_HPP__
#define __ASYNC_HEAD_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncHeadResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::RangeResponse;
namespace etcdv3
{
class AsyncHeadAction : public etcdv3::Action
{
public:
AsyncHeadAction(etcdv3::ActionParameters param);
AsyncHeadResponse ParseResponse();
private:
RangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,22 @@
#ifndef __ASYNC_HEADRESPONSE_HPP__
#define __ASYNC_HEADRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/V3Response.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::RangeResponse;
namespace etcdv3
{
class AsyncHeadResponse : public etcdv3::V3Response
{
public:
AsyncHeadResponse(){};
void ParseResponse(RangeResponse& resp, bool prefix=false);
};
}
#endif

View File

@ -1,5 +1,5 @@
#ifndef __ASYNC_GET_HPP__ #ifndef __ASYNC_RANGE_HPP__
#define __ASYNC_GET_HPP__ #define __ASYNC_RANGE_HPP__
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
@ -12,10 +12,10 @@ using etcdserverpb::RangeResponse;
namespace etcdv3 namespace etcdv3
{ {
class AsyncGetAction : public etcdv3::Action class AsyncRangeAction : public etcdv3::Action
{ {
public: public:
AsyncGetAction(etcdv3::ActionParameters param); AsyncRangeAction(etcdv3::ActionParameters param);
AsyncRangeResponse ParseResponse(); AsyncRangeResponse ParseResponse();
private: private:
RangeResponse reply; RangeResponse reply;

View File

@ -40,7 +40,8 @@
#include "etcd/v3/AsyncCompareAndSwapAction.hpp" #include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" #include "etcd/v3/AsyncCompareAndDeleteAction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp" #include "etcd/v3/AsyncUpdateAction.hpp"
#include "etcd/v3/AsyncGetAction.hpp" #include "etcd/v3/AsyncHeadAction.hpp"
#include "etcd/v3/AsyncRangeAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLeaseAction.hpp"
@ -262,6 +263,15 @@ etcd::Client *etcd::Client::WithSSL(std::string const & etcd_url,
return new etcd::Client(etcd_url, ca, cert, key, load_balancer); return new etcd::Client(etcd_url, ca, cert, key, load_balancer);
} }
pplx::task<etcd::Response> etcd::Client::head()
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncHeadAction> call(new etcdv3::AsyncHeadAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
@ -269,7 +279,7 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncRangeAction> call(new etcdv3::AsyncRangeAction(params));
return Response::create(call); return Response::create(call);
} }
@ -546,7 +556,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
params.withPrefix = true; params.withPrefix = true;
params.limit = 0; // default no limit. params.limit = 0; // default no limit.
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncRangeAction> call(new etcdv3::AsyncRangeAction(params));
return Response::create(call); return Response::create(call);
} }
@ -558,7 +568,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
params.withPrefix = true; params.withPrefix = true;
params.limit = limit; params.limit = limit;
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncRangeAction> call(new etcdv3::AsyncRangeAction(params));
return Response::create(call); return Response::create(call);
} }
@ -571,7 +581,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string
params.withPrefix = false; params.withPrefix = false;
params.limit = 0; // default no limit. params.limit = 0; // default no limit.
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncRangeAction> call(new etcdv3::AsyncRangeAction(params));
return Response::create(call); return Response::create(call);
} }
@ -584,7 +594,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string
params.withPrefix = false; params.withPrefix = false;
params.limit = limit; params.limit = limit;
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncRangeAction> call(new etcdv3::AsyncRangeAction(params));
return Response::create(call); return Response::create(call);
} }

View File

@ -23,6 +23,11 @@ etcd::SyncClient::SyncClient(std::string const & address,
{ {
} }
etcd::Response etcd::SyncClient::head()
{
CHECK_EXCEPTIONS(client.head().get());
}
etcd::Response etcd::SyncClient::get(std::string const & key) etcd::Response etcd::SyncClient::get(std::string const & key)
{ {
CHECK_EXCEPTIONS(client.get(key).get()); CHECK_EXCEPTIONS(client.get(key).get());

View File

@ -0,0 +1,32 @@
#include "etcd/v3/AsyncHeadAction.hpp"
#include <cstdlib>
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::RangeRequest;
etcdv3::AsyncHeadAction::AsyncHeadAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
RangeRequest get_request;
get_request.set_key(etcdv3::NUL);
get_request.set_limit(1);
response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncHeadResponse etcdv3::AsyncHeadAction::ParseResponse()
{
AsyncHeadResponse range_resp;
if(!status.ok())
{
range_resp.set_error_code(status.error_code());
range_resp.set_error_message(status.error_message());
}
else
{
range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty());
}
return range_resp;
}

View File

@ -0,0 +1,9 @@
#include "etcd/v3/AsyncHeadResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncHeadResponse::ParseResponse(RangeResponse& resp, bool prefix)
{
action = etcdv3::GET_ACTION;
index = resp.header().revision();
}

View File

@ -1,4 +1,4 @@
#include "etcd/v3/AsyncGetAction.hpp" #include "etcd/v3/AsyncRangeAction.hpp"
#include <cstdlib> #include <cstdlib>
@ -6,7 +6,7 @@
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) etcdv3::AsyncRangeAction::AsyncRangeAction(etcdv3::ActionParameters param)
: etcdv3::Action(param) : etcdv3::Action(param)
{ {
RangeRequest get_request; RangeRequest get_request;
@ -33,7 +33,7 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);
} }
etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse() etcdv3::AsyncRangeResponse etcdv3::AsyncRangeAction::ParseResponse()
{ {
AsyncRangeResponse range_resp; AsyncRangeResponse range_resp;
if(!status.ok()) if(!status.ok())

View File

@ -123,6 +123,9 @@ TEST_CASE("delete a value")
<< ", modify index = " << modify_index << ", modify index = " << modify_index
<< std::endl; << std::endl;
int head_index = etcd.head().get().index();
CHECK(index == head_index);
resp = etcd.rm("/test/key1").get(); resp = etcd.rm("/test/key1").get();
CHECK("43" == resp.prev_value().as_string()); CHECK("43" == resp.prev_value().as_string());
CHECK( "/test/key1" == resp.prev_value().key()); CHECK( "/test/key1" == resp.prev_value().key());
@ -378,6 +381,9 @@ TEST_CASE("watch changes in the past")
etcd.set("/test/key1", "44").wait(); etcd.set("/test/key1", "44").wait();
etcd.set("/test/key1", "45").wait(); etcd.set("/test/key1", "45").wait();
int head_index = etcd.head().get().index();
CHECK(index + 3 == head_index);
etcd::Response res = etcd.watch("/test/key1", ++index).get(); etcd::Response res = etcd.watch("/test/key1", ++index).get();
CHECK("set" == res.action()); CHECK("set" == res.action());
CHECK("43" == res.value().as_string()); CHECK("43" == res.value().as_string());
@ -403,6 +409,9 @@ TEST_CASE("watch range changes in the past")
etcd.set("/test/key3", "45").wait(); etcd.set("/test/key3", "45").wait();
etcd.set("/test/key4", "45").wait(); etcd.set("/test/key4", "45").wait();
int head_index = etcd.head().get().index();
CHECK(index + 4 == head_index);
etcd::Response res; etcd::Response res;
res = etcd.watch("/test/key1", "/test/key4", index).get(); res = etcd.watch("/test/key1", "/test/key4", index).get();