Use Txn for add()

This commit is contained in:
arches 2016-06-08 11:22:16 -04:00
parent a1293c770a
commit b7500a17cb
3 changed files with 151 additions and 18 deletions

View File

@ -1,11 +1,18 @@
#include "etcd/Client.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncDelResponse.hpp"
#include "v3/include/AsyncModifyResponse.hpp"
#include "v3/include/Utils.hpp"
#include <iostream>
#include <memory>
using etcdserverpb::TxnRequest;
using etcdserverpb::Compare;
using etcdserverpb::RequestOp;
etcd::Client::Client(std::string const & address)
: client(address), grpcClient(address)
@ -196,6 +203,7 @@ pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndIndex(std::string
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{
return removeEntryWithKeyAndIndex(key, old_index);
}
pplx::task<etcd::Response> etcd::Client::mkdir(std::string const & key)
@ -242,29 +250,42 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
{
//check if key already exist
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(resp->reply.kvs_size())
{
resp->error_code=105;
resp->error_message="Key already exists";
return Response::createResponse(*resp);
}
//check if key is not present
TxnRequest txn_request;
Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
compare->set_key(key);
compare->set_version(0);
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("create");
//below 2 lines can be removed once we are able to use Txn
call->client = &grpcClient;
call->key = key;
//get key whether success or failure
RangeRequest get_request1 = new RangeRequest();
get_request1->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request1);
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
//if success, add key and then get new value of key
PutRequest* put_request = new PutRequest();
put_request->set_key(key);
put_request->set_value(value);
RequestOp* req_success2 = txn_request.add_success();
req_success2->set_allocated_request_put(put_request);
RangeRequest* get_request2 = new RangeRequest();
get_request2->set_key(key);
RequestOp* req_success3 = txn_request.add_success();
req_success3->set_allocated_request_range(get_request2);
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("create");
call->response_reader = grpcClient.stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return Response::create(call);
}

View File

@ -0,0 +1,35 @@
#ifndef __ASYNC_TXNRESPONSE_HPP__
#define __ASYNC_TXNRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::TxnResponse;
namespace etcdv3
{
class AsyncTxnResponse : public etcdv3::V3Response
{
public:
AsyncTxnResponse(){};
AsyncTxnResponse(const std::string act){action = act;};
AsyncTxnResponse(const AsyncTxnResponse& other);
AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
TxnResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
AsyncTxnResponse& ParseResponse();
};
}
#endif

View File

@ -0,0 +1,77 @@
#include "v3/include/AsyncTxnResponse.hpp"
using etcdserverpb::ResponseOp;
etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
}
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
return *this;
}
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
{
if(action == "create")
{
if(reply.succeeded())
{
for(int index=0; index < reply.responses_size(); index++)
{
auto resp = reply.responses(index);
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{
if(resp.response_range().kvs_size())
{
if(!values.empty())
{
prev_value = values[0];
}
values.push_back(resp.response_range().kvs(0));
}
else
{
error_code=100;
error_message="Key not found";
}
}
else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
{
//do nothing for now.
}
else
{
//do nothing for now.
}
}
}
else
{
error_code=105;
error_message="Key already exists";
}
}
return *this;
}