Added DeleteRangeResponse.

Refactor how value/values are filled up by parseResponse
This commit is contained in:
arches 2016-07-08 08:48:39 -04:00
parent 6dad838545
commit 6dfbe791a0
33 changed files with 315 additions and 241 deletions

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp ../v3/src/AsyncWatchResponse.cpp)
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -4,6 +4,7 @@
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/Transaction.hpp"
#include <iostream>

View File

@ -6,11 +6,13 @@
etcd::Response::Response(const etcdv3::V3Response& reply)
{
_index = reply.index;
_action = reply.action;
_error_code = reply.error_code;
_error_message = reply.error_message;
_action = reply.action;
int size = reply.values.size();
if(reply.isPrefix)
//with prefix means that we expect that
//values could have at least one result(e.g. ls, rmdir)
if(size)
{
for(int index = 0; index < size; index++)
{
@ -18,15 +20,15 @@ etcd::Response::Response(const etcdv3::V3Response& reply)
_keys.push_back(reply.values[index].key());
}
}
else if(size == 1)
//values where we expect that
// at most one result.(e.g. set, add, modify, rm, watch)
else
{
_value = Value(reply.values[0]);
}
if(reply.prev_values.size() == 1)
{
_prev_value = Value(reply.prev_values[0]);
_value = Value(reply.value);
}
_prev_value = Value(reply.prev_value);
}

View File

@ -107,9 +107,24 @@ TEST_CASE("atomic compare-and-swap")
TEST_CASE("delete a value")
{
etcd::Client etcd("http://127.0.0.1:2379");
etcd::Response resp = etcd.rm("/test/key1").get();
etcd::Response resp = etcd.rm("/test/key11111").get();
CHECK(!resp.is_ok());
CHECK(100 == resp.error_code());
CHECK("Key not found" == resp.error_message());
int index = etcd.get("/test/key1").get().index();
int create_index = etcd.get("/test/key1").get().value().created_index();
int modify_index = etcd.get("/test/key1").get().value().modified_index();
resp = etcd.rm("/test/key1").get();
CHECK("43" == resp.prev_value().as_string());
CHECK( "/test/key1" == resp.prev_value().key());
CHECK( create_index == resp.prev_value().created_index());
CHECK( modify_index == resp.prev_value().modified_index());
CHECK("delete" == resp.action());
CHECK( resp.index() == resp.value().modified_index());
CHECK( create_index == resp.value().created_index());
CHECK("" == resp.value().as_string());
CHECK( "/test/key1" == resp.value().key());
}
TEST_CASE("atomic compare-and-delete based on prevValue")
@ -224,10 +239,42 @@ TEST_CASE("delete a directory")
{
etcd::Client etcd("http://127.0.0.1:2379");
//CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty
CHECK(0 == etcd.rmdir("/test/new_dir", true).get().error_code());
}
etcd::Response resp = etcd.ls("/test/new_dir").get();
//get the lowest created index
std::vector<int> myset;
for(unsigned int cnt=0; cnt < resp.values().size(); cnt++)
{
myset.push_back(resp.value(cnt).created_index());
}
std::sort(myset.begin(),myset.end());
//get the latest modified index
std::vector<int> myset1;
for(unsigned int cnt=0; cnt < resp.values().size(); cnt++)
{
myset1.push_back(resp.value(cnt).modified_index());
}
std::sort(myset1.begin(),myset1.end());
resp = etcd.rmdir("/test/new_dir", true).get();
int index = resp.index();
CHECK("" == resp.prev_value().as_string());
CHECK( myset[0] == resp.prev_value().created_index());
CHECK( myset1[myset1.size() - 1] == resp.prev_value().modified_index());
CHECK( "/test/new_dir" == resp.prev_value().key());
CHECK("delete" == resp.action());
CHECK( index == resp.value().modified_index());
CHECK( myset[0] == resp.value().created_index());
CHECK("" == resp.value().as_string());
resp = etcd.rmdir("/test/dirnotfound", true).get();
CHECK(!resp.is_ok());
CHECK(100 == resp.error_code());
CHECK("Key not found" == resp.error_message());
}
TEST_CASE("wait for a value change")
{
etcd::Client etcd("http://127.0.0.1:2379");

View File

@ -15,6 +15,7 @@ void printResponse(etcd::Response const & resp)
else
{
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;
}
}
@ -29,13 +30,15 @@ TEST_CASE("create watcher with cancel")
sleep(1);
etcd.set("/test/key", "42");
etcd.set("/test/key", "43");
etcd.rm("/test/key");
etcd.set("/test/key", "44");
sleep(1);
CHECK(2 == watcher_called);
CHECK(4 == watcher_called);
watcher.Cancel();
etcd.set("/test/key", "50");
etcd.set("/test/key", "51");
sleep(1);
CHECK(2 == watcher_called);
CHECK(4 == watcher_called);
etcd.rmdir("/test", true);
@ -140,5 +143,5 @@ TEST_CASE("create watcher")
// std::cout << "std::exception: " << ex.what() << "\n";
// }
// }
etcd.rmdir("/test", true).error_code();
etcd.rmdir("/test", true).error_code();
}

