diff --git a/src/Client.cpp b/src/Client.cpp index 7823115..e14d2cd 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,11 +1,18 @@ #include "etcd/Client.hpp" #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncPutResponse.hpp" +#include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" #include "v3/include/Utils.hpp" #include +#include + + +using etcdserverpb::TxnRequest; +using etcdserverpb::Compare; +using etcdserverpb::RequestOp; etcd::Client::Client(std::string const & address) : client(address), grpcClient(address) @@ -196,6 +203,7 @@ pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { return removeEntryWithKeyAndIndex(key, old_index); + } pplx::task etcd::Client::mkdir(std::string const & key) @@ -242,29 +250,42 @@ pplx::task etcd::Client::watch(std::string const & key, int from pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) { - //check if key already exist - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); - if(resp->reply.kvs_size()) - { - resp->error_code=105; - resp->error_message="Key already exists"; - return Response::createResponse(*resp); - } + //check if key is not present + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); - PutRequest put_request; - put_request.set_key(key); - put_request.set_value(value); - - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("create"); - //below 2 lines can be removed once we are able to use Txn - call->client = &grpcClient; - call->key = key; + //get key whether success or failure + RangeRequest get_request1 = new RangeRequest(); + get_request1->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request1); - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + + //if success, add key and then get new value of key + PutRequest* put_request = new PutRequest(); + put_request->set_key(key); + put_request->set_value(value); + RequestOp* req_success2 = txn_request.add_success(); + req_success2->set_allocated_request_put(put_request); + + RangeRequest* get_request2 = new RangeRequest(); + get_request2->set_key(key); + RequestOp* req_success3 = txn_request.add_success(); + req_success3->set_allocated_request_range(get_request2); + + + + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("create"); + + call->response_reader = grpcClient.stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); - + return Response::create(call); } diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp new file mode 100644 index 0000000..eb64775 --- /dev/null +++ b/v3/include/AsyncTxnResponse.hpp @@ -0,0 +1,35 @@ +#ifndef __ASYNC_TXNRESPONSE_HPP__ +#define __ASYNC_TXNRESPONSE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" +#include "v3/include/grpcClient.hpp" + + + +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::TxnResponse; + +namespace etcdv3 +{ + class AsyncTxnResponse : public etcdv3::V3Response + { + public: + AsyncTxnResponse(){}; + AsyncTxnResponse(const std::string act){action = act;}; + AsyncTxnResponse(const AsyncTxnResponse& other); + AsyncTxnResponse& operator=(const AsyncTxnResponse& other); + TxnResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + AsyncTxnResponse& ParseResponse(); + }; +} + +#endif diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp new file mode 100644 index 0000000..8b98e5d --- /dev/null +++ b/v3/src/AsyncTxnResponse.cpp @@ -0,0 +1,77 @@ +#include "v3/include/AsyncTxnResponse.hpp" + +using etcdserverpb::ResponseOp; + + +etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); + +} + +etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); + return *this; +} + +etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() +{ + if(action == "create") + { + if(reply.succeeded()) + { + for(int index=0; index < reply.responses_size(); index++) + { + auto resp = reply.responses(index); + if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) + { + if(resp.response_range().kvs_size()) + { + if(!values.empty()) + { + prev_value = values[0]; + } + + values.push_back(resp.response_range().kvs(0)); + } + else + { + error_code=100; + error_message="Key not found"; + } + } + else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) + { + //do nothing for now. + } + else + { + //do nothing for now. + } + } + } + else + { + error_code=105; + error_message="Key already exists"; + } + } + return *this; +}