Refactor the implementation of etcd transactions.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-07-10 08:50:35 +08:00
parent 204038c4bc
commit 8364dd24d0
12 changed files with 728 additions and 656 deletions

View File

@ -495,6 +495,7 @@ some specific conditions.
Values can be deleted with the `rm` method passing the key to be deleted as a parameter. The key Values can be deleted with the `rm` method passing the key to be deleted as a parameter. The key
should point to an existing value. There are conditional variations for deletion too. should point to an existing value. There are conditional variations for deletion too.
* `rm(std::string const& key)` unconditionally deletes the given key
* `rm_if(key, value, old_value)` deletes an already existing value but only if the previous * `rm_if(key, value, old_value)` deletes an already existing value but only if the previous
value equals with old_value. If the values does not match returns with "Compare failed" error value equals with old_value. If the values does not match returns with "Compare failed" error
(code `ERROR_COMPARE_FAILED`) (code `ERROR_COMPARE_FAILED`)
@ -852,11 +853,32 @@ Transactions in etcd supports set a set of comparison targets to specify the con
etcdv3::Transaction txn; etcdv3::Transaction txn;
// setup the conditions // setup the conditions
txn.reset_key("/test/x1"); txn.add_compare_value("/test/x1", "1");
txn.init_compare("1", etcdv3::CompareResult::EQUAL, etcdv3::CompareTarget::VALUE); txn.add_compare_value("/test/x2", "2");
txn.reset_key("/test/x2"); // or, compare the last modified revision
txn.init_compare("2", etcdv3::CompareResult::EQUAL, etcdv3::CompareTarget::VALUE); txn.add_compare_mod("/test/x3", 0); // not exists
txn.add_compare_mod("/test/x4", etcdv3::CompareResult::GREATER, 1234); // the modified revision is greater than 1234
```
High-level APIs (e.g., `compare_and_create`, `compare_and_swap`) are also provided, e.g.,
`fetch-and-add` operation can be implemented as
```cpp
auto fetch_and_add = [](etcd::Client& client,
std::string const& key) -> void {
auto value = stoi(client.get(key).get().value().as_string());
while (true) {
auto txn = etcdv3::Transaction();
txn.setup_compare_and_swap(key, std::to_string(value),
std::to_string(value + 1));
etcd::Response resp = client.txn(txn).get();
if (resp.is_ok()) {
break;
}
value = stoi(resp.value().as_string());
}
};
``` ```
See full example of the usages of transaction APIs, please refer to [./tst/TransactionTest.cpp](./tst/TransactionTest.cpp), See full example of the usages of transaction APIs, please refer to [./tst/TransactionTest.cpp](./tst/TransactionTest.cpp),

View File

