diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 776fc61..cdb81af 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -8,6 +8,7 @@ #include #include "proto/rpc.grpc.pb.h" +#include "v3/include/grpcClient.hpp" using grpc::ClientAsyncResponseReader; using grpc::ClientContext; diff --git a/src/Client.cpp b/src/Client.cpp index ce8d8b0..9e24dc8 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -4,6 +4,7 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" +#include "v3/include/Utils.hpp" #include using grpc::Channel; @@ -97,16 +98,16 @@ pplx::task etcd::Client::modifyEntryWithValueAndOldIndex(std::st put_request.set_key(key); put_request.set_value(value); - etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); + std::shared_ptrcall(new etcdv3::AsyncModifyResponse("compareAndSwap")); //below 2 lines can be removed once we are able to use Txn - call->prev_value = resp->reply.kvs(0); + call->prev_values.push_back(resp->reply.kvs(0)); call->client = &grpcClient; call->key = key; call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call.get()); return Response::create(call); } @@ -135,7 +136,7 @@ pplx::task etcd::Client::removeEntryWithKey(std::string const & etcdv3::AsyncDelResponse* call = new etcdv3::AsyncDelResponse("delete"); //mano-mano - call->prev_value = resp->reply.kvs(0); + call->prev_values.push_back(resp->reply.kvs(0)); call->client = &grpcClient; call->key = entryKey; @@ -170,7 +171,7 @@ pplx::task etcd::Client::removeEntryWithKeyAndValue(std::string etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); - deleteResponseCall->prev_value = searchResult->reply.kvs(0); + deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0)); deleteResponseCall->client = &grpcClient; deleteResponseCall->key = entryKey; @@ -205,7 +206,7 @@ pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); - deleteResponseCall->prev_value = searchResult->reply.kvs(0); + deleteResponseCall->prev_values.push_back(searchResult->reply.kvs(0)); deleteResponseCall->client = &grpcClient; deleteResponseCall->key = entryKey; @@ -323,7 +324,6 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & compare->set_key(key); compare->set_value(old_value); - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap"); //get key on failure std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); @@ -401,7 +401,7 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); - call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 684dcba..cb2ecc7 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -8,7 +8,7 @@ TEST_CASE("setup") { etcd::Client etcd("http://127.0.0.1:2379"); - //etcd.rmdir("/test", true).wait(); + etcd.rmdir("/test", true).wait(); } @@ -25,7 +25,7 @@ TEST_CASE("add a new key") CHECK(!val.is_dir()); CHECK(0 < val.created_index()); CHECK(0 < val.modified_index()); - //CHECK(0 < resp.index()); maui: skip this first// X-Etcd-Index header value + CHECK(0 < resp.index()); // X-Etcd-Index header value CHECK(105 == etcd.add("/test/key1", "43").get().error_code()); // Key already exists CHECK(105 == etcd.add("/test/key1", "42").get().error_code()); // Key already exists CHECK("Key already exists" == etcd.add("/test/key1", "42").get().error_message()); @@ -34,7 +34,7 @@ TEST_CASE("add a new key") TEST_CASE("read a value from etcd") { - etcd::Client etcd("http://192.168.99.100:2379"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd::Response resp = etcd.get("/test/key1").get(); CHECK("get" == resp.action()); REQUIRE(resp.is_ok()); @@ -96,33 +96,10 @@ TEST_CASE("atomic compare-and-swap") CHECK("Compare failed" == res.error_message()); } -TEST_CASE("list a directory") -{ - etcd::Client etcd("http://127.0.0.1:2379"); - //CHECK(0 == etcd.ls("/test/new_dir").get().keys().size()); - - etcd.set("/test/new_dir/key1", "value1").wait(); - etcd.set("/test/new_dir/key2", "value2").wait(); - etcd.set("/test/new_dir/sub_dir","value3").wait(); - - etcd::Response resp = etcd.ls("/test/new_dir").get(); - CHECK("get" == resp.action()); - REQUIRE(3 == resp.keys().size()); - CHECK("key1" == resp.key(0)); - CHECK("key2" == resp.key(1)); - CHECK("sub_dir" == resp.key(2)); - CHECK("value1" == resp.value(0).as_string()); - CHECK("value2" == resp.value(1).as_string()); - CHECK(resp.values()[2].is_dir()); - - CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); -} - - TEST_CASE("delete a value") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd::Response resp = etcd.rm("/test/key1").get(); CHECK("43" == resp.prev_value().as_string()); CHECK("delete" == resp.action()); @@ -130,7 +107,7 @@ TEST_CASE("delete a value") TEST_CASE("atomic compare-and-delete based on prevValue") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd.set("/test/key1", "42").wait(); etcd::Response res = etcd.rm_if("/test/key1", "43").get(); @@ -146,7 +123,7 @@ TEST_CASE("atomic compare-and-delete based on prevValue") TEST_CASE("atomic compare-and-delete based on prevIndex") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); int index = etcd.set("/test/key1", "42").get().index(); etcd::Response res = etcd.rm_if("/test/key1", index - 1).get(); @@ -160,9 +137,10 @@ TEST_CASE("atomic compare-and-delete based on prevIndex") CHECK("42" == res.prev_value().as_string()); } + TEST_CASE("deep atomic compare-and-swap") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd.set("/test/key1", "42").wait(); // modify success @@ -178,6 +156,7 @@ TEST_CASE("deep atomic compare-and-swap") CHECK(101 == res.error_code()); CHECK("Compare failed" == res.error_message()); + // succes with the correct index res = etcd.modify_if("/test/key1", "44", index).get(); REQUIRE(res.is_ok()); @@ -189,10 +168,12 @@ TEST_CASE("deep atomic compare-and-swap") CHECK(!res.is_ok()); CHECK(101 == res.error_code()); CHECK("Compare failed" == res.error_message()); + } -#if 0 +//skip this test case +/* TEST_CASE("create a directory") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -200,39 +181,41 @@ TEST_CASE("create a directory") CHECK("set" == resp.action()); CHECK(resp.value().is_dir()); } +*/ TEST_CASE("list a directory") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); CHECK(0 == etcd.ls("/test/new_dir").get().keys().size()); etcd.set("/test/new_dir/key1", "value1").wait(); etcd.set("/test/new_dir/key2", "value2").wait(); - etcd.mkdir("/test/new_dir/sub_dir").wait(); + etcd.set("/test/new_dir/sub_dir", "value3").wait(); etcd::Response resp = etcd.ls("/test/new_dir").get(); CHECK("get" == resp.action()); REQUIRE(3 == resp.keys().size()); - CHECK("key1" == resp.key(0)); - CHECK("key2" == resp.key(1)); - CHECK("sub_dir" == resp.key(2)); + CHECK("/test/new_dir/key1" == resp.key(0)); + CHECK("/test/new_dir/key2" == resp.key(1)); + CHECK("/test/new_dir/sub_dir" == resp.key(2)); CHECK("value1" == resp.value(0).as_string()); CHECK("value2" == resp.value(1).as_string()); - CHECK(resp.values()[2].is_dir()); + CHECK(resp.values()[2].is_dir() == 0); CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); } TEST_CASE("delete a directory") { - etcd::Client etcd("http://127.0.0.1:4001"); - CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty + etcd::Client etcd("http://127.0.0.1:2379"); + //CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty CHECK(0 == etcd.rmdir("/test/new_dir", true).get().error_code()); } + TEST_CASE("wait for a value change") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); etcd.set("/test/key1", "42").wait(); pplx::task res = etcd.watch("/test/key1"); @@ -296,11 +279,12 @@ TEST_CASE("watch changes in the past") CHECK("45" == res.value().as_string()); } + TEST_CASE("cleanup") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); } -#endif + diff --git a/v3/include/AsyncPutResponse.hpp b/v3/include/AsyncPutResponse.hpp new file mode 100644 index 0000000..8145f5e --- /dev/null +++ b/v3/include/AsyncPutResponse.hpp @@ -0,0 +1,36 @@ +#ifndef __ASYNC_PUTRESPONSE_HPP__ +#define __ASYNC_PUTRESPONSE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" +#include "v3/include/grpcClient.hpp" + + +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::PutResponse; + +namespace etcdv3 +{ + class AsyncPutResponse : public etcdv3::V3Response + { + public: + AsyncPutResponse(){}; + AsyncPutResponse(const std::string act){action = act;}; + AsyncPutResponse(const AsyncPutResponse& other); + AsyncPutResponse& operator=(const AsyncPutResponse& other); + PutResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + AsyncPutResponse& ParseResponse(); + etcdv3::grpcClient* client; + std::string key; + }; +} + +#endif diff --git a/v3/src/AsyncModifyResponse.cpp b/v3/src/AsyncModifyResponse.cpp index 39fd61f..e0ad2d7 100644 --- a/v3/src/AsyncModifyResponse.cpp +++ b/v3/src/AsyncModifyResponse.cpp @@ -16,10 +16,7 @@ etcdv3::AsyncModifyResponse::AsyncModifyResponse(const etcdv3::AsyncModifyRespon index = other.index; action = other.action; values = other.values; - prev_value.set_key(other.prev_value.key()); - prev_value.set_value(other.prev_value.value()); - prev_value.set_create_revision(other.prev_value.create_revision()); - prev_value.set_mod_revision(other.prev_value.mod_revision()); + prev_values= other.prev_values; } etcdv3::AsyncModifyResponse::AsyncModifyResponse(const std::string &input) { @@ -32,10 +29,7 @@ etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::operator=(const etcdv3 index = other.index; action = other.action; values = other.values; - prev_value.set_key(other.prev_value.key()); - prev_value.set_value(other.prev_value.value()); - prev_value.set_create_revision(other.prev_value.create_revision()); - prev_value.set_mod_revision(other.prev_value.mod_revision()); + prev_values= other.prev_values; return *this; } diff --git a/v3/src/AsyncPutResponse.cpp b/v3/src/AsyncPutResponse.cpp index 13e7612..709edda 100644 --- a/v3/src/AsyncPutResponse.cpp +++ b/v3/src/AsyncPutResponse.cpp @@ -13,10 +13,7 @@ etcdv3::AsyncPutResponse::AsyncPutResponse(const etcdv3::AsyncPutResponse& other index = other.index; action = other.action; values = other.values; - prev_value.set_key(other.prev_value.key()); - prev_value.set_value(other.prev_value.value()); - prev_value.set_create_revision(other.prev_value.create_revision()); - prev_value.set_mod_revision(other.prev_value.mod_revision()); + prev_values= other.prev_values; } @@ -27,10 +24,7 @@ etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::operator=(const etcdv3::Asyn index = other.index; action = other.action; values = other.values; - prev_value.set_key(other.prev_value.key()); - prev_value.set_value(other.prev_value.value()); - prev_value.set_create_revision(other.prev_value.create_revision()); - prev_value.set_mod_revision(other.prev_value.mod_revision()); + prev_values= other.prev_values; return *this; }