From eedbcd4b709717d2988d040d38a37ffe97a8869e Mon Sep 17 00:00:00 2001 From: arches Date: Tue, 14 Jun 2016 04:54:23 -0400 Subject: [PATCH] Added delete --- etcd/Client.hpp | 1 + src/Client.cpp | 125 +++++++++++++++++++++--------- v3/include/AsyncRangeResponse.hpp | 3 +- v3/src/AsyncRangeResponse.cpp | 24 ++++-- v3/src/AsyncTxnResponse.cpp | 35 ++++++--- 5 files changed, 134 insertions(+), 54 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 64b7371..a6005cb 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -154,6 +154,7 @@ namespace etcd pplx::task send_put(const std::string& key, const std::string& value); pplx::task send_get(std::string const & key); pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); + pplx::task send_asyncdelete(std::string const & key, bool recursive); private: diff --git a/src/Client.cpp b/src/Client.cpp index 715ed58..7f983f6 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,6 +1,7 @@ #include #include "etcd/Client.hpp" #include "v3/include/AsyncTxnResponse.hpp" +#include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" #include @@ -9,6 +10,7 @@ using grpc::Channel; using etcdserverpb::PutRequest; using etcdserverpb::RangeRequest; using etcdserverpb::TxnRequest; +using etcdserverpb::DeleteRangeRequest; using etcdserverpb::Compare; using etcdserverpb::RequestOp; @@ -222,11 +224,13 @@ pplx::task etcd::Client::mkdir(std::string const & key) pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); + /*web::http::uri_builder uri("/v2/keys" + key); uri.append_query("dir=true"); if (recursive) uri.append_query("recursive=true"); - return send_del_request(uri); + return send_del_request(uri);*/ + + return send_asyncdelete(key,recursive); } pplx::task etcd::Client::ls(std::string const & key) @@ -406,45 +410,18 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke pplx::task etcd::Client::send_asyncget(std::string const & key, std::string const& range_end) { - //check key exist - TxnRequest txn_request; - Compare* compare = txn_request.add_compare(); - compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); - compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); - compare->set_key(key); - compare->set_version(0); - - //get key on success - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - + RangeRequest get_request; + get_request.set_key(key); if(!range_end.empty()) { - get_request->set_range_end(range_end); - get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); - } - RequestOp* req_success = txn_request.add_success(); - req_success->set_allocated_request_range(get_request.release()); - - - //get key on failure - get_request.reset(new RangeRequest()); - get_request->set_key(key); - if(!range_end.empty()) - { - get_request->set_range_end(range_end); - get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + get_request.set_range_end(range_end); + get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); + get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); } - - RequestOp* req_failure = txn_request.add_failure(); - req_failure->set_allocated_request_range(get_request.release()); - - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("get"); + etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); - call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); @@ -496,6 +473,82 @@ pplx::task etcd::Client::send_asyncput(std::string const & key, call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + return Response::create(call); +} + +pplx::task etcd::Client::send_asyncdelete(std::string const & key, bool recursive) +{ + + //check if key is present + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + } + + //if success, get key, delete + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + if(recursive) + { + get_request->set_range_end(range_end); + get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); + get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + } + + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::unique_ptr del_request(new DeleteRangeRequest()); + del_request->set_key(key); + if(recursive) + { + del_request->set_range_end(range_end); + } + + req_success = txn_request.add_success(); + req_success->set_allocated_request_delete_range(del_request.release()); + + + //if success, get key, delete + get_request.reset(new RangeRequest()); + get_request->set_key(key); + if(recursive) + { + get_request->set_range_end(range_end); + get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); + get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + } + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + del_request.reset(new DeleteRangeRequest()); + del_request->set_key(key); + if(recursive) + { + del_request->set_range_end(range_end); + } + + req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_delete_range(del_request.release()); + + + + + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("delete"); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + call->response_reader->Finish(&call->reply, &call->status, (void*)call); return Response::create(call); diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index d7fdd5e..9b83ded 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -17,8 +17,7 @@ namespace etcdv3 class AsyncRangeResponse : public etcdv3::V3Response { public: - AsyncRangeResponse(){}; - AsyncRangeResponse(const std::string act){action = act;}; + AsyncRangeResponse(){action = "get";}; AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); RangeResponse reply; diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 895eda5..f306956 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -30,15 +30,27 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() { - if(reply.kvs_size() == 0) + index = reply.header().revision(); + if(!status.ok()) { - error_code=100; - error_message="Key not found"; + error_code = status.error_code(); + error_message = status.error_message(); } - - for(int index=0; index < reply.kvs_size(); index++) + else { - values.push_back(reply.kvs(index)); + + if(reply.kvs_size() == 0) + { + error_code=100; + error_message="Key not found"; + } + + for(int index=0; index < reply.kvs_size(); index++) + { + std::cout << "key: " << reply.kvs(index).key() << std::endl; + std::cout << "value: " << reply.kvs(index).value()<< std::endl; + values.push_back(reply.kvs(index)); + } } index = reply.header().revision(); return *this; diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 2e28d30..9ca4626 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -34,6 +34,8 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() { + + index = reply.header().revision(); if(!status.ok()) { error_code = status.error_code(); @@ -59,6 +61,10 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() range_kvs.insert(range_kvs.end(), v3resp.values.begin(), v3resp.values.end()); } } + else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) + { + std::cout << "deleted keys: " << resp.response_delete_range().deleted() << std:: endl; + } } if(!reply.succeeded()) @@ -75,17 +81,26 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() } } - //find previous value of key do this for all actions except get + //find previous value of key //retain only the last value gotten as the final value. - if(action != "get" && range_kvs.size() > 1) - { - prev_value = range_kvs.front(); - values.push_back(range_kvs.back()); - } - else - { - values = range_kvs; - } + if(action == "set" || action == "create" || action == "compareAndSwap" || action == "update") + { + if(range_kvs.size() > 1) + { + prev_value = range_kvs.front(); + values.push_back(range_kvs.back()); + } + else + { + values = range_kvs; + } + } + else + { + values = range_kvs; + } + } + return *this; }