From e84a1b06677365c39ba4b93a1bec009cee2206c8 Mon Sep 17 00:00:00 2001 From: arches Date: Fri, 17 Jun 2016 11:11:53 -0400 Subject: [PATCH] Update rm and modify functions. Use Txn for these functions. --- etcd/Client.hpp | 23 +-- src/Client.cpp | 277 +++++++++++++++++------------------- v3/src/AsyncTxnResponse.cpp | 6 +- 3 files changed, 139 insertions(+), 167 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index cdb81af..b74499b 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -8,14 +8,7 @@ #include #include "proto/rpc.grpc.pb.h" -#include "v3/include/grpcClient.hpp" -using grpc::ClientAsyncResponseReader; -using grpc::ClientContext; -using grpc::CompletionQueue; -using grpc::Status; -using etcdserverpb::PutResponse; -using etcdserverpb::RangeResponse; using etcdserverpb::KV; using etcdserverpb::Watch; @@ -147,25 +140,19 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; - std::unique_ptr stub1_; - pplx::task send_asyncput(const std::string& key, const std::string& value); std::unique_ptr watchServiceStub; + + pplx::task send_asyncput(const std::string& key, const std::string& value); pplx::task send_asyncadd(std::string const & key, const std::string& value); pplx::task send_asyncmodify(std::string const & key, std::string const & value); pplx::task send_asyncget(std::string const & key,std::string const& range_end=""); pplx::task send_put(const std::string& key, const std::string& value); pplx::task send_get(std::string const & key); pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); + pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, int old_index); pplx::task send_asyncdelete(std::string const & key, bool recursive); - - etcdv3::grpcClient grpcClient; - - -private: - pplx::task removeEntryWithKey(std::string const &entryKey); - pplx::task removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); - pplx::task removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); - pplx::task modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index); + pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); + pplx::task send_asyncrm_if(std::string const &key, int old_index); }; diff --git a/src/Client.cpp b/src/Client.cpp index 9e24dc8..0bf935f 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -21,7 +21,7 @@ using etcdserverpb::WatchResponse; using etcdserverpb::WatchCreateRequest; etcd::Client::Client(std::string const & address) - : client(address), grpcClient(address) + : client(address) { std::string stripped_address(address); std::string substr("http://"); @@ -32,7 +32,6 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); - stub1_= Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -77,148 +76,25 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_asyncmodify_if(key, value, old_value); } -pplx::task etcd::Client::modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index) { - - - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); - if(!resp->reply.kvs_size()) - { - resp->error_code=100; - resp->error_message="Key not found"; - return Response::createResponse(*resp); - } - else if(resp->reply.kvs(0).mod_revision() != old_index) - { - resp->error_code = 101; - resp->error_message = "Compare failed"; - return Response::createResponse(*resp); - } - - PutRequest put_request; - put_request.set_key(key); - put_request.set_value(value); - - std::shared_ptrcall(new etcdv3::AsyncModifyResponse("compareAndSwap")); - - //below 2 lines can be removed once we are able to use Txn - call->prev_values.push_back(resp->reply.kvs(0)); - call->client = &grpcClient; - call->key = key; - - call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - - call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call.get()); - - return Response::create(call); -} - pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { - return modifyEntryWithValueAndOldIndex(key, value, old_index); -} - -//note: this one seems to not need the parseResponse() method -pplx::task etcd::Client::removeEntryWithKey(std::string const & entryKey) { - - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(entryKey, grpcClient); - - if(!resp->reply.kvs_size()) - { - std::cout << "nothing to delete" << std::endl; - resp->error_code = 100; - resp->error_message = "Nothing to delete"; - return Response::createResponse(*resp); - } - - etcdserverpb::DeleteRangeRequest deleteRangeRequest; - deleteRangeRequest.set_key(entryKey); - - etcdv3::AsyncDelResponse* call = new etcdv3::AsyncDelResponse("delete"); - - //mano-mano - call->prev_values.push_back(resp->reply.kvs(0)); - call->client = &grpcClient; - call->key = entryKey; - - call->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&call->context, deleteRangeRequest, &call->cq_); - call->rpcInstance->Finish(&call->deleteResponse, &call->status, (void*)call); - - return Response::createResponse(*call); + return send_asyncmodify_if(key, value, old_index); } pplx::task etcd::Client::rm(std::string const & key) { - return removeEntryWithKey(key); + return send_asyncdelete(key,false); } -pplx::task etcd::Client::removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue) { - - etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); - - if(!searchResult->reply.kvs_size()) { - searchResult->error_code = 100; - searchResult->error_message = "Key not Found"; - return Response::createResponse(*searchResult); - } - else if(searchResult->reply.kvs(0).value() != oldValue) { - searchResult->error_code = 101; - searchResult->error_message = "Compare failed"; - return Response::createResponse(*searchResult); - } - - etcdserverpb::DeleteRangeRequest deleteRangeRequest; - deleteRangeRequest.set_key(entryKey); - - etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); - - deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0)); - deleteResponseCall->client = &grpcClient; - deleteResponseCall->key = entryKey; - - deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_); - deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall); - - return Response::createResponse(*deleteResponseCall); -} pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { - return removeEntryWithKeyAndValue(key, old_value); -} - -pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) { - - etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); - - if(!searchResult->reply.kvs_size()) { - searchResult->error_code = 100; - searchResult->error_message = "Key not Found"; - return Response::createResponse(*searchResult); - } - else if(searchResult->reply.kvs(0).create_revision() != oldIndex) { - searchResult->error_code = 101; - searchResult->error_message = "Compare failed"; - return Response::createResponse(*searchResult); - } - - etcdserverpb::DeleteRangeRequest deleteRangeRequest; - deleteRangeRequest.set_key(entryKey); - - etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); - - deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0)); - deleteResponseCall->client = &grpcClient; - deleteResponseCall->key = entryKey; - - deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_); - deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall); - - return Response::createResponse(*deleteResponseCall); + return send_asyncrm_if(key, old_value); } pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { - return removeEntryWithKeyAndIndex(key, old_index); + return send_asyncrm_if(key, old_index); } @@ -308,17 +184,9 @@ pplx::task etcd::Client::send_asyncadd(std::string const & key, pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) { - - //check key exist + //check key value is equal to old_value TxnRequest txn_request; Compare* compare = txn_request.add_compare(); - compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); - compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); - compare->set_key(key); - compare->set_version(0); - - //check key value is equal to old_value - compare = txn_request.add_compare(); compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE); compare->set_key(key); @@ -345,9 +213,7 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & get_request.reset(new RangeRequest()); get_request->set_key(key); req_success = txn_request.add_success(); - req_success->set_allocated_request_range(get_request.release()); - - + req_success->set_allocated_request_range(get_request.release()); std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndSwap")); @@ -356,12 +222,53 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); - } +pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index) +{ + //check key value is equal to old_value + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_MOD); + compare->set_key(key); + compare->set_mod_revision(old_index); + + //get key on failure + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + //on success get key value then modify and get new value + get_request.reset(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::unique_ptr put_request(new PutRequest()); + put_request->set_key(key); + put_request->set_value(value); + req_success = txn_request.add_success(); + req_success->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndSwap")); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + + return Response::create(call); +} + + pplx::task etcd::Client::send_asyncmodify(std::string const & key, std::string const & value) { - //check if key is present TxnRequest txn_request; Compare* compare = txn_request.add_compare(); @@ -382,7 +289,6 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke RequestOp* req_success = txn_request.add_success(); req_success->set_allocated_request_range(get_request.release()); - //if success, modify key and then get new value of key std::unique_ptr put_request(new PutRequest()); put_request->set_key(key); @@ -404,7 +310,6 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); - } @@ -520,7 +425,7 @@ pplx::task etcd::Client::send_asyncdelete(std::string const & ke req_success->set_allocated_request_delete_range(del_request.release()); - //if success, get key, delete + //if fail, get key, delete get_request.reset(new RangeRequest()); get_request->set_key(key); if(recursive) @@ -552,4 +457,84 @@ pplx::task etcd::Client::send_asyncdelete(std::string const & ke return Response::create(call); } +pplx::task etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value) +{ + + //check if key's value is equl to oldvalue + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE); + compare->set_key(key); + compare->set_value(old_value); + + + //if success, get key, delete + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::unique_ptr del_request(new DeleteRangeRequest()); + del_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_delete_range(del_request.release()); + + + //if fail, get key + get_request.reset(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + + std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndDelete")); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + + return Response::create(call); +} + +pplx::task etcd::Client::send_asyncrm_if(std::string const &key, int old_index) +{ + + //check if key's mod revision is equal to old index + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_MOD); + compare->set_key(key); + compare->set_mod_revision(old_index); + + + //if success, get key, delete + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::unique_ptr del_request(new DeleteRangeRequest()); + del_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_delete_range(del_request.release()); + + + //if fail, get key + get_request.reset(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + + std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndDelete")); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); + + return Response::create(call); +} + diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index f0a5c11..9570534 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -59,7 +59,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() } else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) { - std::cout << "deleted keys: " << resp.response_delete_range().deleted() << std:: endl; + //do nothing yet } } @@ -70,7 +70,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() error_code=105; error_message="Key already exists"; } - else if(action == "compareAndSwap") + else if(action == "compareAndSwap" || action == "compareAndDelete") { error_code=101; error_message="Compare failed"; @@ -81,7 +81,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() values = range_kvs; - if(action == "delete") + if(action == "delete" || action == "compareAndDelete") { prev_values = values; }