Refactored transaction initialization.

cleaned up and added comments for methods in transaction class
This commit is contained in:
lampayan 2016-06-21 15:52:19 +02:00
parent fe5396e437
commit b086ee9d72
3 changed files with 60 additions and 49 deletions

View File

@ -2,6 +2,8 @@
#define __ETCD_CLIENT_HPP__ #define __ETCD_CLIENT_HPP__
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include "v3/include/Transaction.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include <cpprest/http_client.h> #include <cpprest/http_client.h>
#include <string> #include <string>
@ -153,7 +155,11 @@ namespace etcd
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive); pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value); pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index); pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
};
private:
std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation,
etcdv3::Transaction& transaction);
};

View File

@ -70,16 +70,19 @@ pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::st
return send_asyncmodify(key,value); return send_asyncmodify(key,value);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value)
{ {
return send_asyncmodify_if(key, value, old_value); return send_asyncmodify_if(key, value, old_value);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{ {
return send_asyncmodify_if(key, value, old_index); return send_asyncmodify_if(key, value, old_index);
} }
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key) pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{ {
return send_asyncdelete(key,false); return send_asyncdelete(key,false);
@ -91,24 +94,28 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
return send_asyncrm_if(key, old_value); return send_asyncrm_if(key, old_value);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{ {
return send_asyncrm_if(key, old_index); return send_asyncrm_if(key, old_index);
} }
pplx::task<etcd::Response> etcd::Client::mkdir(std::string const & key) pplx::task<etcd::Response> etcd::Client::mkdir(std::string const & key)
{ {
web::http::uri_builder uri("/v2/keys" + key); web::http::uri_builder uri("/v2/keys" + key);
return send_put_request(uri, "dir", "true"); return send_put_request(uri, "dir", "true");
} }
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{ {
return send_asyncdelete(key,recursive); return send_asyncdelete(key,recursive);
} }
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{ {
@ -119,6 +126,7 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
return send_asyncget(key,range_end); return send_asyncget(key,range_end);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{ {
web::http::uri_builder uri("/v2/keys" + key); web::http::uri_builder uri("/v2/keys" + key);
@ -128,6 +136,7 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool rec
return send_get_request(uri); return send_get_request(uri);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
{ {
web::http::uri_builder uri("/v2/keys" + key); web::http::uri_builder uri("/v2/keys" + key);
@ -138,6 +147,17 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
return send_get_request(uri); return send_get_request(uri);
} }
std::shared_ptr<etcdv3::AsyncTxnResponse> etcd::Client::initiate_transaction(const std::string &operation,
etcdv3::Transaction& transaction)
{
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse(operation));
call->response_reader = stub_->AsyncTxn(&call->context, transaction.txn_request, &call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*) (call.get()));
return call;
}
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
{ {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
@ -147,15 +167,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key,
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
transaction.setup_basic_create_sequence(key, value); transaction.setup_basic_create_sequence(key, value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("create", transaction);
//to be done in one method, where txn_request is a field of a class
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("create"));
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value)
{ {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
@ -165,13 +181,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value); transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndSwap", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index) pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index)
{ {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
@ -181,10 +195,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value); transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndSwap", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
@ -198,10 +209,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value); transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("update")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("update", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
@ -234,13 +242,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key,
transaction.setup_set_failure_operation(key, value); transaction.setup_set_failure_operation(key, value);
transaction.setup_basic_create_sequence(key, value); transaction.setup_basic_create_sequence(key, value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("set")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("set", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & key, bool recursive)
{ {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
@ -257,29 +263,25 @@ pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & ke
transaction.setup_delete_sequence(key, range_end, recursive); transaction.setup_delete_sequence(key, range_end, recursive);
transaction.setup_delete_failure_operation(key, range_end, recursive); transaction.setup_delete_failure_operation(key, range_end, recursive);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("delete")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("delete", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value) pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value)
{ {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE); Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_compare_and_delete_operation(key); transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndDelete"));
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, int old_index) { pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, int old_index) {
etcdv3::Transaction transaction(key); etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
@ -288,11 +290,6 @@ pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key,
transaction.setup_compare_and_delete_operation(key); transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key); transaction.setup_basic_failure_operation(key);
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndDelete")); std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }

View File

@ -14,12 +14,9 @@ using etcdserverpb::RequestOp;
using etcdserverpb::DeleteRangeRequest; using etcdserverpb::DeleteRangeRequest;
etcdv3::Transaction::Transaction() { etcdv3::Transaction::Transaction() {
// TODO Auto-generated constructor stub
} }
etcdv3::Transaction::Transaction(const std::string& key) : key(key) { etcdv3::Transaction::Transaction(const std::string& key) : key(key) {
} }
void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::CompareTarget target){ void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::CompareTarget target){
@ -49,6 +46,9 @@ void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult res
compare->set_mod_revision(old_index); compare->set_mod_revision(old_index);
} }
/**
* get key on failure
*/
void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) { void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); //then this can be just a raw pointer std::unique_ptr<RangeRequest> get_request(new RangeRequest()); //then this can be just a raw pointer
get_request->set_key(key); get_request->set_key(key);
@ -56,6 +56,9 @@ void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key)
req_failure->set_allocated_request_range(get_request.release()); //why not get()? req_failure->set_allocated_request_range(get_request.release()); //why not get()?
} }
/**
* get key on failure, get key before put, modify and then get updated key
*/
void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) { void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
@ -74,6 +77,9 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st
req_failure->set_allocated_request_range(get_request.release()); req_failure->set_allocated_request_range(get_request.release());
} }
/**
* get key, delete
*/
void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive) { void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest()); std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
@ -136,14 +142,17 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
} }
/**
* get key, delete
*/
void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) { void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
if(recursive) if(recursive)
{ {
get_request->set_range_end(range_end); get_request->set_range_end(range_end);
get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
} }
RequestOp* req_success = txn_request.add_success(); RequestOp* req_success = txn_request.add_success();
@ -153,7 +162,7 @@ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::str
del_request->set_key(key); del_request->set_key(key);
if(recursive) if(recursive)
{ {
del_request->set_range_end(range_end); del_request->set_range_end(range_end);
} }
req_success = txn_request.add_success(); req_success = txn_request.add_success();
@ -174,5 +183,4 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const&
etcdv3::Transaction::~Transaction() { etcdv3::Transaction::~Transaction() {
// TODO Auto-generated destructor stub
} }