View File

@ -33,14 +33,16 @@ namespace etcdv3
class Action
{
public:
public:
Action(etcdv3::ActionParameters params);
Action(){};
void waitForResponse();
protected:
Status status;
ClientContext context;
CompletionQueue cq_;
etcdv3::ActionParameters parameters;
void waitForResponse();
};
}
#endif

View File

@ -18,6 +18,7 @@ namespace etcdv3
public:
AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type);
AsyncTxnResponse ParseResponse();
private:
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};

View File

@ -18,6 +18,7 @@ namespace etcdv3
public:
AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type);
AsyncTxnResponse ParseResponse();
private:
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};

View File

@ -4,12 +4,11 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
using etcdserverpb::DeleteRangeResponse;
namespace etcdv3
{
@ -17,9 +16,10 @@ namespace etcdv3
{
public:
AsyncDeleteAction(etcdv3::ActionParameters param);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
AsyncDeleteRangeResponse ParseResponse();
private:
DeleteRangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<DeleteRangeResponse>> response_reader;
};
}

View File

@ -0,0 +1,23 @@
#ifndef __ASYNC_DELETERESPONSE_HPP__
#define __ASYNC_DELETERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "v3/include/Action.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::DeleteRangeResponse;
namespace etcdv3
{
class AsyncDeleteRangeResponse : public etcdv3::V3Response
{
public:
AsyncDeleteRangeResponse(){};
void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp);
};
}
#endif

View File

@ -9,7 +9,6 @@
using grpc::ClientAsyncResponseReader;
using etcdserverpb::RangeResponse;
using etcdserverpb::KV;
namespace etcdv3
{
@ -18,6 +17,7 @@ namespace etcdv3
public:
AsyncGetAction(etcdv3::ActionParameters param);
AsyncRangeResponse ParseResponse();
private:
RangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
};

View File

@ -14,11 +14,8 @@ namespace etcdv3
class AsyncRangeResponse : public etcdv3::V3Response
{
public:
AsyncRangeResponse(RangeResponse& resp);
AsyncRangeResponse(const AsyncRangeResponse& other);
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
void ParseResponse();
RangeResponse reply;
AsyncRangeResponse(){};
void ParseResponse(RangeResponse& resp, bool prefix=false);
};
}

View File

@ -16,8 +16,9 @@ namespace etcdv3
class AsyncSetAction : public etcdv3::Action
{
public:
AsyncSetAction(etcdv3::ActionParameters param, bool isCreate=false);
AsyncSetAction(etcdv3::ActionParameters param, bool create=false);
AsyncTxnResponse ParseResponse();
private:
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
bool isCreate;

View File

@ -11,11 +11,8 @@ namespace etcdv3
class AsyncTxnResponse : public etcdv3::V3Response
{
public:
AsyncTxnResponse(TxnResponse& resp);
AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
AsyncTxnResponse(const AsyncTxnResponse& other);
void ParseResponse();
TxnResponse reply;
AsyncTxnResponse(){};
void ParseResponse(std::string const& key, bool prefix,TxnResponse& resp);
};
}

View File

@ -18,6 +18,7 @@ namespace etcdv3
public:
AsyncUpdateAction(etcdv3::ActionParameters param);
AsyncTxnResponse ParseResponse();
private:
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};

