Added implementation for ls

This commit is contained in:
arches 2016-06-10 11:19:53 -04:00
parent 1c6f5be31e
commit 0d7b702430
8 changed files with 112 additions and 78 deletions

View File

@ -148,9 +148,9 @@ namespace etcd
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value); pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_asyncget(std::string const & key);
pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value); pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value);
pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value); pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value);
pplx::task<etcd::Response> send_asyncget(std::string const & key,std::string const& range_end="");
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value); pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key); pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value);

View File

@ -38,18 +38,12 @@ namespace etcd
//blocking //blocking
call->cq_.Next(&got_tag, &ok); call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call); GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
T call = static_cast<T>(got_tag); T call = static_cast<T>(got_tag);
if(call->status.ok())
{ auto v3resp = call->ParseResponse();
auto v3resp = call->ParseResponse();
resp = etcd::Response(v3resp); resp = etcd::Response(v3resp);
}
else
{
throw std::runtime_error(call->status.error_message());
}
delete call; //todo:make this a smart pointer delete call; //todo:make this a smart pointer
return resp; return resp;

View File

@ -12,6 +12,7 @@ using etcdserverpb::TxnRequest;
using etcdserverpb::Compare; using etcdserverpb::Compare;
using etcdserverpb::RequestOp; using etcdserverpb::RequestOp;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
: client(address) : client(address)
{ {
@ -230,9 +231,12 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{ {
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("sorted=true"); std::string range_end(key);
return send_get_request(uri); int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
return send_asyncget(key,range_end);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
@ -400,7 +404,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
} }
pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key) pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key, std::string const& range_end)
{ {
//check key exist //check key exist
TxnRequest txn_request; TxnRequest txn_request;
@ -410,16 +414,29 @@ pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key)
compare->set_key(key); compare->set_key(key);
compare->set_version(0); compare->set_version(0);
//get key on failure or success //get key on success
std::unique_ptr<RangeRequest> get_request(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
if(!range_end.empty())
{
get_request->set_range_end(range_end);
get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
RequestOp* req_success = txn_request.add_success(); RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
//get key on failure
get_request.reset(new RangeRequest()); get_request.reset(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
if(!range_end.empty())
{
get_request->set_range_end(range_end);
get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
RequestOp* req_failure = txn_request.add_failure(); RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release()); req_failure->set_allocated_request_range(get_request.release());

View File

@ -23,7 +23,6 @@ pplx::task<etcd::Response> etcd::Response::createResponse(const etcdv3::V3Respon
etcd::Response::Response(const etcdv3::V3Response& reply) etcd::Response::Response(const etcdv3::V3Response& reply)
{ {
_index = reply.index; _index = reply.index;
_error_code = reply.error_code; _error_code = reply.error_code;
_error_message = reply.error_message; _error_message = reply.error_message;
@ -31,8 +30,11 @@ etcd::Response::Response(const etcdv3::V3Response& reply)
int size = reply.values.size(); int size = reply.values.size();
if(size > 1) if(size > 1)
{ {
for(int x = 0; x < size; x++) for(int index = 0; index < size; index++)
_values.push_back(Value(reply.values[x])); {
_values.push_back(Value(reply.values[index]));
_keys.push_back(reply.values[index].key());
}
} }
else if(size == 1) else if(size == 1)
{ {

View File

@ -96,6 +96,29 @@ TEST_CASE("atomic compare-and-swap")
CHECK("Compare failed" == res.error_message()); CHECK("Compare failed" == res.error_message());
} }
TEST_CASE("list a directory")
{
etcd::Client etcd("http://127.0.0.1:2379");
//CHECK(0 == etcd.ls("/test/new_dir").get().keys().size());
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/sub_dir","value3").wait();
etcd::Response resp = etcd.ls("/test/new_dir").get();
CHECK("get" == resp.action());
REQUIRE(3 == resp.keys().size());
CHECK("key1" == resp.key(0));
CHECK("key2" == resp.key(1));
CHECK("sub_dir" == resp.key(2));
CHECK("value1" == resp.value(0).as_string());
CHECK("value2" == resp.value(1).as_string());
CHECK(resp.values()[2].is_dir());
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
}
TEST_CASE("delete a value") TEST_CASE("delete a value")
{ {

View File

@ -18,6 +18,7 @@ namespace etcdv3
{ {
public: public:
AsyncRangeResponse(){}; AsyncRangeResponse(){};
AsyncRangeResponse(const std::string act){action = act;};
AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse(const AsyncRangeResponse& other);
AsyncRangeResponse& operator=(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
RangeResponse reply; RangeResponse reply;

View File

@ -7,6 +7,11 @@ etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse&
index = other.index; index = other.index;
action = other.action; action = other.action;
values = other.values; values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
} }
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::AsyncRangeResponse& other) etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::AsyncRangeResponse& other)
@ -16,34 +21,25 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::
index = other.index; index = other.index;
action = other.action; action = other.action;
values = other.values; values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
return *this; return *this;
} }
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
{ {
action = "get"; if(reply.kvs_size() == 0)
if(reply.kvs_size())
{
if(reply.more())
{
for(int index=0; reply.more(); index++)
{
values.push_back(reply.kvs(index));
}
}
else
{
values.push_back(reply.kvs(0));
}
}
else
{ {
error_code=100; error_code=100;
error_message="Key not found"; error_message="Key not found";
} }
for(int index=0; index < reply.kvs_size(); index++)
{
values.push_back(reply.kvs(index));
}
index = reply.header().revision(); index = reply.header().revision();
return *this; return *this;
} }

View File

@ -1,4 +1,5 @@
#include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
using etcdserverpb::ResponseOp; using etcdserverpb::ResponseOp;
@ -33,57 +34,57 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
{ {
for(int index=0; index < reply.responses_size(); index++) if(!status.ok())
{ {
auto resp = reply.responses(index); error_code = status.error_code();
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) error_message = status.error_message();
}
else
{
std::vector<mvccpb::KeyValue> range_kvs;
for(int index=0; index < reply.responses_size(); index++)
{ {
if(resp.response_range().kvs_size()) auto resp = reply.responses(index);
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{ {
if(!resp.response_range().more()) AsyncRangeResponse response;
response.reply = resp.response_range();
auto v3resp = response.ParseResponse();
error_code = v3resp.error_code;
error_message = v3resp.error_message;
if(!v3resp.values.empty())
{ {
if(!values.empty()) range_kvs.insert(range_kvs.end(), v3resp.values.begin(), v3resp.values.end());
{
prev_value = values[0];
}
values.clear();
values.push_back(resp.response_range().kvs(0));
} }
} }
else }
if(!reply.succeeded())
{
if(action == "create")
{ {
error_code=100; error_code=105;
error_message="Key not found"; error_message="Key already exists";
}
else if(action == "compareAndSwap")
{
error_code=101;
error_message="Compare failed";
} }
} }
else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
//find previous value of key do this for all actions except get
//retain only the last value gotten as the final value.
if(action != "get" && range_kvs.size() > 1)
{ {
//do nothing for now. prev_value = range_kvs.front();
values.push_back(range_kvs.back());
} }
else else
{ {
//do nothing for now. values = range_kvs;
}
}
if(action == "create")
{
if(!reply.succeeded())
{
error_code=105;
error_message="Key already exists";
}
}
else if(action == "compareAndSwap")
{
if(!reply.succeeded())
{
error_code=101;
error_message="Compare failed";
} }
} }
return *this; return *this;