commit for merge

This commit is contained in:
lampayan 2016-06-02 13:36:58 +02:00
parent b78da53aa3
commit 326693a95e
8 changed files with 111 additions and 60 deletions

View File

@ -11,6 +11,7 @@
using grpc::Channel;
using etcdserverpb::KV;
using etcdserverpb::Watch;
namespace etcd
{
@ -41,6 +42,8 @@ namespace etcd
*/
pplx::task<Response> set(std::string const & key, std::string const & value);
void setv3(std::string const&, std::string const&);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
@ -140,6 +143,7 @@ namespace etcd
web::http::client::http_client client;
std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub;
};
}

View File

@ -1,14 +1,6 @@
syntax = "proto3";
package authpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
// User is a single entry in the bucket authUsers
message User {
bytes name = 1;

View File

@ -2,26 +2,26 @@ syntax = "proto2";
package etcdserverpb;
message Request {
optional uint64 ID = 1 [(gogoproto.nullable) = false];
optional string Method = 2 [(gogoproto.nullable) = false];
optional string Path = 3 [(gogoproto.nullable) = false];
optional string Val = 4 [(gogoproto.nullable) = false];
optional bool Dir = 5 [(gogoproto.nullable) = false];
optional string PrevValue = 6 [(gogoproto.nullable) = false];
optional uint64 PrevIndex = 7 [(gogoproto.nullable) = false];
optional bool PrevExist = 8 [(gogoproto.nullable) = true];
optional int64 Expiration = 9 [(gogoproto.nullable) = false];
optional bool Wait = 10 [(gogoproto.nullable) = false];
optional uint64 Since = 11 [(gogoproto.nullable) = false];
optional bool Recursive = 12 [(gogoproto.nullable) = false];
optional bool Sorted = 13 [(gogoproto.nullable) = false];
optional bool Quorum = 14 [(gogoproto.nullable) = false];
optional int64 Time = 15 [(gogoproto.nullable) = false];
optional bool Stream = 16 [(gogoproto.nullable) = false];
optional bool Refresh = 17 [(gogoproto.nullable) = true];
optional uint64 ID = 1;
optional string Method = 2;
optional string Path = 3;
optional string Val = 4;
optional bool Dir = 5;
optional string PrevValue = 6;
optional uint64 PrevIndex = 7;
optional bool PrevExist = 8;
optional int64 Expiration = 9;
optional bool Wait = 10;
optional uint64 Since = 11;
optional bool Recursive = 12;
optional bool Sorted = 13;
optional bool Quorum = 14;
optional int64 Time = 15;
optional bool Stream = 16;
optional bool Refresh = 17;
}
message Metadata {
optional uint64 NodeID = 1 [(gogoproto.nullable) = false];
optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
optional uint64 NodeID = 1;
optional uint64 ClusterID = 2;
}

View File

@ -1,14 +1,6 @@
syntax = "proto3";
package mvccpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;

View File

@ -23,7 +23,7 @@ service KV {
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is not allowed to modify the same key several times within one txn.
rpc Txn(TxnRequest) returns (TxnResponse) {}
//rpc Txn(TxnRequest) returns (TxnResponse) {}
// Compact compacts the event history in the etcd key-value store. The key-value
// store should be periodically compacted or the event history will continue to grow
@ -84,7 +84,7 @@ service Maintenance {
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
//rpc Hash(HashRequest) returns (HashResponse) {}
// Snapshot sends a snapshot of the entire backend from a member over a stream to a client.
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {}
@ -226,23 +226,23 @@ message DeleteRangeResponse {
int64 deleted = 2;
}
message RequestUnion {
//message RequestUnion {
// request is a union of request types accepted by a transaction.
oneof request {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
}
}
// oneof request {
// RangeRequest request_range = 1;
// PutRequest request_put = 2;
// DeleteRangeRequest request_delete_range = 3;
// }
//}
message ResponseUnion {
//message ResponseUnion {
// response is a union of response types returned by a transaction.
oneof response {
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
}
}
// oneof response {
// RangeResponse response_range = 1;
// PutResponse response_put = 2;
// DeleteRangeResponse response_delete_range = 3;
// }
//}
message Compare {
enum CompareResult {
@ -297,9 +297,9 @@ message TxnRequest {
// and the response will contain their respective responses in order.
repeated Compare compare = 1;
// success is a list of requests which will be applied when compare evaluates to true.
repeated RequestUnion success = 2;
//repeated RequestUnion success = 2;
// failure is a list of requests which will be applied when compare evaluates to false.
repeated RequestUnion failure = 3;
//repeated RequestUnion failure = 3;
}
message TxnResponse {
@ -308,7 +308,7 @@ message TxnResponse {
bool succeeded = 2;
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
repeated ResponseUnion responses = 3;
//repeated ResponseUnion responses = 3;
}
// CompactionRequest compacts the key-value store up to a given revision. All superseded keys

View File

@ -1,5 +1,7 @@
#include "etcd/Client.hpp"
#include <iostream>
etcd::Client::Client(std::string const & address)
: client(address)
{
@ -12,6 +14,7 @@ etcd::Client::Client(std::string const & address)
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel);
watchServiceStub = Watch::NewStub(channel);
}
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
@ -43,6 +46,28 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
return send_put_request(uri, "value", value);
}
void etcd::Client::setv3(std::string const &key, std::string const &value)
{
std::cout << "FBDL setv3" << std::endl;
etcdserverpb::PutRequest putRequest;
putRequest.set_key(key);
putRequest.set_value(value);
etcdserverpb::PutResponse putResponse;
grpc::ClientContext context;
std::cout << "invoking put stub rpc" << std::endl;
grpc::Status status = stub_->Put(&context, putRequest, &putResponse);
std::cout << "checking status" << std::endl;
if(status.ok()){
std::cout << "put OK" << std::endl;
}
else {
std::cout << "put NOK" << std::endl;
}
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
{
web::http::uri_builder uri("/v2/keys" + key);
@ -64,6 +89,7 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
return send_put_request(uri, "value", value);
}
//FBDL
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{
web::http::uri_builder uri("/v2/keys" + key);
@ -73,8 +99,13 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("dir=false");
std::cout << "FBDL: rm is invoked" << std::endl;
web::http::uri_builder uri("/v2/keys" + key); // /v2/keys/test/key1
std::cout << "FBDL url: " << uri.to_string() << std::endl;
uri.append_query("dir=false"); // /v2/keys/test/key1?dir=false
std::cout << "FBDL url after append query: " << uri.to_string() << std::endl;
return Response::create(client.request("DELETE", uri.to_string()));
}

View File

@ -1,12 +1,19 @@
#include "etcd/Response.hpp"
#include "json_constants.hpp"
#include <iostream>
pplx::task<etcd::Response> etcd::Response::create(pplx::task<web::http::http_response> response_task)
{
return pplx::task<etcd::Response> ([response_task](){
std::cout << "FBDL Response create" << std::endl;
return pplx::task<etcd::Response> (
[response_task]()
{
std::cout << "FBDL inside response task" << std::endl;
auto json_task = response_task.get().extract_json();
return etcd::Response(response_task.get(), json_task.get());
});
}
);
}
etcd::Response::Response()

View File

@ -70,8 +70,11 @@ TEST_CASE("set a key")
TEST_CASE("delete a value")
{
std::cout << "FBDL do a delete via rm only" <<std::endl;
etcd::Client etcd("http://127.0.0.1:4001");
CHECK(3 == etcd.ls("/test").get().keys().size());
std::cout << "FBDL invoking rm in test" <<std::endl;
etcd::Response resp = etcd.rm("/test/key1").get();
CHECK("43" == resp.prev_value().as_string());
CHECK("delete" == resp.action());
@ -132,6 +135,28 @@ TEST_CASE("wait for a value change")
CHECK("43" == res.get().value().as_string());
}
//FBDL: first watch
TEST_CASE("FBDL wait for a value change")
{
std::cout << "FBDL wait for a value change" << std::endl;
etcd::Client etcd("http://127.0.0.1:4001");
// etcd.set("/test/key1", "42").wait();
etcd.setv3("test/key1", "42");
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
// CHECK(!res.is_done());
// sleep(1);
// CHECK(!res.is_done());
// CHECK("42" == etcd.get("/test/key1").get().value().as_string());
//
// etcd.set("/test/key1", "43").get();
// sleep(1);
// REQUIRE(res.is_done());
// REQUIRE("set" == res.get().action());
// CHECK("43" == res.get().value().as_string());
// CHECK("43" == etcd.get("/test/key1").get().value().as_string());
}
TEST_CASE("wait for a directory change")
{
etcd::Client etcd("http://127.0.0.1:4001");