Merge remote-tracking branch 'origin/maui' into other_dev

# Conflicts:
#	etcd/Client.hpp
#	etcd/Response.hpp
#	proto/rpc.proto
#	src/CMakeLists.txt
#	src/Client.cpp
#	src/Response.cpp
#	tst/EtcdTest.cpp
This commit is contained in:
lampayan 2016-06-08 14:53:34 +02:00
commit 33be7c9092
19 changed files with 764 additions and 474 deletions

View File

@ -3,14 +3,16 @@ project (etcd-cpp-api)
find_library(CPPREST_LIB NAMES cpprest)
find_package(Boost REQUIRED COMPONENTS system thread locale random)
find_package(Protobuf REQUIRED)
set (etcd-cpp-api_VERSION_MAJOR 0)
set (etcd-cpp-api_VERSION_MINOR 1)
enable_testing()
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR} /home/arches/casablanca/Release/include /home/arches/grpc/include)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror")
add_subdirectory(src)
add_subdirectory(tst)

View File

@ -9,6 +9,9 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/AsyncDeleteResponse.h"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
@ -157,35 +160,25 @@ namespace etcd
web::http::client::http_client client;
std::unique_ptr<KV::Stub> stub_;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_asyncget(std::string const & key);
pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value);
pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value);
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value);
etcdv3::grpcClient grpcClient;
private:
void getEntryForPreviousValue(const std::string& entryKey, etcd::AsyncDeleteResponse* drp);
};
class AsyncPutResponse
{
public:
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
Response ParseResponse();
};
class AsyncRangeResponse
{
public:
RangeResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
Response ParseResponse();
};
}
#endif

View File

@ -8,6 +8,9 @@
#include "etcd/Value.hpp"
#include <grpc++/grpc++.h>
#include "v3/include/V3Response.hpp"
#include <grpc++/grpc++.h>
namespace etcd
{
typedef std::vector<std::string> Keys;
@ -49,6 +52,37 @@ namespace etcd
});
};
static pplx::task<Response> createResponse(const etcdv3::V3Response& response);
template<typename T>static pplx::task<etcd::Response> create(T call)
{
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
T call = static_cast<T>(got_tag);
if(call->status.ok())
{
auto v3resp = call->ParseResponse();
resp = etcd::Response(v3resp);
}
else
{
throw std::runtime_error(call->status.error_message());
}
delete call; //todo:make this a smart pointer
return resp;
});
};
Response();
/**
@ -106,8 +140,9 @@ namespace etcd
*/
std::string const & key(int index) const;
protected:
protected:
Response(web::http::http_response http_response, web::json::value json_value);
Response(const etcdv3::V3Response& response);
int _error_code;
std::string _error_message;

View File

