diff --git a/etcd/Client.hpp b/etcd/Client.hpp index b74499b..93c819c 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -2,6 +2,8 @@ #define __ETCD_CLIENT_HPP__ #include "etcd/Response.hpp" +#include "v3/include/Transaction.hpp" +#include "v3/include/AsyncTxnResponse.hpp" #include #include @@ -153,7 +155,11 @@ namespace etcd pplx::task send_asyncdelete(std::string const & key, bool recursive); pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); pplx::task send_asyncrm_if(std::string const &key, int old_index); - }; + +private: + std::shared_ptr initiate_transaction(const std::string &operation, + etcdv3::Transaction& transaction); +}; diff --git a/src/Client.cpp b/src/Client.cpp index 0f6eded..621d8ef 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -70,16 +70,19 @@ pplx::task etcd::Client::modify(std::string const & key, std::st return send_asyncmodify(key,value); } + pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) { return send_asyncmodify_if(key, value, old_value); } + pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { return send_asyncmodify_if(key, value, old_index); } + pplx::task etcd::Client::rm(std::string const & key) { return send_asyncdelete(key,false); @@ -91,24 +94,28 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str return send_asyncrm_if(key, old_value); } + pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { return send_asyncrm_if(key, old_index); } + pplx::task etcd::Client::mkdir(std::string const & key) { web::http::uri_builder uri("/v2/keys" + key); return send_put_request(uri, "dir", "true"); } + pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { return send_asyncdelete(key,recursive); } + pplx::task etcd::Client::ls(std::string const & key) { @@ -119,6 +126,7 @@ pplx::task etcd::Client::ls(std::string const & key) return send_asyncget(key,range_end); } + pplx::task etcd::Client::watch(std::string const & key, bool recursive) { web::http::uri_builder uri("/v2/keys" + key); @@ -128,6 +136,7 @@ pplx::task etcd::Client::watch(std::string const & key, bool rec return send_get_request(uri); } + pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { web::http::uri_builder uri("/v2/keys" + key); @@ -138,6 +147,17 @@ pplx::task etcd::Client::watch(std::string const & key, int from return send_get_request(uri); } + +std::shared_ptr etcd::Client::initiate_transaction(const std::string &operation, + etcdv3::Transaction& transaction) +{ + std::shared_ptr call(new etcdv3::AsyncTxnResponse(operation)); + call->response_reader = stub_->AsyncTxn(&call->context, transaction.txn_request, &call->cq_); + call->response_reader->Finish(&call->reply, &call->status, (void*) (call.get())); + return call; +} + + pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) { etcdv3::Transaction transaction(key); @@ -147,15 +167,11 @@ pplx::task etcd::Client::send_asyncadd(std::string const & key, transaction.setup_basic_failure_operation(key); transaction.setup_basic_create_sequence(key, value); - - //to be done in one method, where txn_request is a field of a class - std::shared_ptr call(new etcdv3::AsyncTxnResponse("create")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); - + std::shared_ptr call = initiate_transaction("create", transaction); return Response::create(call); } + pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) { etcdv3::Transaction transaction(key); @@ -164,14 +180,12 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & transaction.setup_basic_failure_operation(key); transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndSwap")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + std::shared_ptr call = initiate_transaction("compareAndSwap", transaction); return Response::create(call); } + pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index) { etcdv3::Transaction transaction(key); @@ -180,11 +194,8 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & transaction.setup_basic_failure_operation(key); transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndSwap")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + std::shared_ptr call = initiate_transaction("compareAndSwap", transaction); return Response::create(call); } @@ -197,11 +208,8 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke transaction.setup_basic_failure_operation(key); transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call(new etcdv3::AsyncTxnResponse("update")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + std::shared_ptr call = initiate_transaction("update", transaction); return Response::create(call); } @@ -233,14 +241,12 @@ pplx::task etcd::Client::send_asyncput(std::string const & key, transaction.setup_set_failure_operation(key, value); transaction.setup_basic_create_sequence(key, value); - - std::shared_ptr call(new etcdv3::AsyncTxnResponse("set")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + std::shared_ptr call = initiate_transaction("set", transaction); return Response::create(call); } + pplx::task etcd::Client::send_asyncdelete(std::string const & key, bool recursive) { etcdv3::Transaction transaction(key); @@ -257,29 +263,25 @@ pplx::task etcd::Client::send_asyncdelete(std::string const & ke transaction.setup_delete_sequence(key, range_end, recursive); transaction.setup_delete_failure_operation(key, range_end, recursive); - std::shared_ptr call(new etcdv3::AsyncTxnResponse("delete")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); - + std::shared_ptr call = initiate_transaction("delete", transaction); return Response::create(call); } + pplx::task etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value) { - etcdv3::Transaction transaction(key); - transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, + etcdv3::Transaction transaction(key); + transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, Compare::CompareTarget::Compare_CompareTarget_VALUE); - transaction.setup_compare_and_delete_operation(key); - transaction.setup_basic_failure_operation(key); - - std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndDelete")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + transaction.setup_compare_and_delete_operation(key); + transaction.setup_basic_failure_operation(key); + std::shared_ptr call = initiate_transaction("compareAndDelete", transaction); return Response::create(call); } + pplx::task etcd::Client::send_asyncrm_if(std::string const &key, int old_index) { etcdv3::Transaction transaction(key); transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, @@ -288,11 +290,6 @@ pplx::task etcd::Client::send_asyncrm_if(std::string const &key, transaction.setup_compare_and_delete_operation(key); transaction.setup_basic_failure_operation(key); - std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndDelete")); - call->response_reader = stub_->AsyncTxn(&call->context,transaction.txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); - + std::shared_ptr call = initiate_transaction("compareAndDelete", transaction); return Response::create(call); } - - diff --git a/v3/src/Transaction.cpp b/v3/src/Transaction.cpp index 5afc947..1bed942 100644 --- a/v3/src/Transaction.cpp +++ b/v3/src/Transaction.cpp @@ -14,12 +14,9 @@ using etcdserverpb::RequestOp; using etcdserverpb::DeleteRangeRequest; etcdv3::Transaction::Transaction() { - // TODO Auto-generated constructor stub - } etcdv3::Transaction::Transaction(const std::string& key) : key(key) { - } void etcdv3::Transaction::init_compare(Compare::CompareResult result, Compare::CompareTarget target){ @@ -49,6 +46,9 @@ void etcdv3::Transaction::init_compare(int old_index, Compare::CompareResult res compare->set_mod_revision(old_index); } +/** + * get key on failure + */ void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) { std::unique_ptr get_request(new RangeRequest()); //then this can be just a raw pointer get_request->set_key(key); @@ -56,6 +56,9 @@ void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key) req_failure->set_allocated_request_range(get_request.release()); //why not get()? } +/** + * get key on failure, get key before put, modify and then get updated key + */ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) { std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); @@ -74,6 +77,9 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st req_failure->set_allocated_request_range(get_request.release()); } +/** + * get key, delete + */ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive) { std::unique_ptr get_request(new RangeRequest()); std::unique_ptr del_request(new DeleteRangeRequest()); @@ -136,14 +142,17 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val req_success->set_allocated_request_range(get_request.release()); } +/** + * get key, delete + */ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) { std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); if(recursive) { - get_request->set_range_end(range_end); - get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + get_request->set_range_end(range_end); + get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); + get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); } RequestOp* req_success = txn_request.add_success(); @@ -153,7 +162,7 @@ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::str del_request->set_key(key); if(recursive) { - del_request->set_range_end(range_end); + del_request->set_range_end(range_end); } req_success = txn_request.add_success(); @@ -174,5 +183,4 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& etcdv3::Transaction::~Transaction() { - // TODO Auto-generated destructor stub }