View File

@ -24,8 +24,8 @@ namespace etcdv3
void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelWatch();
void WatchReq(std::string const & key);
private:
WatchResponse reply;
KV::Stub* kv_stub;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
bool isCancelled;
};

View File

@ -16,11 +16,8 @@ namespace etcdv3
class AsyncWatchResponse : public etcdv3::V3Response
{
public:
AsyncWatchResponse(WatchResponse& resp);
AsyncWatchResponse(const AsyncWatchResponse& other);
AsyncWatchResponse& operator=(const AsyncWatchResponse& other);
void ParseResponse();
WatchResponse reply;
AsyncWatchResponse(){};
void ParseResponse(WatchResponse& resp);
};
}

View File

@ -1,9 +0,0 @@
#ifndef __I_RESPONSE_HPP__
#define __I_RESPONSE_HPP__
namespace etcdv3
{
class IResponse
{
};
}

View File

@ -10,11 +10,18 @@ namespace etcdv3
{
public:
V3Response(): error_code(0), index(0), isPrefix(false) {};
void set_error_code(int code);
void set_error_message(std::string msg);
void set_action(std::string action);
std::vector<mvccpb::KeyValue> get_kv_values();
std::vector<mvccpb::KeyValue> get_prev_kv_values();
int error_code;
int index;
bool isPrefix;
std::string error_message;
std::string action;
mvccpb::KeyValue value;
mvccpb::KeyValue prev_value;
std::vector<mvccpb::KeyValue> values;
std::vector<mvccpb::KeyValue> prev_values;
};

View File

@ -13,3 +13,4 @@ void etcdv3::Action::waitForResponse()
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}

View File

@ -33,7 +33,7 @@ etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionP
etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
AsyncTxnResponse txn_resp;
if(!status.ok())
{
txn_resp.error_code = status.error_code();
@ -41,7 +41,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse()
}
else
{
txn_resp.ParseResponse();
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
txn_resp.prev_values = txn_resp.values;
txn_resp.action = etcdv3::COMPAREDELETE_ACTION;

View File

@ -33,7 +33,7 @@ etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParam
etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
AsyncTxnResponse txn_resp;
if(!status.ok())
{
@ -42,7 +42,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse()
}
else
{
txn_resp.ParseResponse();
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
txn_resp.action = etcdv3::COMPARESWAP_ACTION;
//if there is an error code returned by parseResponse, we must

View File

@ -1,44 +1,39 @@
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::DeleteRangeRequest;
etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param)
: etcdv3::Action(param)
{
etcdv3::Transaction transaction(parameters.key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
DeleteRangeRequest del_request;
del_request.set_key(parameters.key);
del_request.set_prev_kv(true);
std::string range_end(parameters.key);
if(parameters.withPrefix)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
del_request.set_range_end(range_end);
}
transaction.setup_delete_sequence(parameters.key, range_end, parameters.withPrefix);
transaction.setup_delete_failure_operation(parameters.key, range_end, parameters.withPrefix);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncDeleteAction::ParseResponse()
etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
AsyncDeleteRangeResponse del_resp;
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
del_resp.set_error_code(status.error_code());
del_resp.set_error_message(status.error_message());
}
else
{
txn_resp.ParseResponse();
txn_resp.prev_values = txn_resp.values;
txn_resp.action = etcdv3::DELETE_ACTION;
del_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
}
return txn_resp;
return del_resp;
}

View File