@ -4,6 +4,7 @@
#include <cpprest/http_client.h>
#include <string>
#include <vector>
#include "proto/kv.pb.h"
namespace etcd
{
@ -46,6 +47,7 @@ namespace etcd
friend class AsyncDeleteResponse;
Value();
Value(web::json::value const & json_value);
Value(mvccpb::KeyValue const & kvs);
std::string _key;
bool dir;
std::string value;

View File

@ -18,17 +18,6 @@ service KV {
// A delete request increments the revision of the key-value store
// and generates a delete event in the event history for every deleted key.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is not allowed to modify the same key several times within one txn.
//rpc Txn(TxnRequest) returns (TxnResponse) {}
// Compact compacts the event history in the etcd key-value store. The key-value
// store should be periodically compacted or the event history will continue to grow
// indefinitely.
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
}
service Watch {
@ -84,7 +73,7 @@ service Maintenance {
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
//rpc Hash(HashRequest) returns (HashResponse) {}
rpc Hash(HashRequest) returns (HashResponse) {}
// Snapshot sends a snapshot of the entire backend from a member over a stream to a client.
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {}
@ -226,23 +215,14 @@ message DeleteRangeResponse {
int64 deleted = 2;
}
//message RequestUnion {
message RequestUnion {
// request is a union of request types accepted by a transaction.
// oneof request {
// RangeRequest request_range = 1;
// PutRequest request_put = 2;
// DeleteRangeRequest request_delete_range = 3;
// }
//}
//message ResponseUnion {
// response is a union of response types returned by a transaction.
// oneof response {
// RangeResponse response_range = 1;
// PutResponse response_put = 2;
// DeleteRangeResponse response_delete_range = 3;
// }
//}
oneof requestXXX {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
}
}
message Compare {
enum CompareResult {
@ -274,58 +254,6 @@ message Compare {
}
}
// From google paxosdb paper:
// Our implementation hinges around a powerful primitive which we call MultiOp. All other database
// operations except for iteration are implemented as a single call to MultiOp. A MultiOp is applied atomically
// and consists of three components:
// 1. A list of tests called guard. Each test in guard checks a single entry in the database. It may check
// for the absence or presence of a value, or compare with a given value. Two different tests in the guard
// may apply to the same or different entries in the database. All tests in the guard are applied and
// MultiOp returns the results. If all tests are true, MultiOp executes t op (see item 2 below), otherwise
// it executes f op (see item 3 below).
// 2. A list of database operations called t op. Each operation in the list is either an insert, delete, or
// lookup operation, and applies to a single database entry. Two different operations in the list may apply
// to the same or different entries in the database. These operations are executed
// if guard evaluates to
// true.
// 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false.
message TxnRequest {
// compare is a list of predicates representing a conjunction of terms.
// If the comparisons succeed, then the success requests will be processed in order,
// and the response will contain their respective responses in order.
// If the comparisons fail, then the failure requests will be processed in order,
// and the response will contain their respective responses in order.
repeated Compare compare = 1;
// success is a list of requests which will be applied when compare evaluates to true.
//repeated RequestUnion success = 2;
// failure is a list of requests which will be applied when compare evaluates to false.
//repeated RequestUnion failure = 3;
}
message TxnResponse {
ResponseHeader header = 1;
// succeeded is set to true if the compare evaluated to true or false otherwise.
bool succeeded = 2;
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
//repeated ResponseUnion responses = 3;
}
// CompactionRequest compacts the key-value store up to a given revision. All superseded keys
// with a revision less than the compaction revision will be removed.
message CompactionRequest {
// revision is the key-value store revision for the compaction operation.
int64 revision = 1;
// physical is set so the RPC will wait until the compaction is physically
// applied to the local database such that compacted entries are totally
// removed from the backend database.
bool physical = 2;
}
message CompactionResponse {
ResponseHeader header = 1;
}
message HashRequest {
}

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp)
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -1,21 +1,14 @@
#include "etcd/Client.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/Utils.hpp"
#include <iostream>
#include "etcd/AsyncDeleteResponse.h"
etcd::Client::Client(std::string const & address)
: client(address)
: client(address), grpcClient(address)
{
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel);
watchServiceStub = Watch::NewStub(channel);
}
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
@ -37,12 +30,12 @@ pplx::task<etcd::Response> etcd::Client::send_put_request(web::http::uri_builder
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{
return send_get(key);
return send_asyncget(key);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value)
{
return send_put(key,value);
return send_asyncput(key,value);
}
//TODO: a temporary set, until set version 3 is implemented
@ -66,23 +59,17 @@ void etcd::Client::setv3(std::string const &key, std::string const &value)
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
{
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("prevExist=false");
return send_put_request(uri, "value", value);
return send_asyncadd(key,value);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value)
{
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("prevExist=true");
return send_put_request(uri, "value", value);
return send_asyncmodify(key,value);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value)
{
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("prevValue", old_value);
return send_put_request(uri, "value", value);
return send_asyncmodify_if(key, value, old_value);
}
//FBDL
@ -213,97 +200,151 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
}
etcd::Response etcd::AsyncPutResponse::ParseResponse()
{
std::cout << reply.header().revision() << std::endl;
return etcd::Response();
}
etcd::Response etcd::AsyncRangeResponse::ParseResponse()
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
{
mvccpb::KeyValue kvs;
if(reply.kvs_size())
//check if key already exist
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(resp->reply.kvs_size())
{
int index=0;
do
{
kvs = reply.kvs(index++);
std::cout<<reply.header().revision() << std::endl;
std::cout << kvs.create_revision() << std::endl;
std::cout << kvs.mod_revision() << std::endl;
std::cout << kvs.version() << std::endl;
}while(reply.more());
resp->error_code=105;
resp->error_message="Key already exists";
return Response::createResponse(*resp);
}
return etcd::Response();
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("create");
//below 2 lines can be removed once we are able to use Txn
call->client = &grpcClient;
call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_get(std::string const & key)
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value)
{
//check current key is equal to old_value
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(!resp->reply.kvs_size())
{
resp->error_code=100;
resp->error_message="Key not found";
return Response::createResponse(*resp);
}
else
{
if(resp->reply.kvs(0).value() != old_value)
{
resp->error_code=101;
resp->error_message="Compare failed";
return Response::createResponse(*resp);
}
}
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap");
//below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient;
call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value)
{
//check if key already exist
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(!resp->reply.kvs_size())
{
resp->error_code=100;
resp->error_message="Key not found";
return Response::createResponse(*resp);
}
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("update");
//below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient;
call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key)
{
RangeRequest request;
request.set_key(key);
etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse();
etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse();
call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_);
call->response_reader = grpcClient.stub_->AsyncRange(&call->context,request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncRangeResponse* call = static_cast<etcd::AsyncRangeResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_put(std::string const & key, std::string const & value)
pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value)
{
PutRequest request;
request.set_key(key);
request.set_value(value);
etcd::AsyncPutResponse* call= new etcd::AsyncPutResponse();
call->response_reader = stub_->AsyncPut(&call->context,request,&call->cq_);
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("set");
//get current value
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(resp->reply.kvs_size())
{
call->prev_value = resp->reply.kvs(0);
}
call->client = &grpcClient;
call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncPutResponse* call = static_cast<etcd::AsyncPutResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
return Response::create(call);
}

View File

@ -14,6 +14,35 @@ pplx::task<etcd::Response> etcd::Response::create(pplx::task<web::http::http_res
);
}
pplx::task<etcd::Response> etcd::Response::createResponse(const etcdv3::V3Response& response)
{
return pplx::task<etcd::Response>([response](){
return etcd::Response(response);
});
}
etcd::Response::Response(const etcdv3::V3Response& reply)
{
_index = reply.index;
_error_code = reply.error_code;
_error_message = reply.error_message;
_action = reply.action;
int size = reply.values.size();
if(size > 1)
{
for(int x = 0; x < size; x++)
_values.push_back(Value(reply.values[x]));
}
else if(size == 1)
{
_value = Value(reply.values[0]);
}
_prev_value = Value(reply.prev_value);
}
etcd::Response::Response()
: _error_code(0),
_index(0)

View File

@ -1,5 +1,6 @@
#include "etcd/Value.hpp"
#include "json_constants.hpp"
#include "proto/kv.pb.h"
etcd::Value::Value()
: dir(false),
@ -17,6 +18,15 @@ etcd::Value::Value(web::json::value const & json_value)
{
}
etcd::Value::Value(mvccpb::KeyValue const & kvs)
{
dir=false;
_key=kvs.key();
value=kvs.value();
created=kvs.create_revision();
modified=kvs.mod_revision();
}
std::string const & etcd::Value::key() const
{
return _key;

View File

@ -5,284 +5,277 @@
//TEST_CASE("setup")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd.rmdir("/test", true).wait();
//}
//
//TEST_CASE("add a new key")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd::Response resp = etcd.add("/test/key1", "42").get();
// REQUIRE(0 == resp.error_code());
// CHECK("create" == resp.action());
// etcd::Value const & val = resp.value();
// CHECK("42" == val.as_string());
// CHECK("/test/key1" == val.key());
// CHECK(!val.is_dir());
// CHECK(0 < val.created_index());
// CHECK(0 < val.modified_index());
// CHECK(0 < resp.index()); // X-Etcd-Index header value
// CHECK(105 == etcd.add("/test/key1", "43").get().error_code()); // Key already exists
// CHECK(105 == etcd.add("/test/key1", "42").get().error_code()); // Key already exists
// CHECK("Key already exists" == etcd.add("/test/key1", "42").get().error_message());
//}
//
//TEST_CASE("read a value from etcd")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd::Response resp = etcd.get("/test/key1").get();
// CHECK("get" == resp.action());
// REQUIRE(resp.is_ok());
// REQUIRE(0 == resp.error_code());
// CHECK("42" == resp.value().as_string());
//
// CHECK("" == etcd.get("/test").get().value().as_string()); // key points to a directory
//}
//
//TEST_CASE("simplified read")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// CHECK("42" == etcd.get("/test/key1").get().value().as_string());
// std::cout << "get error code kahit success: " << etcd.get("/test/key1").get().error_code() << std::endl;
// CHECK(100 == etcd.get("/test/key2").get().error_code()); // Key not found
//}
//
//TEST_CASE("modify a key")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd::Response resp = etcd.modify("/test/key1", "43").get();
// REQUIRE(0 == resp.error_code()); // overwrite
// CHECK("update" == resp.action());
// CHECK(100 == etcd.modify("/test/key2", "43").get().error_code()); // Key not found
// CHECK("43" == etcd.modify("/test/key1", "42").get().prev_value().as_string());
//}
//
////TEST_CASE("set a key")
////{
//// etcd::Client etcd("http://127.0.0.1:4001");
//// etcd::Response resp = etcd.set("/test/key1", "43").get();
//// REQUIRE(0 == resp.error_code()); // overwrite
//// CHECK("set" == resp.action());
//// CHECK(0 == etcd.set("/test/key2", "43").get().error_code()); // create new
//// CHECK("43" == etcd.set("/test/key2", "44").get().prev_value().as_string());
//// CHECK("" == etcd.set("/test/key3", "44").get().prev_value().as_string());
//// CHECK(102 == etcd.set("/test", "42").get().error_code()); // Not a file
////}
////
////FBDL rm naman
////TEST_CASE("delete a value")
////{
//// std::cout << "FBDL do a delete via rm only" <<std::endl;
//// etcd::Client etcd("http://127.0.0.1:4001");
//// CHECK(3 == etcd.ls("/test").get().keys().size());
////
//// std::cout << "FBDL invoking rm in test" <<std::endl;
//// etcd::Response resp = etcd.rm("/test/key1").get();
//// CHECK("43" == resp.prev_value().as_string());
//// CHECK("delete" == resp.action());
//// CHECK(2 == etcd.ls("/test").get().keys().size());
////}
TEST_CASE("delete a value V3 representation")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.setv3("test/key1", "42");
etcd::Response resp = etcd.rm("test/key1").get();
CHECK("42" == resp.prev_value().as_string());
CHECK("delete" == resp.action());
etcd::Client etcd("http://192.168.99.100:2379");
//etcd.rmdir("/test", true).wait();
}
//TEST_CASE("create a directory")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd::Response resp = etcd.mkdir("/test/new_dir").get();
// CHECK("set" == resp.action());
// CHECK(resp.value().is_dir());
//}
//
//TEST_CASE("list a directory")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// CHECK(0 == etcd.ls("/test/new_dir").get().keys().size());
//
// etcd.set("/test/new_dir/key1", "value1").wait();
// etcd.set("/test/new_dir/key2", "value2").wait();
// etcd.mkdir("/test/new_dir/sub_dir").wait();
//
// etcd::Response resp = etcd.ls("/test/new_dir").get();
// CHECK("get" == resp.action());
// REQUIRE(3 == resp.keys().size());
// CHECK("key1" == resp.key(0));
// CHECK("key2" == resp.key(1));
// CHECK("sub_dir" == resp.key(2));
// CHECK("value1" == resp.value(0).as_string());
// CHECK("value2" == resp.value(1).as_string());
// CHECK(resp.values()[2].is_dir());
//
// CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
//}
//
//TEST_CASE("delete a directory")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty
// CHECK(0 == etcd.rmdir("/test/new_dir", true).get().error_code());
//}
//
//TEST_CASE("wait for a value change")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd.set("/test/key1", "42").wait();
//
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
// CHECK(!res.is_done());
// sleep(1);
// CHECK(!res.is_done());
//
// etcd.set("/test/key1", "43").get();
// sleep(1);
// REQUIRE(res.is_done());
// REQUIRE("set" == res.get().action());
// CHECK("43" == res.get().value().as_string());
//}
//
////FBDL: first watch
////TEST_CASE("FBDL wait for a value change")
////{
//// std::cout << "FBDL wait for a value change" << std::endl;
//// etcd::Client etcd("http://127.0.0.1:4001");
////// etcd.set("/test/key1", "42").wait();
//// etcd.setv3("test/key1", "42");
////
////// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
////// CHECK(!res.is_done());
////// sleep(1);
////// CHECK(!res.is_done());
////// CHECK("42" == etcd.get("/test/key1").get().value().as_string());
//////
////// etcd.set("/test/key1", "43").get();
////// sleep(1);
////// REQUIRE(res.is_done());
////// REQUIRE("set" == res.get().action());
////// CHECK("43" == res.get().value().as_string());
////// CHECK("43" == etcd.get("/test/key1").get().value().as_string());
////}
//
//TEST_CASE("wait for a directory change")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
//
// pplx::task<etcd::Response> res = etcd.watch("/test", true);
// CHECK(!res.is_done());
// sleep(1);
// CHECK(!res.is_done());
//
// etcd.add("/test/key4", "44").wait();
// sleep(1);
// REQUIRE(res.is_done());
// CHECK("create" == res.get().action());
// CHECK("44" == res.get().value().as_string());
//
// pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
// CHECK(!res2.is_done());
// sleep(1);
// CHECK(!res2.is_done());
//
// etcd.set("/test/key4", "45").wait();
// REQUIRE(res2.is_done());
// CHECK("set" == res2.get().action());
// CHECK("45" == res2.get().value().as_string());
//}
//
//TEST_CASE("watch changes in the past")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
//
// int index = etcd.set("/test/key1", "42").get().index();
//
// etcd.set("/test/key1", "43").wait();
// etcd.set("/test/key1", "44").wait();
// etcd.set("/test/key1", "45").wait();
//
// etcd::Response res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("43" == res.value().as_string());
//
// res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("44" == res.value().as_string());
//
// res = etcd.watch("/test", ++index, true).get();
// CHECK("set" == res.action());
// CHECK("45" == res.value().as_string());
//}
//
//TEST_CASE("atomic compare-and-swap")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd.set("/test/key1", "42").wait();
//
// // modify success
// etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get();
// int index = res.index();
// REQUIRE(res.is_ok());
// CHECK("compareAndSwap" == res.action());
// CHECK("43" == res.value().as_string());
//
// // modify fails the second time
// res = etcd.modify_if("/test/key1", "44", "42").get();
// CHECK(!res.is_ok());
// CHECK(101 == res.error_code());
// CHECK("Compare failed" == res.error_message());
//
// // succes with the correct index
// res = etcd.modify_if("/test/key1", "44", index).get();
// REQUIRE(res.is_ok());
// CHECK("compareAndSwap" == res.action());
// CHECK("44" == res.value().as_string());
//
// // index changes so second modify fails
// res = etcd.modify_if("/test/key1", "45", index).get();
// CHECK(!res.is_ok());
// CHECK(101 == res.error_code());
// CHECK("Compare failed" == res.error_message());
//}
//
//TEST_CASE("atomic compare-and-delete based on prevValue")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// etcd.set("/test/key1", "42").wait();
//
// etcd::Response res = etcd.rm_if("/test/key1", "43").get();
// CHECK(!res.is_ok());
// CHECK(101 == res.error_code());
// CHECK("Compare failed" == res.error_message());
//
// res = etcd.rm_if("/test/key1", "42").get();
// REQUIRE(res.is_ok());
// CHECK("compareAndDelete" == res.action());
// CHECK("42" == res.prev_value().as_string());
//}
//
//TEST_CASE("atomic compare-and-delete based on prevIndex")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// int index = etcd.set("/test/key1", "42").get().index();
//
// etcd::Response res = etcd.rm_if("/test/key1", index - 1).get();
// CHECK(!res.is_ok());
// CHECK(101 == res.error_code());
// CHECK("Compare failed" == res.error_message());
//
// res = etcd.rm_if("/test/key1", index).get();
// REQUIRE(res.is_ok());
// CHECK("compareAndDelete" == res.action());
// CHECK("42" == res.prev_value().as_string());
//}
//
//TEST_CASE("cleanup")
//{
// etcd::Client etcd("http://127.0.0.1:4001");
// REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
//}
TEST_CASE("add a new key")
{
etcd::Client etcd("http://192.168.99.100:2379");
etcd::Response resp = etcd.add("/test/key1", "42").get();
REQUIRE(0 == resp.error_code());
CHECK("create" == resp.action());
etcd::Value const & val = resp.value();
CHECK("42" == val.as_string());
CHECK("/test/key1" == val.key());
CHECK(!val.is_dir());
CHECK(0 < val.created_index());
CHECK(0 < val.modified_index());
//CHECK(0 < resp.index()); maui: skip this first// X-Etcd-Index header value
CHECK(105 == etcd.add("/test/key1", "43").get().error_code()); // Key already exists
CHECK(105 == etcd.add("/test/key1", "42").get().error_code()); // Key already exists
CHECK("Key already exists" == etcd.add("/test/key1", "42").get().error_message());
}
TEST_CASE("read a value from etcd")
{
etcd::Client etcd("http://192.168.99.100:2379");
etcd::Response resp = etcd.get("/test/key1").get();
CHECK("get" == resp.action());
REQUIRE(resp.is_ok());
REQUIRE(0 == resp.error_code());
CHECK("42" == resp.value().as_string());
CHECK("" == etcd.get("/test").get().value().as_string()); // key points to a directory
}
TEST_CASE("simplified read")
{
etcd::Client etcd("http://192.168.99.100:2379");
CHECK("42" == etcd.get("/test/key1").get().value().as_string());
CHECK(100 == etcd.get("/test/key2").get().error_code()); // Key not found
}
TEST_CASE("modify a key")
{
etcd::Client etcd("http://192.168.99.100:2379");
etcd::Response resp = etcd.modify("/test/key1", "43").get();
REQUIRE(0 == resp.error_code()); // overwrite
CHECK("update" == resp.action());
CHECK(100 == etcd.modify("/test/key2", "43").get().error_code()); // Key not found
CHECK("43" == etcd.modify("/test/key1", "42").get().prev_value().as_string());
}
TEST_CASE("set a key")
{
etcd::Client etcd("http://192.168.99.100:2379");
etcd::Response resp = etcd.set("/test/key1", "43").get();
REQUIRE(0 == resp.error_code()); // overwrite
CHECK("set" == resp.action());
CHECK(0 == etcd.set("/test/key2", "43").get().error_code()); // create new
CHECK("43" == etcd.set("/test/key2", "44").get().prev_value().as_string());
CHECK("" == etcd.set("/test/key3", "44").get().prev_value().as_string());
CHECK(0 == etcd.set("/test", "42").get().error_code()); // Not a file
}
TEST_CASE("atomic compare-and-swap")
{
etcd::Client etcd("http://192.168.99.100:2379");
etcd.set("/test/key1", "42").wait();
// modify success
etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get();
int index = res.index();
REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action());
CHECK("43" == res.value().as_string());
// modify fails the second time
res = etcd.modify_if("/test/key1", "44", "42").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
}
#if 0
TEST_CASE("delete a value")
{
etcd::Client etcd("http://127.0.0.1:4001");
CHECK(3 == etcd.ls("/test").get().keys().size());
etcd::Response resp = etcd.rm("/test/key1").get();
CHECK("43" == resp.prev_value().as_string());
CHECK("delete" == resp.action());
CHECK(2 == etcd.ls("/test").get().keys().size());
}
TEST_CASE("create a directory")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.mkdir("/test/new_dir").get();
CHECK("set" == resp.action());
CHECK(resp.value().is_dir());
}
TEST_CASE("list a directory")
{
etcd::Client etcd("http://127.0.0.1:4001");
CHECK(0 == etcd.ls("/test/new_dir").get().keys().size());
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.mkdir("/test/new_dir/sub_dir").wait();
etcd::Response resp = etcd.ls("/test/new_dir").get();
CHECK("get" == resp.action());
REQUIRE(3 == resp.keys().size());
CHECK("key1" == resp.key(0));
CHECK("key2" == resp.key(1));
CHECK("sub_dir" == resp.key(2));
CHECK("value1" == resp.value(0).as_string());
CHECK("value2" == resp.value(1).as_string());
CHECK(resp.values()[2].is_dir());
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
}
TEST_CASE("delete a directory")
{
etcd::Client etcd("http://127.0.0.1:4001");
CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty
CHECK(0 == etcd.rmdir("/test/new_dir", true).get().error_code());
}
TEST_CASE("wait for a value change")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.set("/test/key1", "42").wait();
pplx::task<etcd::Response> res = etcd.watch("/test/key1");
CHECK(!res.is_done());
sleep(1);
CHECK(!res.is_done());
etcd.set("/test/key1", "43").get();
sleep(1);
REQUIRE(res.is_done());
REQUIRE("set" == res.get().action());
CHECK("43" == res.get().value().as_string());
}
TEST_CASE("wait for a directory change")
{
etcd::Client etcd("http://127.0.0.1:4001");
pplx::task<etcd::Response> res = etcd.watch("/test", true);
CHECK(!res.is_done());
sleep(1);
CHECK(!res.is_done());
etcd.add("/test/key4", "44").wait();
sleep(1);
REQUIRE(res.is_done());
CHECK("create" == res.get().action());
CHECK("44" == res.get().value().as_string());
pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
CHECK(!res2.is_done());
sleep(1);
CHECK(!res2.is_done());
etcd.set("/test/key4", "45").wait();
REQUIRE(res2.is_done());
CHECK("set" == res2.get().action());
CHECK("45" == res2.get().value().as_string());
}
TEST_CASE("watch changes in the past")
{
etcd::Client etcd("http://127.0.0.1:4001");
int index = etcd.set("/test/key1", "42").get().index();
etcd.set("/test/key1", "43").wait();
etcd.set("/test/key1", "44").wait();
etcd.set("/test/key1", "45").wait();
etcd::Response res = etcd.watch("/test/key1", ++index).get();
CHECK("set" == res.action());
CHECK("43" == res.value().as_string());
res = etcd.watch("/test/key1", ++index).get();
CHECK("set" == res.action());
CHECK("44" == res.value().as_string());
res = etcd.watch("/test", ++index, true).get();
CHECK("set" == res.action());
CHECK("45" == res.value().as_string());
}
TEST_CASE("atomic compare-and-swap")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.set("/test/key1", "42").wait();
// modify success
etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get();
int index = res.index();
REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action());
CHECK("43" == res.value().as_string());
// modify fails the second time
res = etcd.modify_if("/test/key1", "44", "42").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
// succes with the correct index
res = etcd.modify_if("/test/key1", "44", index).get();
REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action());
CHECK("44" == res.value().as_string());
// index changes so second modify fails
res = etcd.modify_if("/test/key1", "45", index).get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
}
TEST_CASE("atomic compare-and-delete based on prevValue")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.set("/test/key1", "42").wait();
etcd::Response res = etcd.rm_if("/test/key1", "43").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", "42").get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("atomic compare-and-delete based on prevIndex")
{
etcd::Client etcd("http://127.0.0.1:4001");
int index = etcd.set("/test/key1", "42").get().index();
etcd::Response res = etcd.rm_if("/test/key1", index - 1).get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", index).get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("cleanup")
{
etcd::Client etcd("http://127.0.0.1:4001");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
}
#endif

View File

@ -0,0 +1,36 @@
#ifndef __ASYNC_PUTRESPONSE_HPP__
#define __ASYNC_PUTRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::PutResponse;
namespace etcdv3
{
class AsyncPutResponse : public etcdv3::V3Response
{
public:
AsyncPutResponse(){};
AsyncPutResponse(const std::string act){action = act;};
AsyncPutResponse(const AsyncPutResponse& other);
AsyncPutResponse& operator=(const AsyncPutResponse& other);
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
AsyncPutResponse& ParseResponse();
etcdv3::grpcClient* client;
std::string key;
};
}
#endif

View File

@ -0,0 +1,32 @@
#ifndef __ASYNC_RANGERESPONSE_HPP__
#define __ASYNC_RANGERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::RangeResponse;
namespace etcdv3
{
class AsyncRangeResponse : public etcdv3::V3Response
{
public:
AsyncRangeResponse(){};
AsyncRangeResponse(const AsyncRangeResponse& other);
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
RangeResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
AsyncRangeResponse& ParseResponse();
};
}
#endif

15
v3/include/Utils.hpp Normal file
View File

@ -0,0 +1,15 @@
#ifndef __UTILS_HPP__
#define __UTILS_HPP__
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/grpcClient.hpp"
namespace etcdv3
{
namespace Utils
{
etcdv3::AsyncRangeResponse* getKey(std::string const & key, etcdv3::grpcClient& client);
}
}
#endif

View File

@ -1,9 +1,27 @@
#ifndef __V3_RESPONSE_HPP__
#define __V3_RESPONSE_HPP__
#include "proto/kv.pb.h"
namespace etcdv3
{
class V3Response
{
public:
V3Response(): error_code(0), index(00)
{
prev_value.set_key("");
prev_value.set_create_revision(0);
prev_value.set_mod_revision(0);
prev_value.set_value("");
};
int error_code;
std::string error_message;
int index;
std::string action;
std::vector<mvccpb::KeyValue> values;
mvccpb::KeyValue prev_value;
};
}
#endif

25
v3/include/grpcClient.hpp Normal file
View File

@ -0,0 +1,25 @@
#ifndef __GRPC_CLIENT_HPP__
#define __GRPC_CLIENT_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/AsyncRangeResponse.hpp"
using grpc::Channel;
using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::KV;
namespace etcdv3
{
class grpcClient
{
public:
grpcClient(std::string const & address);
std::unique_ptr<KV::Stub> stub_;
};
}
#endif

View File

@ -0,0 +1,44 @@
#include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/Utils.hpp"
using etcdserverpb::PutRequest;
using etcdserverpb::PutRequest;
etcdv3::AsyncPutResponse::AsyncPutResponse(const etcdv3::AsyncPutResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
}
etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::operator=(const etcdv3::AsyncPutResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
return *this;
}
etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::ParseResponse()
{
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, *client);
if(resp->reply.kvs_size())
{
values.push_back(resp->reply.kvs(0));
}
return *this;
}