@ -271,15 +271,6 @@ class Client {
*/ */
pplx::task<Response> get(std::string const& key, int64_t revision); pplx::task<Response> get(std::string const& key, int64_t revision);
/**
* Sets the value of a key. The key will be modified if already exists or
* created if it does not exists.
* @param key is the key to be created or modified
* @param value is the new value to be set
*/
pplx::task<Response> set(std::string const& key, std::string const& value,
int ttl = 0);
/** /**
* Sets the value of a key. The key will be modified if already exists or * Sets the value of a key. The key will be modified if already exists or
* created if it does not exists. * created if it does not exists.
@ -288,15 +279,7 @@ class Client {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
pplx::task<Response> set(std::string const& key, std::string const& value, pplx::task<Response> set(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
*/
pplx::task<Response> add(std::string const& key, std::string const& value,
int ttl = 0);
/** /**
* Creates a new key and sets it's value. Fails if the key already exists. * Creates a new key and sets it's value. Fails if the key already exists.
@ -305,7 +288,7 @@ class Client {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
pplx::task<Response> add(std::string const& key, std::string const& value, pplx::task<Response> add(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/** /**
* Put a new key-value pair. * Put a new key-value pair.
@ -315,12 +298,13 @@ class Client {
pplx::task<Response> put(std::string const& key, std::string const& value); pplx::task<Response> put(std::string const& key, std::string const& value);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Put a new key-value pair.
* @param key is the key to be modified * @param key is the key to be put
* @param value is the new value to be set * @param value is the value to be put
* @param leaseId is the lease id to be associated with the key
*/ */
pplx::task<Response> modify(std::string const& key, std::string const& value, pplx::task<Response> put(std::string const& key, std::string const& value,
int ttl = 0); const int64_t leaseId);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key. Fails if the key does not exists.
@ -329,18 +313,7 @@ class Client {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
pplx::task<Response> modify(std::string const& key, std::string const& value, pplx::task<Response> modify(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/**
* Modifies an existing key only if it has a specific value. Fails if the key
* does not exists or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
*/
pplx::task<Response> modify_if(std::string const& key,
std::string const& value,
std::string const& old_value, int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific value. Fails if the key * Modifies an existing key only if it has a specific value. Fails if the key
@ -352,19 +325,8 @@ class Client {
*/ */
pplx::task<Response> modify_if(std::string const& key, pplx::task<Response> modify_if(std::string const& key,
std::string const& value, std::string const& value,
std::string const& old_value, int64_t leaseId); std::string const& old_value,
const int64_t leaseId = 0);
/**
* Modifies an existing key only if it has a specific modification index
* value. Fails if the key does not exists or the modification index of the
* previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
*/
pplx::task<Response> modify_if(std::string const& key,
std::string const& value, int64_t old_index,
int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific modification index * Modifies an existing key only if it has a specific modification index
@ -377,7 +339,7 @@ class Client {
*/ */
pplx::task<Response> modify_if(std::string const& key, pplx::task<Response> modify_if(std::string const& key,
std::string const& value, int64_t old_index, std::string const& value, int64_t old_index,
int64_t leaseId); const int64_t leaseId = 0);
/** /**
* Removes a single key. The key has to point to a plain, non directory entry. * Removes a single key. The key has to point to a plain, non directory entry.

View File

@ -327,14 +327,6 @@ class SyncClient {
*/ */
Response get(std::string const& key, int64_t revision); Response get(std::string const& key, int64_t revision);
/**
* Sets the value of a key. The key will be modified if already exists or
* created if it does not exists.
* @param key is the key to be created or modified
* @param value is the new value to be set
*/
Response set(std::string const& key, std::string const& value, int ttl = 0);
/** /**
* Sets the value of a key. The key will be modified if already exists or * Sets the value of a key. The key will be modified if already exists or
* created if it does not exists. * created if it does not exists.
@ -343,14 +335,7 @@ class SyncClient {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
Response set(std::string const& key, std::string const& value, Response set(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
*/
Response add(std::string const& key, std::string const& value, int ttl = 0);
/** /**
* Creates a new key and sets it's value. Fails if the key already exists. * Creates a new key and sets it's value. Fails if the key already exists.
@ -359,7 +344,7 @@ class SyncClient {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
Response add(std::string const& key, std::string const& value, Response add(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/** /**
* Put a new key-value pair. * Put a new key-value pair.
@ -369,12 +354,13 @@ class SyncClient {
Response put(std::string const& key, std::string const& value); Response put(std::string const& key, std::string const& value);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Put a new key-value pair.
* @param key is the key to be modified * @param key is the key to be put
* @param value is the new value to be set * @param value is the value to be put
* @param leaseId is the lease id to be associated with the key
*/ */
Response modify(std::string const& key, std::string const& value, Response put(std::string const& key, std::string const& value,
int ttl = 0); const int64_t leaseId);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key. Fails if the key does not exists.
@ -383,17 +369,7 @@ class SyncClient {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
Response modify(std::string const& key, std::string const& value, Response modify(std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
/**
* Modifies an existing key only if it has a specific value. Fails if the key
* does not exists or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
*/
Response modify_if(std::string const& key, std::string const& value,
std::string const& old_value, int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific value. Fails if the key * Modifies an existing key only if it has a specific value. Fails if the key
@ -404,18 +380,7 @@ class SyncClient {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
Response modify_if(std::string const& key, std::string const& value, Response modify_if(std::string const& key, std::string const& value,
std::string const& old_value, int64_t leaseId); std::string const& old_value, const int64_t leaseId = 0);
/**
* Modifies an existing key only if it has a specific modification index
* value. Fails if the key does not exists or the modification index of the
* previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
*/
Response modify_if(std::string const& key, std::string const& value,
int64_t old_index, int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific modification index * Modifies an existing key only if it has a specific modification index
@ -427,7 +392,7 @@ class SyncClient {
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
Response modify_if(std::string const& key, std::string const& value, Response modify_if(std::string const& key, std::string const& value,
int64_t old_index, int64_t leaseId); int64_t old_index, const int64_t leaseId = 0);
/** /**
* Removes a single key. The key has to point to a plain, non directory entry. * Removes a single key. The key has to point to a plain, non directory entry.
@ -819,22 +784,21 @@ class SyncClient {
private: private:
// TODO: use std::unique_ptr<> // TODO: use std::unique_ptr<>
std::shared_ptr<etcdv3::AsyncHeadAction> head_internal(); std::shared_ptr<etcdv3::AsyncHeadAction> head_internal();
std::shared_ptr<etcdv3::AsyncRangeAction> get_internal(std::string const& key, std::shared_ptr<etcdv3::AsyncRangeAction> get_internal(
int64_t revision = 0); std::string const& key, const int64_t revision = 0);
std::shared_ptr<etcdv3::AsyncSetAction> set_internal(std::string const& key, std::shared_ptr<etcdv3::AsyncSetAction> add_internal(
std::string const& value, std::string const& key, std::string const& value,
int64_t leaseId); const int64_t leaseId = 0);
std::shared_ptr<etcdv3::AsyncSetAction> add_internal(std::string const& key,
std::string const& value,
int64_t leaseId);
std::shared_ptr<etcdv3::AsyncPutAction> put_internal( std::shared_ptr<etcdv3::AsyncPutAction> put_internal(
std::string const& key, std::string const& value); std::string const& key, std::string const& value,
const int64_t leaseId = 0);
std::shared_ptr<etcdv3::AsyncUpdateAction> modify_internal( std::shared_ptr<etcdv3::AsyncUpdateAction> modify_internal(
std::string const& key, std::string const& value, int64_t leaseId); std::string const& key, std::string const& value,
const int64_t leaseId = 0);
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> modify_if_internal( std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> modify_if_internal(
std::string const& key, std::string const& value, int64_t old_index, std::string const& key, std::string const& value, int64_t old_index,
std::string const& old_value, int64_t leaseId, std::string const& old_value, etcdv3::AtomicityType const& atomicity_type,
etcdv3::AtomicityType const& atomicity_type); const int64_t leaseId = 0);
std::shared_ptr<etcdv3::AsyncDeleteAction> rm_internal( std::shared_ptr<etcdv3::AsyncDeleteAction> rm_internal(
std::string const& key); std::string const& key);
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> rm_if_internal( std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> rm_if_internal(

View File

@ -9,6 +9,8 @@
#include "proto/v3election.grpc.pb.h" #include "proto/v3election.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h" #include "proto/v3lock.grpc.pb.h"
#include "etcd/v3/action_constants.hpp"
using grpc::ClientContext; using grpc::ClientContext;
using grpc::CompletionQueue; using grpc::CompletionQueue;
using grpc::Status; using grpc::Status;
@ -81,6 +83,27 @@ class Action {
namespace detail { namespace detail {
std::string string_plus_one(std::string const& value); std::string string_plus_one(std::string const& value);
std::string resolve_etcd_endpoints(std::string const& default_endpoints); std::string resolve_etcd_endpoints(std::string const& default_endpoints);
template <typename Req>
void make_request_with_ranges(Req& req, std::string const& key,
std::string const& range_end,
bool const recursive) {
if (!recursive) {
req.set_key(key);
} else {
if (key.empty()) {
req.set_key(etcdv3::NUL);
req.set_range_end(etcdv3::NUL);
} else {
req.set_key(key);
req.set_range_end(detail::string_plus_one(key));
}
}
if (!range_end.empty()) {
req.set_range_end(range_end);
}
}
} // namespace detail } // namespace detail
} // namespace etcdv3 } // namespace etcdv3
#endif #endif

View File

@ -76,8 +76,7 @@ class AsyncCampaignResponse : public etcdv3::V3Response {
class AsyncDeleteResponse : public etcdv3::V3Response { class AsyncDeleteResponse : public etcdv3::V3Response {
public: public:
AsyncDeleteResponse(){}; AsyncDeleteResponse(){};
void ParseResponse(std::string const& key, bool prefix, void ParseResponse(bool prefix, DeleteRangeResponse& resp);
DeleteRangeResponse& resp);
}; };
class AsyncHeadResponse : public etcdv3::V3Response { class AsyncHeadResponse : public etcdv3::V3Response {
@ -162,7 +161,6 @@ class AsyncTxnResponse : public etcdv3::V3Response {
public: public:
AsyncTxnResponse(){}; AsyncTxnResponse(){};
void ParseResponse(TxnResponse& resp); void ParseResponse(TxnResponse& resp);
void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp);
}; };
class AsyncUnlockResponse : public etcdv3::V3Response { class AsyncUnlockResponse : public etcdv3::V3Response {

View File

@ -28,38 +28,194 @@ enum class CompareTarget {
class Transaction { class Transaction {
public: public:
Transaction(); Transaction();
Transaction(std::string const&); ~Transaction();
virtual ~Transaction();
// Set a new key for different comparisons and /put/get/delete requests. union Value {
void reset_key(std::string const& newkey); int64_t version;
int64_t create_revision;
int64_t mod_revision;
std::string value;
int64_t lease;
};
void init_compare(CompareResult, CompareTarget); void add_compare(std::string const& key, CompareTarget const& target,
void init_compare(std::string const& old_value, CompareResult, CompareTarget); CompareResult const& result, Value const& target_value,
void init_compare(int64_t old_value, CompareResult, CompareTarget); std::string const& range_end = "");
void setup_basic_failure_operation(std::string const& key); void add_compare_version(std::string const& key, int64_t const& version,
void setup_set_failure_operation(std::string const& key, std::string const& range_end = "");
std::string const& value, int64_t leaseid); void add_compare_version(std::string const& key, CompareResult const& result,
void setup_basic_create_sequence(std::string const& key, int64_t const& version,
std::string const& value, int64_t leaseid); std::string const& range_end = "");
void setup_compare_and_swap_sequence(std::string const& valueToSwap, void add_compare_create(std::string const& key,
int64_t leaseid); int64_t const& create_revision,
void setup_delete_sequence(std::string const& key, std::string const& range_end = "");
std::string const& range_end, bool recursive); void add_compare_create(std::string const& key, CompareResult const& result,
void setup_delete_failure_operation(std::string const& key, int64_t const& create_revision,
std::string const& range_end, std::string const& range_end = "");
bool recursive); void add_compare_mod(std::string const& key, int64_t const& mod_revision,
void setup_compare_and_delete_operation(std::string const& key); std::string const& range_end = "");
void add_compare_mod(std::string const& key, CompareResult const& result,
int64_t const& mod_revision,
std::string const& range_end = "");
void add_compare_value(std::string const& key, std::string const& value,
std::string const& range_end = "");
void add_compare_value(std::string const& key, CompareResult const& result,
std::string const& value,
std::string const& range_end = "");
void add_compare_lease(std::string const& key, int64_t const& lease,
std::string const& range_end = "");
void add_compare_lease(std::string const& key, CompareResult const& result,
int64_t const& lease,
std::string const& range_end = "");
void add_success_range(std::string const& key,
std::string const& range_end = "",
bool const recursive = false, const int64_t limit = 0);
void add_success_put(std::string const& key, std::string const& value,
int64_t const leaseid = 0, const bool prev_kv = false);
void add_success_delete(std::string const& key,
std::string const& range_end = "",
bool const recursive = false,
const bool prev_kv = false);
void add_success_txn(const std::shared_ptr<Transaction> txn);
void add_failure_range(std::string const& key,
std::string const& range_end = "",
bool const recursive = false, const int64_t limit = 0);
void add_failure_put(std::string const& key, std::string const& value,
int64_t const leaseid = 0, const bool prev_kv = false);
void add_failure_delete(std::string const& key,
std::string const& range_end = "",
bool const recursive = false,
const bool prev_kv = false);
void add_failure_txn(const std::shared_ptr<Transaction> txn);
/**
* @brief Compare, and create if succeed. If failed, the response will
* contains the previous value in "values()" field.
*/
void setup_compare_and_create(std::string const& key,
std::string const& prev_value,
std::string const& create_key,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, or create if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_create(std::string const& key,
std::string const& prev_value,
std::string const& create_key,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and swap if succeed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_and_swap(std::string const& key,
std::string const& prev_value,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and swap if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_swap(std::string const& key,
std::string const& prev_value,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and delete if succeed. If failed, the response will
* contains the previous value in "values()" field.
*/
void setup_compare_and_delete(std::string const& key,
std::string const& prev_value,
std::string const& delete_key,
std::string const& range_end = "",
const bool recursive = false);
/**
* @brief Compare, or delete if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_delete(std::string const& key,
std::string const& prev_value,
std::string const& delete_key,
std::string const& range_end = "",
const bool recursive = false);
/**
* @brief Compare, and create if succeed. If failed, the response will
* contains the previous value in "values()" field.
*/
void setup_compare_and_create(std::string const& key,
const int64_t prev_revision,
std::string const& create_key,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, or create if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_create(std::string const& key,
const int64_t prev_revision,
std::string const& create_key,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and swap if succeed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_and_swap(std::string const& key,
const int64_t prev_revision,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and swap if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_swap(std::string const& key,
const int64_t prev_revision,
std::string const& value,
int64_t const leaseid = 0);
/**
* @brief Compare, and delete if succeed. If failed, the response will
* contains the previous value in "values()" field.
*/
void setup_compare_and_delete(std::string const& key,
const int64_t prev_revision,
std::string const& delete_key,
std::string const& range_end = "",
const bool recursive = false);
/**
* @brief Compare, or delete if failed. If failed, the response will contains
* the previous value in "values()" field.
*/
void setup_compare_or_delete(std::string const& key,
const int64_t prev_revision,
std::string const& delete_key,
std::string const& range_end = "",
const bool recursive = false);
// Keep for backwards compatibility.
// update without `get` and no `prev_kv` returned // update without `get` and no `prev_kv` returned
void setup_put(std::string const& key, std::string const& value); void setup_put(std::string const& key, std::string const& value);
void setup_delete(std::string const& key); void setup_delete(std::string const& key);
void setup_delete(std::string const& key, std::string const& range_end,
const bool recursive = false);
std::shared_ptr<etcdserverpb::TxnRequest> txn_request; std::shared_ptr<etcdserverpb::TxnRequest> txn_request;
private:
std::string key;
}; };
} // namespace etcdv3 } // namespace etcdv3

View File

@ -231,61 +231,15 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const& key,
pplx::task<etcd::Response> etcd::Client::set(std::string const& key, pplx::task<etcd::Response> etcd::Client::set(std::string const& key,
std::string const& value, std::string const& value,
int ttl) { const int64_t leaseid) {
if (ttl > 0) {
return this->leasegrant(ttl).then(
[this, key, value](pplx::task<etcd::Response> const& task) {
auto resp = task.get();
if (resp.error_code() == 0) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>( static_cast<responser_t<etcdv3::AsyncPutAction>>(Response::create),
Response::create), this->client->put_internal(key, value, leaseid));
this->client->set_internal(key, value, resp.value().lease()));
} else {
return pplx::task_from_result(resp);
}
});
} else {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>(Response::create),
this->client->set_internal(key, value, 0));
}
}
pplx::task<etcd::Response> etcd::Client::set(std::string const& key,
std::string const& value,
int64_t leaseid) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>(Response::create),
this->client->set_internal(key, value, leaseid));
} }
pplx::task<etcd::Response> etcd::Client::add(std::string const& key, pplx::task<etcd::Response> etcd::Client::add(std::string const& key,
std::string const& value, std::string const& value,
int ttl) { const int64_t leaseid) {
if (ttl > 0) {
return this->leasegrant(ttl).then(
[this, key, value](pplx::task<etcd::Response> const& task) {
auto resp = task.get();
if (resp.error_code() == 0) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>(
Response::create),
this->client->add_internal(key, value, resp.value().lease()));
} else {
return pplx::task_from_result(resp);
}
});
} else {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>(Response::create),
this->client->add_internal(key, value, 0));
}
}
pplx::task<etcd::Response> etcd::Client::add(std::string const& key,
std::string const& value,
int64_t leaseid) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncSetAction>>(Response::create), static_cast<responser_t<etcdv3::AsyncSetAction>>(Response::create),
this->client->add_internal(key, value, leaseid)); this->client->add_internal(key, value, leaseid));
@ -298,33 +252,17 @@ pplx::task<etcd::Response> etcd::Client::put(std::string const& key,
this->client->put_internal(key, value)); this->client->put_internal(key, value));
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const& key, pplx::task<etcd::Response> etcd::Client::put(std::string const& key,
std::string const& value, std::string const& value,
int ttl) { const int64_t leaseId) {
if (ttl > 0) {
return this->leasegrant(ttl).then(
[this, key, value](pplx::task<etcd::Response> const& task) {
auto resp = task.get();
if (resp.error_code() == 0) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncUpdateAction>>( static_cast<responser_t<etcdv3::AsyncPutAction>>(Response::create),
Response::create), this->client->put_internal(key, value, leaseId));
this->client->modify_internal(key, value,
resp.value().lease()));
} else {
return pplx::task_from_result(resp);
}
});
} else {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncUpdateAction>>(Response::create),
this->client->modify_internal(key, value, 0));
}
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const& key, pplx::task<etcd::Response> etcd::Client::modify(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { const int64_t leaseid) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncUpdateAction>>(Response::create), static_cast<responser_t<etcdv3::AsyncUpdateAction>>(Response::create),
this->client->modify_internal(key, value, leaseid)); this->client->modify_internal(key, value, leaseid));
@ -333,78 +271,25 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const& key,
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key, pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key,
std::string const& value, std::string const& value,
std::string const& old_value, std::string const& old_value,
int ttl) { const int64_t leaseid) {
if (ttl > 0) {
return this->leasegrant(ttl).then(
[this, key, value, old_value](pplx::task<etcd::Response> const& task) {
auto resp = task.get();
if (resp.error_code() == 0) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>( static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create), Response::create),
this->client->modify_if_internal( this->client->modify_if_internal(key, value, 0, old_value,
key, value, 0, old_value, resp.value().lease(), etcdv3::AtomicityType::PREV_VALUE,
etcdv3::AtomicityType::PREV_VALUE)); leaseid));
} else {
return pplx::task_from_result(resp);
}
});
} else {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create),
this->client->modify_if_internal(key, value, 0, old_value, 0,
etcdv3::AtomicityType::PREV_VALUE));
}
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key,
std::string const& value,
std::string const& old_value,
int64_t leaseid) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create),
this->client->modify_if_internal(key, value, 0, old_value, leaseid,
etcdv3::AtomicityType::PREV_VALUE));
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key,
std::string const& value,
int64_t old_index, int ttl) {
if (ttl > 0) {
return this->leasegrant(ttl).then(
[this, key, value, old_index](pplx::task<etcd::Response> const& task) {
auto resp = task.get();
if (resp.error_code() == 0) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create),
this->client->modify_if_internal(
key, value, old_index, "", resp.value().lease(),
etcdv3::AtomicityType::PREV_INDEX));
} else {
return pplx::task_from_result(resp);
}
});
} else {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create),
this->client->modify_if_internal(key, value, old_index, "", 0,
etcdv3::AtomicityType::PREV_INDEX));
}
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key, pplx::task<etcd::Response> etcd::Client::modify_if(std::string const& key,
std::string const& value, std::string const& value,
int64_t old_index, int64_t old_index,
int64_t leaseid) { const int64_t leaseid) {
return etcd::detail::asyncify( return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>( static_cast<responser_t<etcdv3::AsyncCompareAndSwapAction>>(
Response::create), Response::create),
this->client->modify_if_internal(key, value, old_index, "", leaseid, this->client->modify_if_internal(key, value, old_index, "",
etcdv3::AtomicityType::PREV_INDEX)); etcdv3::AtomicityType::PREV_INDEX,
leaseid));
} }
pplx::task<etcd::Response> etcd::Client::rm(std::string const& key) { pplx::task<etcd::Response> etcd::Client::rm(std::string const& key) {

View File

@ -39,6 +39,7 @@ etcd::Response::Response(const etcdv3::V3Response& reply,
_values.push_back(Value(val[index])); _values.push_back(Value(val[index]));
_keys.push_back(val[index].kvs.key()); _keys.push_back(val[index].kvs.key());
} }
_value = Value(reply.get_values()[0]);
} else { } else {
_value = Value(reply.get_value()); _value = Value(reply.get_value());
} }