@ -0,0 +1,81 @@
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp)
{
index = resp.header().revision();
if(resp.prev_kvs_size() == 0)
{
error_code=100;
error_message="Key not found";
}
else
{
action = etcdv3::DELETE_ACTION;
//get all previous values
for(int cnt=0; cnt < resp.prev_kvs_size(); cnt++)
{
prev_values.push_back(resp.prev_kvs(cnt));
}
//copy previous vales to values. We will format this later
for(unsigned int cnt=0; cnt < prev_values.size(); cnt++)
{
auto temp_value = prev_values[cnt];
temp_value.set_mod_revision(index);
temp_value.clear_value();
values.push_back(temp_value);
}
prev_value = prev_values[0];
value = values[0];
//get all mod revisions of previous values
std::vector<int> mod_index;
for(unsigned int cnt = 0; cnt < prev_values.size(); ++cnt)
{
mod_index.push_back(prev_values[cnt].mod_revision());
}
//sort it ascending
std::sort(mod_index.begin(),mod_index.end());
// use the latest mod index
prev_value.set_mod_revision(mod_index.back());
//get all created revision of previous values
std::vector<int> create_index;
for(unsigned int cnt = 0; cnt < prev_values.size(); ++cnt)
{
create_index.push_back(prev_values[cnt].create_revision());
}
//sort it ascending
std::sort(create_index.begin(),create_index.end());
//use earliest create index
prev_value.set_create_revision(create_index.front());
//value modified index should be the same as index
value.set_mod_revision(index);
//value created index should be the same as prev value created index
value.set_create_revision(prev_value.create_revision());
//set key.When prefix delete is done, we should use the prefix provided by
//client as the key
value.set_key(key);
prev_value.set_key(key);
//clear the value
value.clear_value();
//if withPrefix, clear previous value also
if(prefix)
{
prev_value.clear_value();
}
prev_values.clear();
values.clear();
}
}

View File

@ -17,26 +17,24 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
get_request.set_range_end(range_end);
get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
}
response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse()
{
AsyncRangeResponse range_resp(reply);
AsyncRangeResponse range_resp;
if(!status.ok())
{
range_resp.error_code = status.error_code();
range_resp.error_message = status.error_message();
range_resp.set_error_code(status.error_code());
range_resp.set_error_message(status.error_message());
}
else
{
range_resp.ParseResponse();
range_resp.action = etcdv3::GET_ACTION;
range_resp.isPrefix = parameters.withPrefix;
range_resp.ParseResponse(reply, parameters.withPrefix);
//range_resp.set_action(etcdv3::GET_ACTION);
//range_resp.isPrefix = parameters.withPrefix;
}
return range_resp;
}

View File

@ -1,44 +1,28 @@
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
etcdv3::AsyncRangeResponse::AsyncRangeResponse(RangeResponse& resp)
{
reply = resp;
}
etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other)
void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
}
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::AsyncRangeResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
return *this;
}
void etcdv3::AsyncRangeResponse::ParseResponse()
{
index = reply.header().revision();
if(reply.kvs_size() == 0)
action = etcdv3::GET_ACTION;
index = resp.header().revision();
if(resp.kvs_size() == 0)
{
error_code=100;
error_message="Key not found";
return;
}
for(int index=0; index < reply.kvs_size(); index++)
else
{
values.push_back(reply.kvs(index));
for(int index=0; index < resp.kvs_size(); index++)
{
values.push_back(resp.kvs(index));
}
if(!prefix)
{
value = values[0];
values.clear();
}
}
}

View File

@ -19,16 +19,14 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_create_sequence(parameters.key, parameters.value);
if(isCreate)
{
transaction.setup_basic_failure_operation(parameters.key);
//transaction.setup_basic_create_sequence(parameters.key, parameters.value);
}
else
{
transaction.setup_set_failure_operation(parameters.key, parameters.value);
//transaction.setup_basic_create_sequence(parameters.key, parameters.value);
}
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
@ -37,17 +35,17 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
AsyncTxnResponse txn_resp;
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message());
}
else
{
txn_resp.ParseResponse();
txn_resp.action = isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION;
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
txn_resp.set_action(isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION);
if(!reply.succeeded() && txn_resp.action == etcdv3::CREATE_ACTION)
{

View File

@ -1,65 +1,44 @@
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::ResponseOp;
etcdv3::AsyncTxnResponse::AsyncTxnResponse(TxnResponse& resp)
{
reply = resp;
}
etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
}
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
return *this;
}
void etcdv3::AsyncTxnResponse::ParseResponse()
void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply)
{
index = reply.header().revision();
std::vector<mvccpb::KeyValue> range_kvs;
std::vector<mvccpb::KeyValue> prev_range_kvs;
for(int index=0; index < reply.responses_size(); index++)
{
auto resp = reply.responses(index);
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{
AsyncRangeResponse response(*(resp.mutable_response_range()));
response.ParseResponse();
AsyncRangeResponse response;
response.ParseResponse(*(resp.mutable_response_range()),prefix);
error_code = response.error_code;
error_message = response.error_message;
if(!response.values.empty())
{
values.insert(values.end(), response.values.begin(),response.values.end());
}
values = response.values;
value = response.value;
}
else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
{
auto put_resp = resp.response_put();
if(put_resp.has_prev_kv())
{
prev_values.push_back(put_resp.prev_kv());
prev_value = put_resp.prev_kv();
}
}
}
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
{
AsyncDeleteRangeResponse response;
response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range()));
prev_value = response.prev_value;
values = response.values;
value = response.value;
}
}
}