View File

@ -0,0 +1,49 @@
#include "v3/include/AsyncRangeResponse.hpp"
etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
}
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::AsyncRangeResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
return *this;
}
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
{
action = "get";
if(reply.kvs_size())
{
if(reply.more())
{
for(int index=0; reply.more(); index++)
{
values.push_back(reply.kvs(index));
}
}
else
{
values.push_back(reply.kvs(0));
}
}
else
{
error_code=100;
error_message="Key not found";
}
return *this;
}

24
v3/src/Utils.cpp Normal file
View File

@ -0,0 +1,24 @@
#include "v3/include/AsyncRangeResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::RangeRequest;
#include "v3/include/Utils.hpp"
etcdv3::AsyncRangeResponse* etcdv3::Utils::getKey(std::string const & key, etcdv3::grpcClient& client)
{
RangeRequest get_request;
get_request.set_key(key);
etcdv3::AsyncRangeResponse* resp= new etcdv3::AsyncRangeResponse();
resp->status = client.stub_->Range(&resp->context, get_request, &resp->reply);
if(resp->status.ok())
{
return resp;
}
else
{
throw std::runtime_error(resp->status.error_message());
}
}

14
v3/src/grpcClient.cpp Normal file
View File

@ -0,0 +1,14 @@
#include "v3/include/grpcClient.hpp"
etcdv3::grpcClient::grpcClient(std::string const & address)
{
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel);
}