diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 498f512..fc72939 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -8,18 +8,12 @@ #include #include "proto/rpc.grpc.pb.h" -#include "v3/include/AsyncRangeResponse.hpp" -#include "v3/include/grpcClient.hpp" - -using grpc::Channel; using grpc::ClientAsyncResponseReader; using grpc::ClientContext; using grpc::CompletionQueue; using grpc::Status; -using etcdserverpb::PutRequest; using etcdserverpb::PutResponse; -using etcdserverpb::RangeRequest; using etcdserverpb::RangeResponse; using etcdserverpb::KV; using etcdserverpb::Watch; @@ -159,17 +153,14 @@ namespace etcd pplx::task send_asyncmodify(std::string const & key, std::string const & value); pplx::task send_put(const std::string& key, const std::string& value); pplx::task send_get(std::string const & key); - pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); - - etcdv3::grpcClient grpcClient; + pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); + private: pplx::task removeEntryWithKey(std::string const &entryKey); pplx::task removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); pplx::task removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); pplx::task modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index); - - }; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3572fef..ce3bcf7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index e14d2cd..b826b42 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,22 +1,29 @@ +#include #include "etcd/Client.hpp" -#include "v3/include/AsyncRangeResponse.hpp" -#include "v3/include/AsyncPutResponse.hpp" #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" -#include "v3/include/Utils.hpp" - #include -#include - +using grpc::Channel; +using etcdserverpb::PutRequest; +using etcdserverpb::RangeRequest; using etcdserverpb::TxnRequest; using etcdserverpb::Compare; using etcdserverpb::RequestOp; etcd::Client::Client(std::string const & address) - : client(address), grpcClient(address) + : client(address) { + std::string stripped_address(address); + std::string substr("http://"); + std::string::size_type i = stripped_address.find(substr); + if(i != std::string::npos) + { + stripped_address.erase(i,substr.length()); + } + std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + stub_= KV::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -259,30 +266,30 @@ pplx::task etcd::Client::send_asyncadd(std::string const & key, compare->set_version(0); - //get key whether success or failure - RangeRequest get_request1 = new RangeRequest(); - get_request1->set_key(key); + //get key on failure + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); RequestOp* req_failure = txn_request.add_failure(); - req_failure->set_allocated_request_range(get_request1); + req_failure->set_allocated_request_range(get_request.release()); //if success, add key and then get new value of key - PutRequest* put_request = new PutRequest(); + std::unique_ptr put_request(new PutRequest()); put_request->set_key(key); put_request->set_value(value); - RequestOp* req_success2 = txn_request.add_success(); - req_success2->set_allocated_request_put(put_request); - RangeRequest* get_request2 = new RangeRequest(); - get_request2->set_key(key); - RequestOp* req_success3 = txn_request.add_success(); - req_success3->set_allocated_request_range(get_request2); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); - - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("create"); + etcdv3::AsyncTxnResponse* call(new etcdv3::AsyncTxnResponse("create")); - call->response_reader = grpcClient.stub_->AsyncTxn(&call->context,txn_request,&call->cq_); + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); @@ -293,36 +300,50 @@ pplx::task etcd::Client::send_asyncadd(std::string const & key, pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) { - //check current key is equal to old_value - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); - if(!resp->reply.kvs_size()) - { - resp->error_code=100; - resp->error_message="Key not found"; - return Response::createResponse(*resp); - } - else - { - if(resp->reply.kvs(0).value() != old_value) - { - resp->error_code=101; - resp->error_message="Compare failed"; - return Response::createResponse(*resp); - } - } + //check key exist + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); - PutRequest put_request; - put_request.set_key(key); - put_request.set_value(value); + //check key value is equal to old_value + compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE); + compare->set_key(key); + compare->set_value(old_value); + + etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap"); + //get key on failure + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + //on success get key value then modify and get new value + get_request.reset(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + std::unique_ptr put_request(new PutRequest()); + put_request->set_key(key); + put_request->set_value(value); + req_success = txn_request.add_success(); + req_success->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + - etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("compareAndSwap"); - //below 2 lines can be removed once we are able to use Txn - call->prev_value = resp->reply.kvs(0); - call->client = &grpcClient; - call->key = key; - - call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); @@ -333,27 +354,44 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & pplx::task etcd::Client::send_asyncmodify(std::string const & key, std::string const & value) { - //check if key already exist - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); - if(!resp->reply.kvs_size()) - { - resp->error_code=100; - resp->error_message="Key not found"; - return Response::createResponse(*resp); - } + //check if key is present + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); - PutRequest put_request; - put_request.set_key(key); - put_request.set_value(value); - - etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("update"); + //success or failure + //get key value before modification + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); - //below 2 lines can be removed once we are able to use Txn - call->prev_value = resp->reply.kvs(0); - call->client = &grpcClient; - call->key = key; + get_request.reset(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); - call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + + //if success, modify key and then get new value of key + std::unique_ptr put_request(new PutRequest()); + put_request->set_key(key); + put_request->set_value(value); + + req_success = txn_request.add_success(); + req_success->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("update"); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); @@ -364,12 +402,32 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke pplx::task etcd::Client::send_asyncget(std::string const & key) { - RangeRequest request; - request.set_key(key); - - etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); + //check key exist + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); - call->response_reader = grpcClient.stub_->AsyncRange(&call->context,request,&call->cq_); + //get key on failure or success + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("get"); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); @@ -379,23 +437,47 @@ pplx::task etcd::Client::send_asyncget(std::string const & key) pplx::task etcd::Client::send_asyncput(std::string const & key, std::string const & value) { - PutRequest put_request; - put_request.set_key(key); - put_request.set_value(value); - - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("set"); + //check if key is not present + TxnRequest txn_request; + Compare* compare = txn_request.add_compare(); + compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL); + compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION); + compare->set_key(key); + compare->set_version(0); - //get current value - etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); - if(resp->reply.kvs_size()) - { - call->prev_value = resp->reply.kvs(0); - } - call->client = &grpcClient; - call->key = key; + //get key on failure, get key before put, modify and then get updated key + std::unique_ptr get_request(new RangeRequest()); + get_request->set_key(key); + RequestOp* req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + std::unique_ptr put_request(new PutRequest()); + put_request->set_key(key); + put_request->set_value(value); + req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_failure = txn_request.add_failure(); + req_failure->set_allocated_request_range(get_request.release()); + + //if success, put key and then get new value of key + put_request.reset(new PutRequest()); + put_request->set_key(key); + put_request->set_value(value); + RequestOp* req_success = txn_request.add_success(); + req_success->set_allocated_request_put(put_request.release()); + + get_request.reset(new RangeRequest()); + get_request->set_key(key); + req_success = txn_request.add_success(); + req_success->set_allocated_request_range(get_request.release()); + + etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("set"); + + call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index a6d1f7a..77d36a1 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -7,14 +7,15 @@ TEST_CASE("setup") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); //etcd.rmdir("/test", true).wait(); } TEST_CASE("add a new key") { - etcd::Client etcd("http://127.0.0.1:4001"); + + etcd::Client etcd("http://127.0.0.1:2379"); etcd::Response resp = etcd.add("/test/key1", "42").get(); REQUIRE(0 == resp.error_code()); CHECK("create" == resp.action()); @@ -33,7 +34,7 @@ TEST_CASE("add a new key") TEST_CASE("read a value from etcd") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://192.168.99.100:2379"); etcd::Response resp = etcd.get("/test/key1").get(); CHECK("get" == resp.action()); REQUIRE(resp.is_ok()); @@ -46,7 +47,7 @@ TEST_CASE("read a value from etcd") TEST_CASE("simplified read") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); CHECK("42" == etcd.get("/test/key1").get().value().as_string()); CHECK(100 == etcd.get("/test/key2").get().error_code()); // Key not found } @@ -55,7 +56,7 @@ TEST_CASE("simplified read") TEST_CASE("modify a key") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd::Response resp = etcd.modify("/test/key1", "43").get(); REQUIRE(0 == resp.error_code()); // overwrite CHECK("update" == resp.action()); @@ -66,7 +67,7 @@ TEST_CASE("modify a key") TEST_CASE("set a key") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd::Response resp = etcd.set("/test/key1", "43").get(); REQUIRE(0 == resp.error_code()); // overwrite CHECK("set" == resp.action()); @@ -78,7 +79,7 @@ TEST_CASE("set a key") TEST_CASE("atomic compare-and-swap") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd.set("/test/key1", "42").wait(); // modify success @@ -95,6 +96,7 @@ TEST_CASE("atomic compare-and-swap") CHECK("Compare failed" == res.error_message()); } + TEST_CASE("delete a value") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -278,3 +280,4 @@ TEST_CASE("cleanup") } #endif + diff --git a/v3/include/AsyncPutResponse.hpp b/v3/include/AsyncPutResponse.hpp deleted file mode 100644 index 8145f5e..0000000 --- a/v3/include/AsyncPutResponse.hpp +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef __ASYNC_PUTRESPONSE_HPP__ -#define __ASYNC_PUTRESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "v3/include/V3Response.hpp" -#include "v3/include/grpcClient.hpp" - - -using grpc::ClientAsyncResponseReader; -using grpc::ClientContext; -using grpc::CompletionQueue; -using grpc::Status; -using etcdserverpb::PutResponse; - -namespace etcdv3 -{ - class AsyncPutResponse : public etcdv3::V3Response - { - public: - AsyncPutResponse(){}; - AsyncPutResponse(const std::string act){action = act;}; - AsyncPutResponse(const AsyncPutResponse& other); - AsyncPutResponse& operator=(const AsyncPutResponse& other); - PutResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> response_reader; - AsyncPutResponse& ParseResponse(); - etcdv3::grpcClient* client; - std::string key; - }; -} - -#endif diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index eb64775..2fda271 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -4,9 +4,6 @@ #include #include "proto/rpc.grpc.pb.h" #include "v3/include/V3Response.hpp" -#include "v3/include/grpcClient.hpp" - - using grpc::ClientAsyncResponseReader; using grpc::ClientContext; diff --git a/v3/include/Utils.hpp b/v3/include/Utils.hpp deleted file mode 100644 index 587c6ee..0000000 --- a/v3/include/Utils.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef __UTILS_HPP__ -#define __UTILS_HPP__ - -#include "v3/include/AsyncRangeResponse.hpp" -#include "v3/include/grpcClient.hpp" - -namespace etcdv3 -{ - namespace Utils - { - etcdv3::AsyncRangeResponse* getKey(std::string const & key, etcdv3::grpcClient& client); - } -} -#endif - diff --git a/v3/include/grpcClient.hpp b/v3/include/grpcClient.hpp deleted file mode 100644 index 9795b11..0000000 --- a/v3/include/grpcClient.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef __GRPC_CLIENT_HPP__ -#define __GRPC_CLIENT_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "v3/include/AsyncRangeResponse.hpp" - - -using grpc::Channel; -using etcdserverpb::PutRequest; -using etcdserverpb::RangeRequest; -using etcdserverpb::KV; - -namespace etcdv3 -{ - - class grpcClient - { - public: - grpcClient(std::string const & address); - std::unique_ptr stub_; - }; -} - -#endif diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 8b98e5d..685214a 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -33,45 +33,58 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() { - if(action == "create") + for(int index=0; index < reply.responses_size(); index++) { - if(reply.succeeded()) + auto resp = reply.responses(index); + if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) { - for(int index=0; index < reply.responses_size(); index++) + if(resp.response_range().kvs_size()) { - auto resp = reply.responses(index); - if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) + if(!resp.response_range().more()) { - if(resp.response_range().kvs_size()) + if(!values.empty()) { - if(!values.empty()) - { - prev_value = values[0]; - } - - values.push_back(resp.response_range().kvs(0)); + prev_value = values[0]; } - else - { - error_code=100; - error_message="Key not found"; - } - } - else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) - { - //do nothing for now. - } - else - { - //do nothing for now. + values.clear(); + values.push_back(resp.response_range().kvs(0)); } } + else + { + error_code=100; + error_message="Key not found"; + } + } + else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) + { + //do nothing for now. } else + { + //do nothing for now. + } + } + + + + + if(action == "create") + { + + if(!reply.succeeded()) { error_code=105; error_message="Key already exists"; } } + else if(action == "compareAndSwap") + { + if(!reply.succeeded()) + { + error_code=101; + error_message="Compare failed"; + } + } return *this; } diff --git a/v3/src/Utils.cpp b/v3/src/Utils.cpp deleted file mode 100644 index a769d0b..0000000 --- a/v3/src/Utils.cpp +++ /dev/null @@ -1,24 +0,0 @@ -#include "v3/include/AsyncRangeResponse.hpp" -#include "proto/rpc.grpc.pb.h" -using etcdserverpb::RangeRequest; - -#include "v3/include/Utils.hpp" - - -etcdv3::AsyncRangeResponse* etcdv3::Utils::getKey(std::string const & key, etcdv3::grpcClient& client) -{ - RangeRequest get_request; - get_request.set_key(key); - etcdv3::AsyncRangeResponse* resp= new etcdv3::AsyncRangeResponse(); - - resp->status = client.stub_->Range(&resp->context, get_request, &resp->reply); - - if(resp->status.ok()) - { - return resp; - } - else - { - throw std::runtime_error(resp->status.error_message()); - } -} diff --git a/v3/src/grpcClient.cpp b/v3/src/grpcClient.cpp deleted file mode 100644 index 7fd37d9..0000000 --- a/v3/src/grpcClient.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "v3/include/grpcClient.hpp" - -etcdv3::grpcClient::grpcClient(std::string const & address) -{ - std::string stripped_address(address); - std::string substr("http://"); - std::string::size_type i = stripped_address.find(substr); - if(i != std::string::npos) - { - stripped_address.erase(i,substr.length()); - } - std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); - stub_= KV::NewStub(channel); -}