View File

@ -25,7 +25,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
AsyncTxnResponse txn_resp;
if(!status.ok())
{
@ -36,7 +36,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse()
{
if(reply.succeeded())
{
txn_resp.ParseResponse();
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
txn_resp.action = etcdv3::UPDATE_ACTION;
}
else

View File

@ -9,6 +9,7 @@ using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
isCancelled = false;
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create");
WatchRequest watch_req;
@ -92,7 +93,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
{
AsyncWatchResponse watch_resp(reply);
AsyncWatchResponse watch_resp;
if(!status.ok())
{
watch_resp.error_code = status.error_code();
@ -100,7 +101,7 @@ etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
}
else
{
watch_resp.ParseResponse();
watch_resp.ParseResponse(reply);
}
return watch_resp;
}

View File

@ -1,48 +1,15 @@
#include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
etcdv3::AsyncWatchResponse::AsyncWatchResponse(WatchResponse& resp)
{
reply = resp;
}
etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
}
etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::AsyncWatchResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_values = other.prev_values;
return *this;
}
void etcdv3::AsyncWatchResponse::ParseResponse()
void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
{
index = reply.header().revision();
std::map<std::string, mvccpb::KeyValue> mapValue;
std::map<std::string, mvccpb::KeyValue> mapPrevValue;
for(int cnt =0; cnt < reply.events_size(); cnt++)
{
auto event = reply.events(cnt);
const mvccpb::KeyValue& kv = event.kv();
if(mvccpb::Event::EventType::Event_EventType_PUT == event.type())
{
if(kv.version() == 1)
if(event.kv().version() == 1)
{
action = etcdv3::CREATE_ACTION;
}
@ -50,36 +17,21 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
{
action = etcdv3::SET_ACTION;
}
// just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour.
// and then break immediately
mapValue.emplace(kv.key(), kv);
value = event.kv();
}
else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type())
{
action = etcdv3::DELETE_ACTION;
// just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour.
// break immediately
mapValue.emplace(kv.key(), kv);
value = event.kv();
}
if(event.has_prev_kv())
{
auto kv = event.prev_kv();
mapPrevValue.emplace(kv.key(),kv);
prev_value = event.prev_kv();
}
// just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour.
// break immediately
break;
}
for(auto x: mapPrevValue)
{
prev_values.push_back(x.second);
}
for(auto x: mapValue)
{
values.push_back(x.second);
}
}

View File

@ -98,12 +98,14 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key,
* add key and then get new value of key
*/
void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
@ -130,38 +132,23 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val
* get key, delete
*/
void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(true);
if(recursive)
{
get_request->set_range_end(range_end);
get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
del_request->set_range_end(range_end);
}
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
if(recursive)
{
del_request->set_range_end(range_end);
}
req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& key) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
req_success = txn_request.add_success();
del_request->set_prev_kv(true);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}

27
v3/src/V3Response.cpp Normal file
View File

@ -0,0 +1,27 @@
#include "v3/include/V3Response.hpp"
#include "v3/include/action_constants.hpp"
void etcdv3::V3Response::set_error_code(int code)
{
error_code = code;
}
void etcdv3::V3Response::set_error_message(std::string msg)
{
error_message = msg;
}
void etcdv3::V3Response::set_action(std::string action)
{
this->action = action;
}
std::vector<mvccpb::KeyValue> etcdv3::V3Response::get_kv_values()
{
return values;
}
std::vector<mvccpb::KeyValue> etcdv3::V3Response::get_prev_kv_values()
{
return prev_values;
}