View File

@ -474,18 +474,6 @@ etcd::SyncClient::~SyncClient() {
channel.reset(); channel.reset();
} }
/**
* Note [lease with TTL and issue the actual request]
*
* We sometime use the request like `set(key, value, TTL)`, we explain the TTL
* as the time between the user call the `set()` method between the request is
* actually executed in etcd server. Thus, we issue a lease request with that
* TTL value immediately, and pass it to the `set_internal()` method, the later
* may be issues asynchronously.
*
* Thus the TTL could keep the expected semantic even in the async runtime.
*/
etcd::Response etcd::SyncClient::head() { etcd::Response etcd::SyncClient::head() {
return Response::create(this->head_internal()); return Response::create(this->head_internal());
} }
@ -502,7 +490,8 @@ etcd::Response etcd::SyncClient::get(std::string const& key) {
return Response::create(this->get_internal(key)); return Response::create(this->get_internal(key));
} }
etcd::Response etcd::SyncClient::get(std::string const& key, int64_t revision) { etcd::Response etcd::SyncClient::get(std::string const& key,
const int64_t revision) {
return Response::create(this->get_internal(key, revision)); return Response::create(this->get_internal(key, revision));
} }
@ -518,62 +507,20 @@ std::shared_ptr<etcdv3::AsyncRangeAction> etcd::SyncClient::get_internal(
return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params)); return std::make_shared<etcdv3::AsyncRangeAction>(std::move(params));
} }
etcd::Response etcd::SyncClient::set(std::string const& key,
std::string const& value, int ttl) {
// See Note [lease with TTL and issue the actual request]
int64_t leaseId = 0;
if (ttl > 0) {
auto res = this->leasegrant(ttl);
if (!res.is_ok()) {
return etcd::Response(res.error_code(), res.error_message());
} else {
leaseId = res.value().lease();
}
}
return Response::create(this->set_internal(key, value, leaseId));
}
etcd::Response etcd::SyncClient::set(std::string const& key, etcd::Response etcd::SyncClient::set(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { const int64_t leaseid) {
return Response::create(this->set_internal(key, value, leaseid)); return Response::create(this->put_internal(key, value, leaseid));
}
std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::set_internal(
std::string const& key, std::string const& value, int64_t leaseid) {
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncSetAction>(std::move(params), false);
}
etcd::Response etcd::SyncClient::add(std::string const& key,
std::string const& value, int ttl) {
// See Note [lease with TTL and issue the actual request]
int64_t leaseId = 0;
if (ttl > 0) {
auto res = this->leasegrant(ttl);
if (!res.is_ok()) {
return etcd::Response(res.error_code(), res.error_message());
} else {
leaseId = res.value().lease();
}
}
return Response::create(this->add_internal(key, value, leaseId));
} }
etcd::Response etcd::SyncClient::add(std::string const& key, etcd::Response etcd::SyncClient::add(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { const int64_t leaseid) {
return Response::create(this->add_internal(key, value, leaseid)); return Response::create(this->add_internal(key, value, leaseid));
} }
std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::add_internal( std::shared_ptr<etcdv3::AsyncSetAction> etcd::SyncClient::add_internal(
std::string const& key, std::string const& value, int64_t leaseid) { std::string const& key, std::string const& value, const int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
@ -589,40 +536,32 @@ etcd::Response etcd::SyncClient::put(std::string const& key,
return Response::create(this->put_internal(key, value)); return Response::create(this->put_internal(key, value));
} }
etcd::Response etcd::SyncClient::put(std::string const& key,
std::string const& value,
const int64_t leaseId) {
return Response::create(this->put_internal(key, value, leaseId));
}
std::shared_ptr<etcdv3::AsyncPutAction> etcd::SyncClient::put_internal( std::shared_ptr<etcdv3::AsyncPutAction> etcd::SyncClient::put_internal(
std::string const& key, std::string const& value) { std::string const& key, std::string const& value, const int64_t leaseId) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseId;
params.auth_token.assign(this->token_authenticator->renew_if_expired()); params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout; params.grpc_timeout = this->grpc_timeout;
params.kv_stub = stubs->kvServiceStub.get(); params.kv_stub = stubs->kvServiceStub.get();
return std::make_shared<etcdv3::AsyncPutAction>(std::move(params)); return std::make_shared<etcdv3::AsyncPutAction>(std::move(params));
} }
etcd::Response etcd::SyncClient::modify(std::string const& key,
std::string const& value, int ttl) {
// See Note [lease with TTL and issue the actual request]
int64_t leaseId = 0;
if (ttl > 0) {
auto res = leasegrant(ttl);
if (!res.is_ok()) {
return etcd::Response(res.error_code(), res.error_message());
} else {
leaseId = res.value().lease();
}
}
return Response::create(this->modify_internal(key, value, leaseId));
}
etcd::Response etcd::SyncClient::modify(std::string const& key, etcd::Response etcd::SyncClient::modify(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { const int64_t leaseid) {
return Response::create(this->modify_internal(key, value, leaseid)); return Response::create(this->modify_internal(key, value, leaseid));
} }
std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal( std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal(
std::string const& key, std::string const& value, int64_t leaseid) { std::string const& key, std::string const& value, const int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
@ -636,58 +575,23 @@ std::shared_ptr<etcdv3::AsyncUpdateAction> etcd::SyncClient::modify_internal(
etcd::Response etcd::SyncClient::modify_if(std::string const& key, etcd::Response etcd::SyncClient::modify_if(std::string const& key,
std::string const& value, std::string const& value,
std::string const& old_value, std::string const& old_value,
int ttl) { const int64_t leaseid) {
// See Note [lease with TTL and issue the actual request]
int64_t leaseId = 0;
if (ttl > 0) {
auto res = leasegrant(ttl);
if (!res.is_ok()) {
return etcd::Response(res.error_code(), res.error_message());
} else {
leaseId = res.value().lease();
}
}
return Response::create(this->modify_if_internal( return Response::create(this->modify_if_internal(
key, value, 0, old_value, leaseId, etcdv3::AtomicityType::PREV_VALUE)); key, value, 0, old_value, etcdv3::AtomicityType::PREV_VALUE, leaseid));
}
etcd::Response etcd::SyncClient::modify_if(std::string const& key,
std::string const& value,
std::string const& old_value,
int64_t leaseid) {
return Response::create(this->modify_if_internal(
key, value, 0, old_value, leaseid, etcdv3::AtomicityType::PREV_VALUE));
}
etcd::Response etcd::SyncClient::modify_if(std::string const& key,
std::string const& value,
int64_t old_index, int ttl) {
// See Note [lease with TTL and issue the actual request]
int64_t leaseId = 0;
if (ttl > 0) {
auto res = leasegrant(ttl);
if (!res.is_ok()) {
return etcd::Response(res.error_code(), res.error_message());
} else {
leaseId = res.value().lease();
}
}
return Response::create(this->modify_if_internal(
key, value, old_index, "", leaseId, etcdv3::AtomicityType::PREV_INDEX));
} }
etcd::Response etcd::SyncClient::modify_if(std::string const& key, etcd::Response etcd::SyncClient::modify_if(std::string const& key,
std::string const& value, std::string const& value,
int64_t old_index, int64_t leaseid) { int64_t old_index, int64_t leaseid) {
return Response::create(this->modify_if_internal( return Response::create(this->modify_if_internal(
key, value, old_index, "", leaseid, etcdv3::AtomicityType::PREV_INDEX)); key, value, old_index, "", etcdv3::AtomicityType::PREV_INDEX, leaseid));
} }
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> std::shared_ptr<etcdv3::AsyncCompareAndSwapAction>
etcd::SyncClient::modify_if_internal( etcd::SyncClient::modify_if_internal(
std::string const& key, std::string const& value, int64_t old_index, std::string const& key, std::string const& value, int64_t old_index,
std::string const& old_value, int64_t leaseId, std::string const& old_value, etcdv3::AtomicityType const& atomicity_type,
etcdv3::AtomicityType const& atomicity_type) { const int64_t leaseId) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);

View File

@ -43,8 +43,7 @@ void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) {
value.kvs.set_lease(leader.lease()); value.kvs.set_lease(leader.lease());
} }
void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, void etcdv3::AsyncDeleteResponse::ParseResponse(bool prefix,
bool prefix,
DeleteRangeResponse& resp) { DeleteRangeResponse& resp) {
index = resp.header().revision(); index = resp.header().revision();
@ -167,38 +166,72 @@ void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) {
void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) {
index = reply.header().revision(); index = reply.header().revision();
}
void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key,
bool prefix, TxnResponse& reply) {
index = reply.header().revision();
for (int index = 0; index < reply.responses_size(); index++) { for (int index = 0; index < reply.responses_size(); index++) {
auto resp = reply.responses(index); auto resp = reply.responses(index);
if (ResponseOp::ResponseCase::kResponseRange == resp.response_case()) { if (ResponseOp::ResponseCase::kResponseRange == resp.response_case()) {
AsyncRangeResponse response; AsyncRangeResponse response;
response.ParseResponse(*(resp.mutable_response_range()), prefix); response.ParseResponse(*(resp.mutable_response_range()), true);
if (error_code == 0) {
error_code = response.get_error_code(); error_code = response.get_error_code();
error_message = response.get_error_message(); }
if (!response.get_error_message().empty()) {
values = response.get_values(); error_message += "\n" + response.get_error_message();
value = response.get_value(); }
for (auto const& value : response.get_values()) {
values.emplace_back(value);
}
} else if (ResponseOp::ResponseCase::kResponsePut == resp.response_case()) { } else if (ResponseOp::ResponseCase::kResponsePut == resp.response_case()) {
auto put_resp = resp.response_put(); AsyncPutResponse response;
if (put_resp.has_prev_kv()) { response.ParseResponse(*(resp.mutable_response_put()));
prev_value.kvs.CopyFrom(put_resp.prev_kv()); prev_value.kvs.CopyFrom(response.get_prev_value().kvs);
if (error_code == 0) {
error_code = response.get_error_code();
}
if (!response.get_error_message().empty()) {
error_message += "\n" + response.get_error_message();
}
for (auto const& value : response.get_values()) {
values.emplace_back(value);
} }
} else if (ResponseOp::ResponseCase::kResponseDeleteRange == } else if (ResponseOp::ResponseCase::kResponseDeleteRange ==
resp.response_case()) { resp.response_case()) {
AsyncDeleteResponse response; AsyncDeleteResponse response;
response.ParseResponse(key, prefix, response.ParseResponse(true, *(resp.mutable_response_delete_range()));
*(resp.mutable_response_delete_range()));
prev_value.kvs.CopyFrom(response.get_prev_value().kvs); prev_value.kvs.CopyFrom(response.get_prev_value().kvs);
values = response.get_values(); if (error_code == 0) {
value = response.get_value(); error_code = response.get_error_code();
} }
if (!response.get_error_message().empty()) {
error_message += "\n" + response.get_error_message();
}
for (auto const& value : response.get_values()) {
values.emplace_back(value);
}
} else if (ResponseOp::ResponseCase::kResponseTxn == resp.response_case()) {
AsyncTxnResponse response;
response.ParseResponse(*(resp.mutable_response_txn()));
if (error_code == 0) {
error_code = response.get_error_code();
}
if (!response.get_error_message().empty()) {
error_message += "\n" + response.get_error_message();
}
// skip
std::cerr << "Not implemented error: unable to parse nested transaction "
"response"
<< std::endl;
}
}
if (!values.empty()) {
value = values[0];
}
if (!prev_values.empty()) {
prev_value = prev_values[0];
} }
} }
@ -270,20 +303,17 @@ etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse() {
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction( etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(
etcdv3::ActionParameters&& params, etcdv3::AtomicityType type) etcdv3::ActionParameters&& params, etcdv3::AtomicityType type)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction txn;
if (type == etcdv3::AtomicityType::PREV_VALUE) { if (type == etcdv3::AtomicityType::PREV_VALUE) {
transaction.init_compare(parameters.old_value, CompareResult::EQUAL, txn.setup_compare_and_delete(parameters.key, parameters.old_value,
CompareTarget::VALUE); parameters.key);
} else if (type == etcdv3::AtomicityType::PREV_INDEX) { } else if (type == etcdv3::AtomicityType::PREV_INDEX) {
transaction.init_compare(parameters.old_revision, CompareResult::EQUAL, txn.setup_compare_and_delete(parameters.key, parameters.old_revision,
CompareTarget::MOD); parameters.key);
} }
transaction.setup_compare_and_delete_operation(parameters.key);
transaction.setup_basic_failure_operation(parameters.key);
response_reader = response_reader =
parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this); response_reader->Finish(&reply, &status, (void*) this);
} }
@ -295,35 +325,30 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() {
txn_resp.set_error_code(status.error_code()); txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message()); txn_resp.set_error_message(status.error_message());
} else { } else {
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.ParseResponse(reply);
if (!reply.succeeded()) { if (!reply.succeeded()) {
txn_resp.set_error_code(ERROR_COMPARE_FAILED); txn_resp.set_error_code(ERROR_COMPARE_FAILED);
txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); txn_resp.set_error_message("etcd-cpp-apiv3: compare failed");
} }
} }
return txn_resp; return txn_resp;
} }
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction( etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(
etcdv3::ActionParameters&& params, etcdv3::AtomicityType type) etcdv3::ActionParameters&& params, etcdv3::AtomicityType type)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction txn;
if (type == etcdv3::AtomicityType::PREV_VALUE) { if (type == etcdv3::AtomicityType::PREV_VALUE) {
transaction.init_compare(parameters.old_value, CompareResult::EQUAL, txn.setup_compare_and_swap(parameters.key, parameters.old_value,
CompareTarget::VALUE); parameters.value);
} else if (type == etcdv3::AtomicityType::PREV_INDEX) { } else if (type == etcdv3::AtomicityType::PREV_INDEX) {
transaction.init_compare(parameters.old_revision, CompareResult::EQUAL, txn.setup_compare_and_swap(parameters.key, parameters.old_revision,
CompareTarget::MOD); parameters.value);
} }
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value,
parameters.lease_id);
response_reader = response_reader =
parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this); response_reader->Finish(&reply, &status, (void*) this);
} }
@ -335,7 +360,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() {
txn_resp.set_error_code(status.error_code()); txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message()); txn_resp.set_error_message(status.error_message());
} else { } else {
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.ParseResponse(reply);
// if there is an error code returned by parseResponse, we must // if there is an error code returned by parseResponse, we must
// not overwrite it. // not overwrite it.
@ -344,29 +369,14 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() {
txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); txn_resp.set_error_message("etcd-cpp-apiv3: compare failed");
} }
} }
return txn_resp; return txn_resp;
} }
etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters&& params) etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters&& params)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
DeleteRangeRequest del_request; DeleteRangeRequest del_request;
if (!parameters.withPrefix) { detail::make_request_with_ranges(del_request, parameters.key,
del_request.set_key(parameters.key); parameters.range_end, parameters.withPrefix);
} else {
if (parameters.key.empty()) {
// see: WithFromKey in etcdv3/client
del_request.set_key(etcdv3::NUL);
del_request.set_range_end(etcdv3::NUL);
} else {
del_request.set_key(parameters.key);
del_request.set_range_end(detail::string_plus_one(parameters.key));
}
}
if (!parameters.range_end.empty()) {
del_request.set_range_end(parameters.range_end);
}
del_request.set_prev_kv(true); del_request.set_prev_kv(true);
response_reader = response_reader =
@ -383,10 +393,8 @@ etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse() {
del_resp.set_error_message(status.error_message()); del_resp.set_error_message(status.error_message());
} else { } else {
del_resp.ParseResponse( del_resp.ParseResponse(
parameters.key, parameters.withPrefix || !parameters.range_end.empty(), parameters.withPrefix || !parameters.range_end.empty(), reply);
reply);
} }
return del_resp; return del_resp;
} }
@ -874,21 +882,8 @@ etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse() {
etcdv3::AsyncRangeAction::AsyncRangeAction(etcdv3::ActionParameters&& params) etcdv3::AsyncRangeAction::AsyncRangeAction(etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
RangeRequest get_request; RangeRequest get_request;
if (!parameters.withPrefix) { detail::make_request_with_ranges(get_request, parameters.key,
get_request.set_key(parameters.key); parameters.range_end, parameters.withPrefix);
} else {
if (parameters.key.empty()) {
// see: WithFromKey in etcdv3/client
get_request.set_key(etcdv3::NUL);
get_request.set_range_end(etcdv3::NUL);
} else {
get_request.set_key(parameters.key);
get_request.set_range_end(detail::string_plus_one(parameters.key));
}
}
if (!parameters.range_end.empty()) {
get_request.set_range_end(parameters.range_end);
}
if (parameters.revision > 0) { if (parameters.revision > 0) {
get_request.set_revision(parameters.revision); get_request.set_revision(parameters.revision);
} }
@ -951,21 +946,17 @@ etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse() {
etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters&& params, etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters&& params,
bool create) bool create)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction txn;
isCreate = create; isCreate = create;
transaction.init_compare(CompareResult::EQUAL, CompareTarget::VERSION); txn.add_compare_mod(parameters.key, 0 /* not exists */);
txn.add_success_put(parameters.key, parameters.key, parameters.lease_id);
transaction.setup_basic_create_sequence(parameters.key, parameters.value, if (create) {
parameters.lease_id); txn.add_failure_put(parameters.key, parameters.value, parameters.lease_id);
if (isCreate) {
transaction.setup_basic_failure_operation(parameters.key);
} else { } else {
transaction.setup_set_failure_operation(parameters.key, parameters.value, txn.add_failure_range(parameters.key);
parameters.lease_id);
} }
response_reader = response_reader =
parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this); response_reader->Finish(&reply, &status, (void*) this);
} }
@ -977,7 +968,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() {
txn_resp.set_error_code(status.error_code()); txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message()); txn_resp.set_error_message(status.error_message());
} else { } else {
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.ParseResponse(reply);
if (!reply.succeeded() && isCreate) { if (!reply.succeeded() && isCreate) {
txn_resp.set_error_code(etcdv3::ERROR_KEY_ALREADY_EXISTS); txn_resp.set_error_code(etcdv3::ERROR_KEY_ALREADY_EXISTS);
@ -1003,7 +994,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse() {
txn_resp.set_error_code(status.error_code()); txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message()); txn_resp.set_error_message(status.error_message());
} else { } else {
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.ParseResponse(reply);
// if there is an error code returned by parseResponse, we must // if there is an error code returned by parseResponse, we must
// not overwrite it. // not overwrite it.
@ -1042,14 +1033,12 @@ etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() {
etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters&& params) etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) { : etcdv3::Action(std::move(params)) {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction txn;
transaction.init_compare(CompareResult::GREATER, CompareTarget::VERSION); txn.add_compare_version(parameters.key, CompareResult::GREATER, 0); // exists
txn.add_success_put(parameters.key, parameters.value, parameters.lease_id);
transaction.setup_compare_and_swap_sequence(parameters.value, txn.add_failure_range(parameters.key);
parameters.lease_id);
response_reader = response_reader =
parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this); response_reader->Finish(&reply, &status, (void*) this);
} }
@ -1061,7 +1050,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() {
txn_resp.set_error_message(status.error_message()); txn_resp.set_error_message(status.error_message());
} else { } else {
if (reply.succeeded()) { if (reply.succeeded()) {
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.ParseResponse(reply);
txn_resp.set_action(etcdv3::UPDATE_ACTION); txn_resp.set_action(etcdv3::UPDATE_ACTION);
} else { } else {
txn_resp.set_error_code(etcdv3::ERROR_KEY_NOT_FOUND); txn_resp.set_error_code(etcdv3::ERROR_KEY_NOT_FOUND);
@ -1084,22 +1073,8 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
WatchRequest watch_req; WatchRequest watch_req;
WatchCreateRequest watch_create_req; WatchCreateRequest watch_create_req;
detail::make_request_with_ranges(watch_create_req, parameters.key,
if (!parameters.withPrefix) { parameters.range_end, parameters.withPrefix);
watch_create_req.set_key(parameters.key);
} else {
if (parameters.key.empty()) {
watch_create_req.set_key(etcdv3::NUL);
watch_create_req.set_range_end(etcdv3::NUL);
} else {
watch_create_req.set_key(parameters.key);
watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
}
}
if (!parameters.range_end.empty()) {
watch_create_req.set_range_end(parameters.range_end);
}
watch_create_req.set_prev_kv(true); watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision); watch_create_req.set_start_revision(parameters.revision);
watch_create_req.set_watch_id(this->watch_id); watch_create_req.set_watch_id(this->watch_id);

View File

@ -2,11 +2,7 @@
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
using etcdserverpb::Compare; #include "etcd/v3/Action.hpp"
using etcdserverpb::DeleteRangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::RequestOp;
namespace etcdv3 { namespace etcdv3 {
@ -28,185 +24,331 @@ etcdv3::Transaction::Transaction() {
txn_request.reset(new etcdserverpb::TxnRequest{}); txn_request.reset(new etcdserverpb::TxnRequest{});
} }
etcdv3::Transaction::Transaction(const std::string& key) : key(key) { etcdv3::Transaction::~Transaction() {}
txn_request.reset(new etcdserverpb::TxnRequest{});
void etcdv3::Transaction::add_compare(std::string const& key,
CompareTarget const& target,
CompareResult const& result,
Value const& target_value,
std::string const& range_end) {
switch (target) {
case CompareTarget::VERSION:
add_compare_version(key, result, target_value.version, range_end);
break;
case CompareTarget::CREATE:
add_compare_create(key, result, target_value.create_revision, range_end);
break;
case CompareTarget::MOD:
add_compare_mod(key, result, target_value.mod_revision, range_end);
break;
case CompareTarget::VALUE:
add_compare_value(key, result, target_value.value, range_end);
break;
case CompareTarget::LEASE:
add_compare_lease(key, result, target_value.lease, range_end);
break;
default:
// ignore invalid compare target
break;
}
} }
void etcdv3::Transaction::reset_key(std::string const& newkey) { key = newkey; } void etcdv3::Transaction::add_compare_version(std::string const& key,
int64_t const& version,
std::string const& range_end) {
this->add_compare_version(key, CompareResult::EQUAL, version, range_end);
}
void etcdv3::Transaction::init_compare(CompareResult result, void etcdv3::Transaction::add_compare_version(std::string const& key,
CompareTarget target) { CompareResult const& result,
Compare* compare = txn_request->add_compare(); int64_t const& version,
std::string const& range_end) {
auto compare = txn_request->add_compare();
compare->set_result(detail::to_compare_result(result)); compare->set_result(detail::to_compare_result(result));
compare->set_target(detail::to_compare_target(target)); compare->set_target(detail::to_compare_target(CompareTarget::VERSION));
compare->set_key(key); compare->set_key(key);
compare->set_version(version);
compare->set_version(0); compare->set_range_end(range_end);
} }
void etcdv3::Transaction::init_compare(std::string const& old_value, void etcdv3::Transaction::add_compare_create(std::string const& key,
CompareResult result, int64_t const& create_revision,
CompareTarget target) { std::string const& range_end) {
Compare* compare = txn_request->add_compare(); this->add_compare_create(key, CompareResult::EQUAL, create_revision,
range_end);
}
void etcdv3::Transaction::add_compare_create(std::string const& key,
CompareResult const& result,
int64_t const& create_revision,
std::string const& range_end) {
auto compare = txn_request->add_compare();
compare->set_result(detail::to_compare_result(result)); compare->set_result(detail::to_compare_result(result));
compare->set_target(detail::to_compare_target(target)); compare->set_target(detail::to_compare_target(CompareTarget::CREATE));
compare->set_key(key); compare->set_key(key);
compare->set_create_revision(create_revision);
compare->set_value(old_value); compare->set_range_end(range_end);
} }
void etcdv3::Transaction::init_compare(int64_t old_index, CompareResult result, void etcdv3::Transaction::add_compare_mod(std::string const& key,
CompareTarget target) { int64_t const& mod_revision,
Compare* compare = txn_request->add_compare(); std::string const& range_end) {
this->add_compare_mod(key, CompareResult::EQUAL, mod_revision, range_end);
}
void etcdv3::Transaction::add_compare_mod(std::string const& key,
CompareResult const& result,
int64_t const& mod_revision,
std::string const& range_end) {
auto compare = txn_request->add_compare();
compare->set_result(detail::to_compare_result(result)); compare->set_result(detail::to_compare_result(result));
compare->set_target(detail::to_compare_target(target)); compare->set_target(detail::to_compare_target(CompareTarget::MOD));
compare->set_key(key); compare->set_key(key);
compare->set_mod_revision(mod_revision);
compare->set_mod_revision(old_index); compare->set_range_end(range_end);
} }
/** void etcdv3::Transaction::add_compare_value(std::string const& key,
* get key on failure
*/
void etcdv3::Transaction::setup_basic_failure_operation(
std::string const& key) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
}
/**
* get key on failure, get key before put, modify and then get updated key
*/
void etcdv3::Transaction::setup_set_failure_operation(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { std::string const& range_end) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); this->add_compare_value(key, CompareResult::EQUAL, value, range_end);
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
} }
/** void etcdv3::Transaction::add_compare_value(std::string const& key,
* add key and then get new value of key CompareResult const& result,
*/
void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key,
std::string const& value, std::string const& value,
int64_t leaseid) { std::string const& range_end) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); auto compare = txn_request->add_compare();
put_request->set_key(key); compare->set_result(detail::to_compare_result(result));
put_request->set_value(value); compare->set_target(detail::to_compare_target(CompareTarget::VALUE));
put_request->set_prev_kv(true); compare->set_key(key);
put_request->set_lease(leaseid); compare->set_value(value);
RequestOp* req_success = txn_request->add_success(); compare->set_range_end(range_end);
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request->add_success();
req_success->set_allocated_request_range(get_request.release());
} }
/** void etcdv3::Transaction::add_compare_lease(std::string const& key,
* get key value then modify and get new value int64_t const& lease,
*/ std::string const& range_end) {
void etcdv3::Transaction::setup_compare_and_swap_sequence( this->add_compare_lease(key, CompareResult::EQUAL, lease, range_end);
std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request->add_success();
req_success->set_allocated_request_range(get_request.release());
} }
/** void etcdv3::Transaction::add_compare_lease(std::string const& key,
* get key, delete CompareResult const& result,
*/ int64_t const& lease,
void etcdv3::Transaction::setup_delete_sequence(std::string const& key, std::string const& range_end) {
auto compare = txn_request->add_compare();
compare->set_result(detail::to_compare_result(result));
compare->set_target(detail::to_compare_target(CompareTarget::LEASE));
compare->set_key(key);
compare->set_lease(lease);
compare->set_range_end(range_end);
}
void etcdv3::Transaction::add_success_range(std::string const& key,
std::string const& range_end, std::string const& range_end,
bool recursive) { bool const recursive,
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest()); const int64_t limit) {
del_request->set_key(key); auto succ = txn_request->add_success();
del_request->set_prev_kv(true); auto get_request = succ->mutable_request_range();
if (recursive) { etcdv3::detail::make_request_with_ranges(*get_request, key, range_end,
del_request->set_range_end(range_end); recursive);
} get_request->set_limit(limit);
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_delete_range(del_request.release());
} }
/** void etcdv3::Transaction::add_success_put(std::string const& key,
* get key, delete std::string const& value,
*/ int64_t const leaseid,
void etcdv3::Transaction::setup_delete_failure_operation( const bool prev_kv) {
std::string const& key, std::string const& range_end, bool recursive) { auto succ = txn_request->add_success();
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); auto put_request = succ->mutable_request_put();
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest()); put_request->set_key(key);
get_request.reset(new RangeRequest()); put_request->set_value(value);
get_request->set_key(key); put_request->set_prev_kv(prev_kv);
if (recursive) { put_request->set_lease(leaseid);
get_request->set_range_end(range_end);
get_request->set_sort_target(
RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(
RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
RequestOp* req_failure = txn_request->add_failure();
req_failure->set_allocated_request_range(get_request.release());
del_request.reset(new DeleteRangeRequest());
del_request->set_key(key);
if (recursive) {
del_request->set_range_end(range_end);
}
req_failure = txn_request->add_failure();
req_failure->set_allocated_request_delete_range(del_request.release());
} }
void etcdv3::Transaction::setup_compare_and_delete_operation( void etcdv3::Transaction::add_success_delete(std::string const& key,
std::string const& key) { std::string const& range_end,
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest()); bool const recursive,
del_request->set_key(key); const bool prev_kv) {
del_request->set_prev_kv(true); auto succ = txn_request->add_success();
RequestOp* req_success = txn_request->add_success(); auto del_request = succ->mutable_request_delete_range();
req_success->set_allocated_request_delete_range(del_request.release()); etcdv3::detail::make_request_with_ranges(*del_request, key, range_end,
recursive);
del_request->set_prev_kv(prev_kv);
}
void etcdv3::Transaction::add_success_txn(
const std::shared_ptr<Transaction> txn) {
auto succ = txn_request->add_success();
auto txn_request = succ->mutable_request_txn();
txn_request->CopyFrom(*txn->txn_request);
}
void etcdv3::Transaction::add_failure_range(std::string const& key,
std::string const& range_end,
bool const recursive,
const int64_t limit) {
auto fail = txn_request->add_failure();
auto get_request = fail->mutable_request_range();
etcdv3::detail::make_request_with_ranges(*get_request, key, range_end,
recursive);
get_request->set_limit(limit);
}
void etcdv3::Transaction::add_failure_put(std::string const& key,
std::string const& value,
int64_t const leaseid,
const bool prev_kv) {
auto fail = txn_request->add_failure();
auto put_request = fail->mutable_request_put();
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(prev_kv);
put_request->set_lease(leaseid);
}
void etcdv3::Transaction::add_failure_delete(std::string const& key,
std::string const& range_end,
bool const recursive,
const bool prev_kv) {
auto fail = txn_request->add_failure();
auto del_request = fail->mutable_request_delete_range();
etcdv3::detail::make_request_with_ranges(*del_request, key, range_end,
recursive);
del_request->set_prev_kv(prev_kv);
}
void etcdv3::Transaction::add_failure_txn(
const std::shared_ptr<Transaction> txn) {
auto fail = txn_request->add_failure();
auto txn_request = fail->mutable_request_txn();
txn_request->CopyFrom(*txn->txn_request);
}
void etcdv3::Transaction::setup_compare_and_create(
std::string const& key, std::string const& prev_value,
std::string const& create_key, std::string const& value,
int64_t const leaseid) {
this->add_compare_value(key, CompareResult::EQUAL, prev_value);
this->add_success_put(create_key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_create(std::string const& key,
std::string const& prev_value,
std::string const& create_key,
std::string const& value,
int64_t const leaseid) {
this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value);
this->add_success_put(create_key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_and_swap(std::string const& key,
std::string const& prev_value,
std::string const& value,
int64_t const leaseid) {
this->add_compare_value(key, CompareResult::EQUAL, prev_value);
this->add_success_put(key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_swap(std::string const& key,
std::string const& prev_value,
std::string const& value,
int64_t const leaseid) {
this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value);
this->add_success_put(key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_and_delete(
std::string const& key, std::string const& prev_value,
std::string const& delete_key, std::string const& range_end,
const bool recursive) {
this->add_compare_value(key, CompareResult::EQUAL, prev_value);
this->add_success_delete(delete_key, range_end, recursive);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_delete(std::string const& key,
std::string const& prev_value,
std::string const& delete_key,
std::string const& range_end,
const bool recursive) {
this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value);
this->add_success_delete(delete_key, range_end, recursive);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_and_create(
std::string const& key, const int64_t prev_revision,
std::string const& create_key, std::string const& value,
int64_t const leaseid) {
this->add_compare_mod(key, CompareResult::EQUAL, prev_revision);
this->add_success_put(create_key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_create(std::string const& key,
const int64_t prev_revision,
std::string const& create_key,
std::string const& value,
int64_t const leaseid) {
this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision);
this->add_success_put(create_key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_and_swap(std::string const& key,
const int64_t prev_revision,
std::string const& value,
int64_t const leaseid) {
this->add_compare_mod(key, CompareResult::EQUAL, prev_revision);
this->add_success_put(key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_swap(std::string const& key,
const int64_t prev_revision,
std::string const& value,
int64_t const leaseid) {
this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision);
this->add_success_put(key, value, leaseid);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_and_delete(
std::string const& key, const int64_t prev_revision,
std::string const& delete_key, std::string const& range_end,
const bool recursive) {
this->add_compare_mod(key, CompareResult::EQUAL, prev_revision);
this->add_success_delete(delete_key, range_end, recursive);
this->add_failure_range(key);
}
void etcdv3::Transaction::setup_compare_or_delete(std::string const& key,
const int64_t prev_revision,
std::string const& delete_key,
std::string const& range_end,
const bool recursive) {
this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision);
this->add_success_delete(delete_key, range_end, recursive);
this->add_failure_range(key);
} }
void etcdv3::Transaction::setup_put(std::string const& key, void etcdv3::Transaction::setup_put(std::string const& key,
std::string const& value) { std::string const& value) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); this->add_success_put(key, value);
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(false);
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_put(put_request.release());
} }
void etcdv3::Transaction::setup_delete(std::string const& key) { void etcdv3::Transaction::setup_delete(std::string const& key) {
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest()); this->add_success_delete(key);
del_request->set_key(key);
del_request->set_prev_kv(false);
RequestOp* req_success = txn_request->add_success();
req_success->set_allocated_request_delete_range(del_request.release());
} }
etcdv3::Transaction::~Transaction() {} void etcdv3::Transaction::setup_delete(std::string const& key,
std::string const& range_end,
const bool recursive) {
this->add_success_delete(key, range_end, recursive);
}

View File

@ -50,13 +50,8 @@ TEST_CASE("add a new key") {
etcdv3::Transaction txn; etcdv3::Transaction txn;
// setup the conditions // setup the conditions
txn.reset_key("/test/x1"); txn.add_compare_value("/test/x1", "1");
txn.init_compare("1", etcdv3::CompareResult::EQUAL, txn.add_compare_value("/test/x2", "2");
etcdv3::CompareTarget::VALUE);
txn.reset_key("/test/x2");
txn.init_compare("2", etcdv3::CompareResult::EQUAL,
etcdv3::CompareTarget::VALUE);
txn.setup_put("/test/x1", "111"); txn.setup_put("/test/x1", "111");
txn.setup_delete("/test/x2"); txn.setup_delete("/test/x2");
@ -84,6 +79,51 @@ TEST_CASE("add a new key") {
} }
} }
TEST_CASE("fetch & add") {
etcd::Client etcd(etcd_url);
etcd.rmdir("/test", true).wait();
etcd.set("/test/key", "0").wait();
auto fetch_and_add = [](etcd::Client& client,
std::string const& key) -> void {
auto value = stoi(client.get(key).get().value().as_string());
while (true) {
auto txn = etcdv3::Transaction();
txn.setup_compare_and_swap(key, std::to_string(value),
std::to_string(value + 1));
etcd::Response resp = client.txn(txn).get();
if (resp.is_ok()) {
break;
}
value = stoi(resp.value().as_string());
}
};
// run 1000 times
const size_t rounds = 100;
std::atomic_size_t counter(0);
std::vector<std::thread> threads;
for (size_t i = 0; i < 10; ++i) {
threads.emplace_back([&]() {
while (counter.fetch_add(1) < rounds) {
fetch_and_add(etcd, "/test/key");
}
});
}
for (auto& thr : threads) {
thr.join();
}
// check the value
{
etcd::Response resp = etcd.get("/test/key").get();
REQUIRE(0 == resp.error_code());
CHECK(resp.value().as_string() == std::to_string(rounds));
}
}
TEST_CASE("cleanup") { TEST_CASE("cleanup") {
etcd::Client etcd(etcd_url); etcd::Client etcd(etcd_url);
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());