Get and list keys with specified revision.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-04-26 20:47:46 +08:00
parent a288eb5db4
commit 4ad9bbae0a
7 changed files with 200 additions and 12 deletions

View File

@ -259,6 +259,13 @@ namespace etcd
*/
pplx::task<Response> get(std::string const & key);
/**
* Get the value of specified key of specified revision from the etcd server
* @param key is the key to be read
* @param revision is the revision of the key to be read
*/
pplx::task<Response> get(std::string const & key, int64_t revision);
/**
* Sets the value of a key. The key will be modified if already exists or created
* if it does not exists.
@ -410,10 +417,20 @@ namespace etcd
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* to ensure backwards binary compatibility. 0 means no limit.
*/
pplx::task<Response> ls(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory prefixed by the key with given revision.
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility. 0 means no limit.
* @param revision is the revision to be listed
*/
pplx::task<Response> ls(std::string const & key, size_t const limit, int64_t revision);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
@ -434,6 +451,18 @@ namespace etcd
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end), and respects the given limit and revision.
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* @param revision is the revision to be listed
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision);
/**
* Gets a directory listing of the directory prefixed by the key.
*
@ -454,6 +483,18 @@ namespace etcd
*/
pplx::task<Response> keys(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory prefixed by the key.
*
* Note that only keys are included in the response.
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* @param revision is the revision to be listed
*/
pplx::task<Response> keys(std::string const & key, size_t const limit, int64_t revision);
/**
* List keys identified by the key and range_end, i.e., get all keys in the range [key,
* range_end), and respects the given limit.
@ -478,6 +519,20 @@ namespace etcd
*/
pplx::task<Response> keys(std::string const & key, std::string const &range_end, size_t const limit);
/**
* List keys identified by the key and range_end, i.e., get all keys in the range [key,
* range_end).
*
* Note that only keys are included in the response.
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* @param revision is the revision to be listed
*/
pplx::task<Response> keys(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision);
/**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of

View File

@ -316,6 +316,13 @@ namespace etcd
*/
Response get(std::string const & key);
/**
* Get the value of specified key of specified revision from the etcd server
* @param key is the key to be read
* @param revision is the revision of the key to be read
*/
Response get(std::string const & key, int64_t revision);
/**
* Sets the value of a key. The key will be modified if already exists or created
* if it does not exists.
@ -466,10 +473,20 @@ namespace etcd
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* to ensure backwards binary compatibility. 0 means no limit.
*/
Response ls(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory prefixed by the key with given revision.
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility. 0 means no limit.
* @param revision is the revision to be listed
*/
Response ls(std::string const & key, size_t const limit, int64_t revision);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
@ -486,10 +503,22 @@ namespace etcd
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* to ensure backwards binary compatibility. 0 means no limit.
*/
Response ls(std::string const & key, std::string const &range_end, size_t const limit);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end), and respects the given limit and revision.
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility. 0 means no limit.
* @param revision is the revision to be listed
*/
Response ls(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision);
/**
* Gets a directory listing of the directory prefixed by the key.
*
@ -510,6 +539,18 @@ namespace etcd
*/
Response keys(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory prefixed by the key with specified revision.
*
* Note that only keys are included in the response.
*
* @param key is the key to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* @param revision is the revision to be listed
*/
Response keys(std::string const & key, size_t const limit, int64_t revision);
/**
* List keys identified by the key and range_end, i.e., get all keys in the range [key,
* range_end).
@ -534,6 +575,20 @@ namespace etcd
*/
Response keys(std::string const & key, std::string const &range_end, size_t const limit);
/**
* List keys identified by the key and range_end, i.e., get all keys in the range [key,
* range_end), and respects the given limit and revision.
*
* Note that only keys are included in the response.
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
* @param revision is the revision to be listed
*/
Response keys(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision);
/**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of
@ -721,7 +776,7 @@ namespace etcd
private:
// TODO: use std::unique_ptr<>
std::shared_ptr<etcdv3::AsyncHeadAction> head_internal();
std::shared_ptr<etcdv3::AsyncRangeAction> get_internal(std::string const & key);
std::shared_ptr<etcdv3::AsyncRangeAction> get_internal(std::string const & key, int64_t revision=0);
std::shared_ptr<etcdv3::AsyncSetAction> set_internal(std::string const & key, std::string const & value, int64_t leaseId);
std::shared_ptr<etcdv3::AsyncSetAction> add_internal(std::string const & key, std::string const & value, int64_t leaseId);
std::shared_ptr<etcdv3::AsyncPutAction> put_internal(std::string const & key, std::string const & value);
@ -731,8 +786,8 @@ namespace etcd
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> rm_if_internal(std::string const & key, int64_t old_index, const std::string & old_value, etcdv3::AtomicityType const & atomicity_type);
std::shared_ptr<etcdv3::AsyncDeleteAction> rmdir_internal(std::string const & key, bool recursive = false);
std::shared_ptr<etcdv3::AsyncDeleteAction> rmdir_internal(std::string const & key, std::string const &range_end);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, size_t const limit, bool const keys_only = false);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, std::string const &range_end, size_t const limit, bool const keys_only = false);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, size_t const limit, bool const keys_only = false, int64_t revision=0);
std::shared_ptr<etcdv3::AsyncRangeAction> ls_internal(std::string const & key, std::string const &range_end, size_t const limit, bool const keys_only = false, int64_t revision=0);
std::shared_ptr<etcdv3::AsyncWatchAction> watch_internal(std::string const & key, int64_t fromIndex, bool recursive = false);
std::shared_ptr<etcdv3::AsyncWatchAction> watch_internal(std::string const & key, std::string const &range_end, int64_t fromIndex);
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> leaserevoke_internal(int64_t lease_id);

View File

@ -35,8 +35,8 @@ namespace etcdv3
{
ActionParameters();
bool withPrefix;
int64_t revision;
int64_t old_revision;
int64_t revision = 0;
int64_t old_revision = 0;
int64_t lease_id = 0; // no lease
int ttl;
int limit;

View File

@ -244,6 +244,13 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
this->client->get_internal(key));
}
pplx::task<etcd::Response> etcd::Client::get(std::string const & key, int64_t revision)
{
return etcd::detail::asyncify(
static_cast<etcd::responser_t<etcdv3::AsyncRangeAction>>(Response::create),
this->client->get_internal(key, revision));
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl)
{
if (ttl > 0) {
@ -439,6 +446,13 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
this->client->ls_internal(key, limit));
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t const limit, int64_t revision)
{
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncRangeAction>>(Response::create),
this->client->ls_internal(key, limit, false, revision));
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end)
{
return etcd::detail::asyncify(
@ -453,6 +467,13 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string
this->client->ls_internal(key, range_end, limit));
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision)
{
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncRangeAction>>(Response::create),
this->client->ls_internal(key, range_end, limit, false, revision));
}
pplx::task<etcd::Response> etcd::Client::keys(std::string const & key)
{
return etcd::detail::asyncify(
@ -467,6 +488,13 @@ pplx::task<etcd::Response> etcd::Client::keys(std::string const & key, size_t co
this->client->ls_internal(key, limit, true));
}
pplx::task<etcd::Response> etcd::Client::keys(std::string const & key, size_t const limit, int64_t revision)
{
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncRangeAction>>(Response::create),
this->client->ls_internal(key, limit, true, revision));
}
pplx::task<etcd::Response> etcd::Client::keys(std::string const & key, std::string const &range_end)
{
return etcd::detail::asyncify(
@ -481,6 +509,13 @@ pplx::task<etcd::Response> etcd::Client::keys(std::string const & key, std::stri
this->client->ls_internal(key, range_end, limit, true));
}
pplx::task<etcd::Response> etcd::Client::keys(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision)
{
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncRangeAction>>(Response::create),
this->client->ls_internal(key, range_end, limit, true, revision));
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{
return etcd::detail::asyncify(

View File

@ -482,10 +482,15 @@ etcd::Response etcd::SyncClient::get(std::string const &key) {
return Response::create(this->get_internal(key));
}
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(std::string const & key)
etcd::Response etcd::SyncClient::get(std::string const &key, int64_t revision) {
return Response::create(this->get_internal(key, revision));
}
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(std::string const & key, int64_t revision)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.revision = revision;
params.withPrefix = false;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
@ -756,6 +761,11 @@ etcd::Response etcd::SyncClient::ls(std::string const & key, size_t const limit)
return Response::create(this->ls_internal(key, limit));
}
etcd::Response etcd::SyncClient::ls(std::string const & key, size_t const limit, int64_t revision)
{
return Response::create(this->ls_internal(key, limit, false, revision));
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end)
{
return Response::create(this->ls_internal(key, range_end, 0 /* default: no limit */));
@ -766,6 +776,11 @@ etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &
return Response::create(this->ls_internal(key, range_end, limit));
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision)
{
return Response::create(this->ls_internal(key, range_end, limit, false, revision));
}
etcd::Response etcd::SyncClient::keys(std::string const & key)
{
return Response::create(this->ls_internal(key, 0 /* default: no limit */, true));
@ -776,6 +791,11 @@ etcd::Response etcd::SyncClient::keys(std::string const & key, size_t const limi
return Response::create(this->ls_internal(key, limit, true));
}
etcd::Response etcd::SyncClient::keys(std::string const & key, size_t const limit, int64_t revision)
{
return Response::create(this->ls_internal(key, limit, true, revision));
}
etcd::Response etcd::SyncClient::keys(std::string const & key, std::string const &range_end)
{
return Response::create(this->ls_internal(key, range_end, 0 /* default: no limit */, true));
@ -786,25 +806,32 @@ etcd::Response etcd::SyncClient::keys(std::string const & key, std::string const
return Response::create(this->ls_internal(key, range_end, limit, true));
}
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::string const & key, size_t const limit, bool const keys_only) {
etcd::Response etcd::SyncClient::keys(std::string const & key, std::string const &range_end, size_t const limit, int64_t revision)
{
return Response::create(this->ls_internal(key, range_end, limit, true, revision));
}
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::string const & key, size_t const limit, bool const keys_only, int64_t revision) {
etcdv3::ActionParameters params;
params.key.assign(key);
params.keys_only = keys_only;
params.withPrefix = true;
params.limit = limit;
params.revision = revision;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
}
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::string const & key, std::string const &range_end, size_t const limit, bool const keys_only) {
std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::ls_internal(std::string const & key, std::string const &range_end, size_t const limit, bool const keys_only, int64_t revision) {
etcdv3::ActionParameters params;
params.key.assign(key);
params.range_end.assign(range_end);
params.keys_only = keys_only;
params.withPrefix = false;
params.limit = limit;
params.revision = revision;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();

View File

@ -26,6 +26,9 @@ etcdv3::AsyncRangeAction::AsyncRangeAction(
if(!parameters.range_end.empty()) {
get_request.set_range_end(parameters.range_end);
}
if(parameters.revision > 0) {
get_request.set_revision(parameters.revision);
}
get_request.set_limit(parameters.limit);
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);

View File

@ -57,11 +57,24 @@ TEST_CASE("simplified read")
TEST_CASE("modify a key")
{
etcd::Client etcd(etcd_url);
etcd::Response resp = etcd.modify("/test/key1", "43").get();
// get
etcd::Response resp = etcd.get("/test/key1").get();
REQUIRE(resp.is_ok());
int64_t revision = resp.value().modified_index();
CHECK("42" == resp.value().as_string());
// modify
resp = etcd.modify("/test/key1", "43").get();
REQUIRE(0 == resp.error_code()); // overwrite
CHECK("update" == resp.action());
CHECK(etcd::ERROR_KEY_NOT_FOUND == etcd.modify("/test/key2", "43").get().error_code()); // Key not found
CHECK("43" == etcd.modify("/test/key1", "42").get().prev_value().as_string());
// check previous
resp = etcd.get("/test/key1", revision).get();
REQUIRE(resp.is_ok());
CHECK("42" == resp.value().as_string());
}
TEST_CASE("set a key")