Update rm and modify functions. Use Txn for these functions.

This commit is contained in:
arches 2016-06-17 11:11:53 -04:00
parent db2ce95328
commit e84a1b0667
3 changed files with 139 additions and 167 deletions

View File

@ -8,14 +8,7 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "v3/include/grpcClient.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::PutResponse;
using etcdserverpb::RangeResponse;
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
@ -147,25 +140,19 @@ namespace etcd
web::http::client::http_client client; web::http::client::http_client client;
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> stub1_;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value); pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value);
pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value); pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value);
pplx::task<etcd::Response> send_asyncget(std::string const & key,std::string const& range_end=""); pplx::task<etcd::Response> send_asyncget(std::string const & key,std::string const& range_end="");
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value); pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key); pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, int old_index);
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive); pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
etcdv3::grpcClient grpcClient; pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
private:
pplx::task<Response> removeEntryWithKey(std::string const &entryKey);
pplx::task<Response> removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue);
pplx::task<Response> removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex);
pplx::task<Response> modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index);
}; };

View File

@ -21,7 +21,7 @@ using etcdserverpb::WatchResponse;
using etcdserverpb::WatchCreateRequest; using etcdserverpb::WatchCreateRequest;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
: client(address), grpcClient(address) : client(address)
{ {
std::string stripped_address(address); std::string stripped_address(address);
std::string substr("http://"); std::string substr("http://");
@ -32,7 +32,6 @@ etcd::Client::Client(std::string const & address)
} }
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel); stub_= KV::NewStub(channel);
stub1_= Watch::NewStub(channel);
} }
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri) pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
@ -77,148 +76,25 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
return send_asyncmodify_if(key, value, old_value); return send_asyncmodify_if(key, value, old_value);
} }
pplx::task<etcd::Response> etcd::Client::modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index) {
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(!resp->reply.kvs_size())
{
resp->error_code=100;
resp->error_message="Key not found";
return Response::createResponse(*resp);
}
else if(resp->reply.kvs(0).mod_revision() != old_index)
{
resp->error_code = 101;
resp->error_message = "Compare failed";
return Response::createResponse(*resp);
}
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
std::shared_ptr<etcdv3::AsyncModifyResponse>call(new etcdv3::AsyncModifyResponse("compareAndSwap"));
//below 2 lines can be removed once we are able to use Txn
call->prev_values.push_back(resp->reply.kvs(0));
call->client = &grpcClient;
call->key = key;
call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call.get());
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{ {
return modifyEntryWithValueAndOldIndex(key, value, old_index); return send_asyncmodify_if(key, value, old_index);
}
//note: this one seems to not need the parseResponse() method
pplx::task<etcd::Response> etcd::Client::removeEntryWithKey(std::string const & entryKey) {
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(entryKey, grpcClient);
if(!resp->reply.kvs_size())
{
std::cout << "nothing to delete" << std::endl;
resp->error_code = 100;
resp->error_message = "Nothing to delete";
return Response::createResponse(*resp);
}
etcdserverpb::DeleteRangeRequest deleteRangeRequest;
deleteRangeRequest.set_key(entryKey);
etcdv3::AsyncDelResponse* call = new etcdv3::AsyncDelResponse("delete");
//mano-mano
call->prev_values.push_back(resp->reply.kvs(0));
call->client = &grpcClient;
call->key = entryKey;
call->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&call->context, deleteRangeRequest, &call->cq_);
call->rpcInstance->Finish(&call->deleteResponse, &call->status, (void*)call);
return Response::createResponse(*call);
} }
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key) pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{ {
return removeEntryWithKey(key); return send_asyncdelete(key,false);
} }
pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue) {
etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient);
if(!searchResult->reply.kvs_size()) {
searchResult->error_code = 100;
searchResult->error_message = "Key not Found";
return Response::createResponse(*searchResult);
}
else if(searchResult->reply.kvs(0).value() != oldValue) {
searchResult->error_code = 101;
searchResult->error_message = "Compare failed";
return Response::createResponse(*searchResult);
}
etcdserverpb::DeleteRangeRequest deleteRangeRequest;
deleteRangeRequest.set_key(entryKey);
etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete");
deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0));
deleteResponseCall->client = &grpcClient;
deleteResponseCall->key = entryKey;
deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_);
deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall);
return Response::createResponse(*deleteResponseCall);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{ {
return removeEntryWithKeyAndValue(key, old_value); return send_asyncrm_if(key, old_value);
}
pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) {
etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient);
if(!searchResult->reply.kvs_size()) {
searchResult->error_code = 100;
searchResult->error_message = "Key not Found";
return Response::createResponse(*searchResult);
}
else if(searchResult->reply.kvs(0).create_revision() != oldIndex) {
searchResult->error_code = 101;
searchResult->error_message = "Compare failed";
return Response::createResponse(*searchResult);
}
etcdserverpb::DeleteRangeRequest deleteRangeRequest;
deleteRangeRequest.set_key(entryKey);
etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete");
deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0));
deleteResponseCall->client = &grpcClient;
deleteResponseCall->key = entryKey;
deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_);
deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall);
return Response::createResponse(*deleteResponseCall);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{ {
return removeEntryWithKeyAndIndex(key, old_index); return send_asyncrm_if(key, old_index);
} }
@ -308,17 +184,9 @@ pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key,
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value)
{ {
//check key value is equal to old_value
//check key exist
TxnRequest txn_request; TxnRequest txn_request;
Compare* compare = txn_request.add_compare(); Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
compare->set_key(key);
compare->set_version(0);
//check key value is equal to old_value
compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE); compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE);
compare->set_key(key); compare->set_key(key);
@ -345,9 +213,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
get_request.reset(new RangeRequest()); get_request.reset(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
req_success = txn_request.add_success(); req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap")); std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap"));
@ -356,12 +222,53 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index)
{
//check key value is equal to old_value
TxnRequest txn_request;
Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_MOD);
compare->set_key(key);
compare->set_mod_revision(old_index);
//get key on failure
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
//on success get key value then modify and get new value
get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value)
{ {
//check if key is present //check if key is present
TxnRequest txn_request; TxnRequest txn_request;
Compare* compare = txn_request.add_compare(); Compare* compare = txn_request.add_compare();
@ -382,7 +289,6 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
RequestOp* req_success = txn_request.add_success(); RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
//if success, modify key and then get new value of key //if success, modify key and then get new value of key
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
@ -404,7 +310,6 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
@ -520,7 +425,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & ke
req_success->set_allocated_request_delete_range(del_request.release()); req_success->set_allocated_request_delete_range(del_request.release());
//if success, get key, delete //if fail, get key, delete
get_request.reset(new RangeRequest()); get_request.reset(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
if(recursive) if(recursive)
@ -552,4 +457,84 @@ pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & ke
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value)
{
//check if key's value is equl to oldvalue
TxnRequest txn_request;
Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE);
compare->set_key(key);
compare->set_value(old_value);
//if success, get key, delete
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
//if fail, get key
get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndDelete"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, int old_index)
{
//check if key's mod revision is equal to old index
TxnRequest txn_request;
Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_MOD);
compare->set_key(key);
compare->set_mod_revision(old_index);
//if success, get key, delete
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
//if fail, get key
get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndDelete"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call);
}

View File

@ -59,7 +59,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
} }
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
{ {
std::cout << "deleted keys: " << resp.response_delete_range().deleted() << std:: endl; //do nothing yet
} }
} }
@ -70,7 +70,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
error_code=105; error_code=105;
error_message="Key already exists"; error_message="Key already exists";
} }
else if(action == "compareAndSwap") else if(action == "compareAndSwap" || action == "compareAndDelete")
{ {
error_code=101; error_code=101;
error_message="Compare failed"; error_message="Compare failed";
@ -81,7 +81,7 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
values = range_kvs; values = range_kvs;
if(action == "delete") if(action == "delete" || action == "compareAndDelete")
{ {
prev_values = values; prev_values = values;
} }