From 326693a95e5513762898ea1a8e7d6244eefc2752 Mon Sep 17 00:00:00 2001 From: lampayan Date: Thu, 2 Jun 2016 13:36:58 +0200 Subject: [PATCH] commit for merge --- etcd/Client.hpp | 4 ++++ proto/auth.proto | 8 -------- proto/etcdserver.proto | 38 +++++++++++++++++++------------------- proto/kv.proto | 8 -------- proto/rpc.proto | 38 +++++++++++++++++++------------------- src/Client.cpp | 35 +++++++++++++++++++++++++++++++++-- src/Response.cpp | 15 +++++++++++---- tst/EtcdTest.cpp | 25 +++++++++++++++++++++++++ 8 files changed, 111 insertions(+), 60 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 8a6929a..76dcd43 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -11,6 +11,7 @@ using grpc::Channel; using etcdserverpb::KV; +using etcdserverpb::Watch; namespace etcd { @@ -41,6 +42,8 @@ namespace etcd */ pplx::task set(std::string const & key, std::string const & value); + void setv3(std::string const&, std::string const&); + /** * Creates a new key and sets it's value. Fails if the key already exists. * @param key is the key to be created @@ -140,6 +143,7 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; + std::unique_ptr watchServiceStub; }; } diff --git a/proto/auth.proto b/proto/auth.proto index 9308a17..50f9c33 100644 --- a/proto/auth.proto +++ b/proto/auth.proto @@ -1,14 +1,6 @@ syntax = "proto3"; package authpb; -import "gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; - // User is a single entry in the bucket authUsers message User { bytes name = 1; diff --git a/proto/etcdserver.proto b/proto/etcdserver.proto index c6821be..f51ec09 100644 --- a/proto/etcdserver.proto +++ b/proto/etcdserver.proto @@ -2,26 +2,26 @@ syntax = "proto2"; package etcdserverpb; message Request { - optional uint64 ID = 1 [(gogoproto.nullable) = false]; - optional string Method = 2 [(gogoproto.nullable) = false]; - optional string Path = 3 [(gogoproto.nullable) = false]; - optional string Val = 4 [(gogoproto.nullable) = false]; - optional bool Dir = 5 [(gogoproto.nullable) = false]; - optional string PrevValue = 6 [(gogoproto.nullable) = false]; - optional uint64 PrevIndex = 7 [(gogoproto.nullable) = false]; - optional bool PrevExist = 8 [(gogoproto.nullable) = true]; - optional int64 Expiration = 9 [(gogoproto.nullable) = false]; - optional bool Wait = 10 [(gogoproto.nullable) = false]; - optional uint64 Since = 11 [(gogoproto.nullable) = false]; - optional bool Recursive = 12 [(gogoproto.nullable) = false]; - optional bool Sorted = 13 [(gogoproto.nullable) = false]; - optional bool Quorum = 14 [(gogoproto.nullable) = false]; - optional int64 Time = 15 [(gogoproto.nullable) = false]; - optional bool Stream = 16 [(gogoproto.nullable) = false]; - optional bool Refresh = 17 [(gogoproto.nullable) = true]; + optional uint64 ID = 1; + optional string Method = 2; + optional string Path = 3; + optional string Val = 4; + optional bool Dir = 5; + optional string PrevValue = 6; + optional uint64 PrevIndex = 7; + optional bool PrevExist = 8; + optional int64 Expiration = 9; + optional bool Wait = 10; + optional uint64 Since = 11; + optional bool Recursive = 12; + optional bool Sorted = 13; + optional bool Quorum = 14; + optional int64 Time = 15; + optional bool Stream = 16; + optional bool Refresh = 17; } message Metadata { - optional uint64 NodeID = 1 [(gogoproto.nullable) = false]; - optional uint64 ClusterID = 2 [(gogoproto.nullable) = false]; + optional uint64 NodeID = 1; + optional uint64 ClusterID = 2; } diff --git a/proto/kv.proto b/proto/kv.proto index f0c82b5..9a8004c 100644 --- a/proto/kv.proto +++ b/proto/kv.proto @@ -1,14 +1,6 @@ syntax = "proto3"; package mvccpb; -import "gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; - message KeyValue { // key is the key in bytes. An empty key is not allowed. bytes key = 1; diff --git a/proto/rpc.proto b/proto/rpc.proto index 331cee5..c5463e1 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -23,7 +23,7 @@ service KV { // A txn request increments the revision of the key-value store // and generates events with the same revision for every completed request. // It is not allowed to modify the same key several times within one txn. - rpc Txn(TxnRequest) returns (TxnResponse) {} + //rpc Txn(TxnRequest) returns (TxnResponse) {} // Compact compacts the event history in the etcd key-value store. The key-value // store should be periodically compacted or the event history will continue to grow @@ -84,7 +84,7 @@ service Maintenance { // Hash returns the hash of the local KV state for consistency checking purpose. // This is designed for testing; do not use this in production when there // are ongoing transactions. - rpc Hash(HashRequest) returns (HashResponse) {} + //rpc Hash(HashRequest) returns (HashResponse) {} // Snapshot sends a snapshot of the entire backend from a member over a stream to a client. rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {} @@ -226,23 +226,23 @@ message DeleteRangeResponse { int64 deleted = 2; } -message RequestUnion { +//message RequestUnion { // request is a union of request types accepted by a transaction. - oneof request { - RangeRequest request_range = 1; - PutRequest request_put = 2; - DeleteRangeRequest request_delete_range = 3; - } -} +// oneof request { +// RangeRequest request_range = 1; +// PutRequest request_put = 2; +// DeleteRangeRequest request_delete_range = 3; +// } +//} -message ResponseUnion { +//message ResponseUnion { // response is a union of response types returned by a transaction. - oneof response { - RangeResponse response_range = 1; - PutResponse response_put = 2; - DeleteRangeResponse response_delete_range = 3; - } -} +// oneof response { +// RangeResponse response_range = 1; +// PutResponse response_put = 2; +// DeleteRangeResponse response_delete_range = 3; +// } +//} message Compare { enum CompareResult { @@ -297,9 +297,9 @@ message TxnRequest { // and the response will contain their respective responses in order. repeated Compare compare = 1; // success is a list of requests which will be applied when compare evaluates to true. - repeated RequestUnion success = 2; + //repeated RequestUnion success = 2; // failure is a list of requests which will be applied when compare evaluates to false. - repeated RequestUnion failure = 3; + //repeated RequestUnion failure = 3; } message TxnResponse { @@ -308,7 +308,7 @@ message TxnResponse { bool succeeded = 2; // responses is a list of responses corresponding to the results from applying // success if succeeded is true or failure if succeeded is false. - repeated ResponseUnion responses = 3; + //repeated ResponseUnion responses = 3; } // CompactionRequest compacts the key-value store up to a given revision. All superseded keys diff --git a/src/Client.cpp b/src/Client.cpp index 3c571ea..758d26c 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,5 +1,7 @@ #include "etcd/Client.hpp" +#include + etcd::Client::Client(std::string const & address) : client(address) { @@ -12,6 +14,7 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); + watchServiceStub = Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -43,6 +46,28 @@ pplx::task etcd::Client::set(std::string const & key, std::strin return send_put_request(uri, "value", value); } +void etcd::Client::setv3(std::string const &key, std::string const &value) +{ + std::cout << "FBDL setv3" << std::endl; + etcdserverpb::PutRequest putRequest; + putRequest.set_key(key); + putRequest.set_value(value); + + etcdserverpb::PutResponse putResponse; + grpc::ClientContext context; + + std::cout << "invoking put stub rpc" << std::endl; + grpc::Status status = stub_->Put(&context, putRequest, &putResponse); + + std::cout << "checking status" << std::endl; + if(status.ok()){ + std::cout << "put OK" << std::endl; + } + else { + std::cout << "put NOK" << std::endl; + } +} + pplx::task etcd::Client::add(std::string const & key, std::string const & value) { web::http::uri_builder uri("/v2/keys" + key); @@ -64,6 +89,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_put_request(uri, "value", value); } +//FBDL pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { web::http::uri_builder uri("/v2/keys" + key); @@ -73,8 +99,13 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: pplx::task etcd::Client::rm(std::string const & key) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("dir=false"); + std::cout << "FBDL: rm is invoked" << std::endl; + web::http::uri_builder uri("/v2/keys" + key); // /v2/keys/test/key1 + std::cout << "FBDL url: " << uri.to_string() << std::endl; + + uri.append_query("dir=false"); // /v2/keys/test/key1?dir=false + std::cout << "FBDL url after append query: " << uri.to_string() << std::endl; + return Response::create(client.request("DELETE", uri.to_string())); } diff --git a/src/Response.cpp b/src/Response.cpp index ae32089..664da73 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -1,12 +1,19 @@ #include "etcd/Response.hpp" #include "json_constants.hpp" +#include + pplx::task etcd::Response::create(pplx::task response_task) { - return pplx::task ([response_task](){ - auto json_task = response_task.get().extract_json(); - return etcd::Response(response_task.get(), json_task.get()); - }); + std::cout << "FBDL Response create" << std::endl; + return pplx::task ( + [response_task]() + { + std::cout << "FBDL inside response task" << std::endl; + auto json_task = response_task.get().extract_json(); + return etcd::Response(response_task.get(), json_task.get()); + } + ); } etcd::Response::Response() diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 244e768..a3411b2 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -70,8 +70,11 @@ TEST_CASE("set a key") TEST_CASE("delete a value") { + std::cout << "FBDL do a delete via rm only" < res = etcd.watch("/test/key1"); +// CHECK(!res.is_done()); +// sleep(1); +// CHECK(!res.is_done()); +// CHECK("42" == etcd.get("/test/key1").get().value().as_string()); +// +// etcd.set("/test/key1", "43").get(); +// sleep(1); +// REQUIRE(res.is_done()); +// REQUIRE("set" == res.get().action()); +// CHECK("43" == res.get().value().as_string()); +// CHECK("43" == etcd.get("/test/key1").get().value().as_string()); +} + TEST_CASE("wait for a directory change") { etcd::Client etcd("http://127.0.0.1:4001");