Refactor part1

This commit is contained in:
arches 2016-06-24 12:20:31 -04:00
parent 9d199a8579
commit a43de811f4
33 changed files with 735 additions and 523 deletions

View File

@ -133,20 +133,6 @@ namespace etcd
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value);
pplx::task<etcd::Response> send_asyncmodify(std::string const & key, std::string const & value);
pplx::task<etcd::Response> send_asyncget(std::string const & key,std::string const& range_end="");
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, int old_index);
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, bool recursive);
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, int fromIndex, bool recursive);
private: private:
std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation, std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation,
etcdv3::Transaction& transaction); etcdv3::Transaction& transaction);

View File

@ -23,7 +23,6 @@ namespace etcd
class Response class Response
{ {
public: public:
static pplx::task<Response> create(pplx::task<web::http::http_response> response_task);
template<typename T>static pplx::task<etcd::Response> create(std::shared_ptr<T> call) template<typename T>static pplx::task<etcd::Response> create(std::shared_ptr<T> call)
{ {
@ -99,7 +98,6 @@ namespace etcd
std::string const & key(int index) const; std::string const & key(int index) const;
protected: protected:
Response(web::http::http_response http_response, web::json::value json_value);
Response(const etcdv3::V3Response& response); Response(const etcdv3::V3Response& response);
int _error_code; int _error_code;

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)
@ -8,3 +8,4 @@ install (FILES ../etcd/Client.hpp
../etcd/Response.hpp ../etcd/Response.hpp
../etcd/Value.hpp ../etcd/Value.hpp
DESTINATION include/etcd) DESTINATION include/etcd)

View File

@ -1,11 +1,20 @@
#include <memory> #include <memory>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/Transaction.hpp" #include "v3/include/Transaction.hpp"
#include <iostream> #include <iostream>
#include "v3/include/AsyncSetAction.hpp"
#include "v3/include/AsyncCompareAndSwapAction.hpp"
#include "v3/include/AsyncCompareAndDeleteAction.hpp"
#include "v3/include/AsyncUpdateAction.hpp"
#include "v3/include/AsyncGetAction.hpp"
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/AsyncWatchAction.hpp"
using grpc::Channel; using grpc::Channel;
using etcdserverpb::PutRequest; using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
@ -14,10 +23,6 @@ using etcdserverpb::DeleteRangeRequest;
using etcdserverpb::Compare; using etcdserverpb::Compare;
using etcdserverpb::RequestOp; using etcdserverpb::RequestOp;
using grpc::ClientReaderWriter;
using etcdserverpb::WatchRequest;
using etcdserverpb::WatchResponse;
using etcdserverpb::WatchCreateRequest;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
{ {
@ -36,281 +41,86 @@ etcd::Client::Client(std::string const & address)
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{ {
return send_asyncget(key); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(key,stub_.get()));
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value)
{ {
return send_asyncput(key,value); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(key, value, stub_.get()));
return Response::create(call);;
} }
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
{ {
return send_asyncadd(key,value); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(key, value, stub_.get(), true));
return Response::create(call);;
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value)
{ {
return send_asyncmodify(key,value); std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(key,value,stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value)
{ {
return send_asyncmodify_if(key, value, old_value); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_value, stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{ {
return send_asyncmodify_if(key, value, old_index); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_index, stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key) pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{ {
return send_asyncdelete(key,false); std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{ {
return send_asyncrm_if(key, old_value); std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(key,old_value,stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{ {
return send_asyncrm_if(key, old_index); std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(key,old_index,stub_.get()));;
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{ {
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));;
return send_asyncdelete(key,recursive); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{ {
std::string range_end(key); std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(key,stub_.get(),true));
int ascii = (int)range_end[range_end.length()-1]; return Response::create(call);
range_end.back() = ascii+1;
return send_asyncget(key,range_end);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{ {
return send_asyncwatch(key,recursive); std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(key,recursive,stub_.get(),watchServiceStub.get()));
return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
{ {
return send_asyncwatch(key, fromIndex, recursive); std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(key,fromIndex,recursive,stub_.get(),watchServiceStub.get()));
}
std::shared_ptr<etcdv3::AsyncTxnResponse> etcd::Client::initiate_transaction(const std::string &operation,
etcdv3::Transaction& transaction)
{
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse(operation));
call->response_reader = stub_->AsyncTxn(&call->context, transaction.txn_request, &call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*) (call.get()));
return call;
}
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_basic_create_sequence(key, value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("create", transaction);
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndSwap", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndSwap", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("update", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key, std::string const& range_end)
{
RangeRequest get_request;
get_request.set_key(key);
if(!range_end.empty())
{
get_request.set_range_end(range_end);
get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
std::shared_ptr<etcdv3::AsyncRangeResponse> call(new etcdv3::AsyncRangeResponse());
call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_set_failure_operation(key, value);
transaction.setup_basic_create_sequence(key, value);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("set", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & key, bool recursive)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
}
transaction.setup_delete_sequence(key, range_end, recursive);
transaction.setup_delete_failure_operation(key, range_end, recursive);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("delete", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key, int old_index) {
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, bool recursive)
{
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
call->stream->Write(watch_req, (void*)call.get());
call->stub_ = stub_.get();
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive)
{
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_create_req.set_start_revision(fromIndex);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
call->stream->Write(watch_req, (void*)call.get());
call->stub_ = stub_.get();
call->fromIndex = fromIndex;
return Response::create(call);
}

View File

@ -1,19 +1,7 @@
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include "json_constants.hpp"
#include <iostream> #include <iostream>
pplx::task<etcd::Response> etcd::Response::create(pplx::task<web::http::http_response> response_task)
{
return pplx::task<etcd::Response> (
[response_task]()
{
auto json_task = response_task.get().extract_json();
return etcd::Response(response_task.get(), json_task.get());
}
);
}
etcd::Response::Response(const etcdv3::V3Response& reply) etcd::Response::Response(const etcdv3::V3Response& reply)
{ {
@ -48,41 +36,6 @@ etcd::Response::Response()
{ {
} }
etcd::Response::Response(web::http::http_response http_response, web::json::value json_value)
: _error_code(0),
_index(0)
{
if (http_response.headers().has(JSON_ETCD_INDEX))
_index = atoi(http_response.headers()[JSON_ETCD_INDEX].c_str());
if (json_value.has_field(JSON_ERROR_CODE))
{
_error_code = json_value[JSON_ERROR_CODE].as_number().to_int64();
_error_message = json_value[JSON_MESSAGE].as_string();
}
if (json_value.has_field(JSON_ACTION))
_action = json_value[JSON_ACTION].as_string();
if (json_value.has_field(JSON_NODE))
{
if (json_value[JSON_NODE].has_field(JSON_NODES))
{
std::string prefix = json_value[JSON_NODE][JSON_KEY].as_string();
for (auto & node : json_value[JSON_NODE][JSON_NODES].as_array())
{
_values.push_back(Value(node));
_keys.push_back(node[JSON_KEY].as_string().substr(prefix.length() + 1));
}
}
else
_value = Value(json_value.at(JSON_NODE));
}
if (json_value.has_field(JSON_PREV_NODE))
_prev_value = Value(json_value.at(JSON_PREV_NODE));
}
int etcd::Response::error_code() const int etcd::Response::error_code() const
{ {
return _error_code; return _error_code;

View File

@ -1,5 +1,4 @@
#include "etcd/Value.hpp" #include "etcd/Value.hpp"
#include "json_constants.hpp"
#include "proto/kv.pb.h" #include "proto/kv.pb.h"
etcd::Value::Value() etcd::Value::Value()
@ -9,14 +8,6 @@ etcd::Value::Value()
{ {
} }
etcd::Value::Value(web::json::value const & json_value)
: _key(json_value.has_field(JSON_KEY) ? json_value.at(JSON_KEY).as_string() : ""),
dir(json_value.has_field(JSON_DIR)),
value(json_value.has_field(JSON_VALUE) ? json_value.at(JSON_VALUE).as_string() : ""),
created(json_value.has_field(JSON_CREATED) ? json_value.at(JSON_CREATED).as_number().to_int64() : 0),
modified(json_value.has_field(JSON_MODIFIED) ? json_value.at(JSON_MODIFIED).as_number().to_int64() : 0)
{
}
etcd::Value::Value(mvccpb::KeyValue const & kvs) etcd::Value::Value(mvccpb::KeyValue const & kvs)
{ {

View File

@ -1,14 +0,0 @@
#include "json_constants.hpp"
char const * etcd::JSON_KEY = "key";
char const * etcd::JSON_DIR = "dir";
char const * etcd::JSON_VALUE = "value";
char const * etcd::JSON_CREATED = "createdIndex";
char const * etcd::JSON_MODIFIED = "modifiedIndex";
char const * etcd::JSON_ERROR_CODE = "errorCode";
char const * etcd::JSON_MESSAGE = "message";
char const * etcd::JSON_ACTION = "action";
char const * etcd::JSON_NODE = "node";
char const * etcd::JSON_NODES = "nodes";
char const * etcd::JSON_PREV_NODE = "prevNode";
char const * etcd::JSON_ETCD_INDEX = "X-Etcd-Index";

View File

@ -1,20 +0,0 @@
#ifndef __ETCD_JSON_CONSTANTS_HPP__
#define __ETCD_JSON_CONSTANTS_HPP__
namespace etcd
{
extern char const * JSON_KEY;
extern char const * JSON_DIR;
extern char const * JSON_VALUE;
extern char const * JSON_CREATED;
extern char const * JSON_MODIFIED;
extern char const * JSON_ERROR_CODE;
extern char const * JSON_MESSAGE;
extern char const * JSON_ACTION;
extern char const * JSON_NODE;
extern char const * JSON_NODES;
extern char const * JSON_PREV_NODE;
extern char const * JSON_ETCD_INDEX;
};
#endif

21
v3/include/Action.hpp Normal file
View File

@ -0,0 +1,21 @@
#ifndef __V3_ACTION_HPP__
#define __V3_ACTION_HPP__
#include <grpc++/grpc++.h>
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
namespace etcdv3
{
class Action
{
public:
Status status;
ClientContext context;
CompletionQueue cq_;
void waitForResponse();
};
}
#endif

View File

@ -0,0 +1,27 @@
#ifndef __ASYNC_COMPAREANDDELETE_HPP__
#define __ASYNC_COMPAREANDDELETE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncCompareAndDeleteAction : public etcdv3::Action
{
public:
AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_);
AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,27 @@
#ifndef __ASYNC_COMPAREANDSWAP_HPP__
#define __ASYNC_COMPAREANDSWAP_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncCompareAndSwapAction : public etcdv3::Action
{
public:
AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_);
AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_DELETE_HPP__
#define __ASYNC_DELETE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncDeleteAction : public etcdv3::Action
{
public:
AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive=false);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_GET_HPP__
#define __ASYNC_GET_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::RangeResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncGetAction : public etcdv3::Action
{
public:
AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix=false);
AsyncRangeResponse ParseResponse();
RangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
};
}
#endif

View File

@ -7,9 +7,6 @@
using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::RangeResponse; using etcdserverpb::RangeResponse;
namespace etcdv3 namespace etcdv3
@ -17,16 +14,11 @@ namespace etcdv3
class AsyncRangeResponse : public etcdv3::V3Response class AsyncRangeResponse : public etcdv3::V3Response
{ {
public: public:
AsyncRangeResponse(){action = "get";}; AsyncRangeResponse(RangeResponse& resp);
AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse(const AsyncRangeResponse& other);
AsyncRangeResponse& operator=(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
AsyncRangeResponse& ParseResponse(); void ParseResponse();
void waitForResponse();
RangeResponse reply; RangeResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
}; };
} }

View File

@ -0,0 +1,27 @@
#ifndef __ASYNC_SET_HPP__
#define __ASYNC_SET_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncSetAction : public etcdv3::Action
{
public:
AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create=false);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
bool isCreate;
};
}
#endif

View File

@ -1,14 +1,9 @@
#ifndef __ASYNC_TXNRESPONSE_HPP__ #ifndef __ASYNC_TXNRESPONSE_HPP__
#define __ASYNC_TXNRESPONSE_HPP__ #define __ASYNC_TXNRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp" #include "v3/include/V3Response.hpp"
#include "proto/rpc.pb.h"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::TxnResponse; using etcdserverpb::TxnResponse;
namespace etcdv3 namespace etcdv3
@ -16,17 +11,11 @@ namespace etcdv3
class AsyncTxnResponse : public etcdv3::V3Response class AsyncTxnResponse : public etcdv3::V3Response
{ {
public: public:
AsyncTxnResponse(){}; AsyncTxnResponse(TxnResponse& resp);
AsyncTxnResponse(const std::string act){action = act;};
AsyncTxnResponse(const AsyncTxnResponse& other);
AsyncTxnResponse& operator=(const AsyncTxnResponse& other); AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
AsyncTxnResponse& ParseResponse(); AsyncTxnResponse(const AsyncTxnResponse& other);
void waitForResponse(); void ParseResponse();
TxnResponse reply; TxnResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
}; };
} }

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_UPDATE_HPP__
#define __ASYNC_UPDATE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncUpdateAction : public etcdv3::Action
{
public:
AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,31 @@
#ifndef __ASYNC_WATCHACTION_HPP__
#define __ASYNC_WATCHACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
using grpc::ClientAsyncReaderWriter;
using etcdserverpb::WatchRequest;
using etcdserverpb::WatchResponse;
using etcdserverpb::KV;
using etcdserverpb::Watch;
namespace etcdv3
{
class AsyncWatchAction : public etcdv3::Action
{
public:
AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub);
AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub);
AsyncWatchResponse ParseResponse();
void waitForResponse();
WatchResponse reply;
KV::Stub* stub_;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
};
}
#endif

View File

@ -3,13 +3,10 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "proto/rpc.pb.h"
#include "v3/include/V3Response.hpp" #include "v3/include/V3Response.hpp"
using grpc::ClientAsyncReaderWriter;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::WatchRequest; using etcdserverpb::WatchRequest;
using etcdserverpb::WatchResponse; using etcdserverpb::WatchResponse;
using etcdserverpb::KV; using etcdserverpb::KV;
@ -19,20 +16,13 @@ namespace etcdv3
class AsyncWatchResponse : public etcdv3::V3Response class AsyncWatchResponse : public etcdv3::V3Response
{ {
public: public:
AsyncWatchResponse(){fromIndex = -1;}; AsyncWatchResponse(WatchResponse& resp);
AsyncWatchResponse(const std::string act){action = act;};
AsyncWatchResponse(const AsyncWatchResponse& other); AsyncWatchResponse(const AsyncWatchResponse& other);
AsyncWatchResponse& operator=(const AsyncWatchResponse& other); AsyncWatchResponse& operator=(const AsyncWatchResponse& other);
AsyncWatchResponse& ParseResponse(); void ParseResponse();
void waitForResponse();
WatchResponse reply; WatchResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
KV::Stub* stub_;
int fromIndex;
}; };
} }
#endif #endif

View File

@ -1,9 +1,9 @@
#ifndef __V3_RESPONSE_HPP__ #ifndef __V3_RESPONSE_HPP__
#define __V3_RESPONSE_HPP__ #define __V3_RESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/kv.pb.h" #include "proto/kv.pb.h"
namespace etcdv3 namespace etcdv3
{ {
class V3Response class V3Response
@ -11,8 +11,8 @@ namespace etcdv3
public: public:
V3Response(): error_code(0), index(0) {}; V3Response(): error_code(0), index(0) {};
int error_code; int error_code;
std::string error_message;
int index; int index;
std::string error_message;
std::string action; std::string action;
std::vector<mvccpb::KeyValue> values; std::vector<mvccpb::KeyValue> values;
std::vector<mvccpb::KeyValue> prev_values; std::vector<mvccpb::KeyValue> prev_values;

View File

@ -0,0 +1,16 @@
#ifndef __ETCD_ACTION_CONSTANTS_HPP__
#define __ETCD_ACTION_CONSTANTS_HPP__
namespace etcdv3
{
extern char const * CREATE_ACTION;
extern char const * UPDATE_ACTION;
extern char const * SET_ACTION;
extern char const * GET_ACTION;
extern char const * DELETE_ACTION;
extern char const * COMPARESWAP_ACTION;
extern char const * COMPAREDELETE_ACTION;
};
#endif

10
v3/src/Action.cpp Normal file
View File

@ -0,0 +1,10 @@
#include "v3/include/Action.hpp"
void etcdv3::Action::waitForResponse()
{
void* got_tag;
bool ok = false;
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}

View File

@ -0,0 +1,59 @@
#include "v3/include/AsyncCompareAndDeleteAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
}
else
{
txn_resp.ParseResponse();
txn_resp.prev_values = txn_resp.values;
txn_resp.action = etcdv3::COMPAREDELETE_ACTION;
if(!reply.succeeded())
{
txn_resp.error_code=101;
txn_resp.error_message="Compare failed";
}
}
return txn_resp;
}

View File

@ -0,0 +1,60 @@
#include "v3/include/AsyncCompareAndSwapAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
}
else
{
txn_resp.ParseResponse();
txn_resp.action = etcdv3::COMPARESWAP_ACTION;
if(!reply.succeeded())
{
txn_resp.error_code=101;
txn_resp.error_message="Compare failed";
}
}
return txn_resp;
}

View File

@ -0,0 +1,43 @@
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
etcdv3::AsyncDeleteAction::AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
}
transaction.setup_delete_sequence(key, range_end, recursive);
transaction.setup_delete_failure_operation(key, range_end, recursive);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncDeleteAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
}
else
{
txn_resp.ParseResponse();
txn_resp.prev_values = txn_resp.values;
txn_resp.action = etcdv3::DELETE_ACTION;
}
return txn_resp;
}

41
v3/src/AsyncGetAction.cpp Normal file
View File

@ -0,0 +1,41 @@
#include "v3/include/AsyncGetAction.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::RangeRequest;
etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix)
{
RangeRequest get_request;
get_request.set_key(key);
if(withPrefix)
{
std::string range_end(key);
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
get_request.set_range_end(range_end);
get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY);
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
response_reader = stub_->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse()
{
AsyncRangeResponse range_resp(reply);
if(!status.ok())
{
range_resp.error_code = status.error_code();
range_resp.error_message = status.error_message();
}
else
{
range_resp.ParseResponse();
range_resp.action = etcdv3::GET_ACTION;
}
return range_resp;
}

View File

@ -1,4 +1,10 @@
#include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
etcdv3::AsyncRangeResponse::AsyncRangeResponse(RangeResponse& resp)
{
reply = resp;
}
etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other) etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other)
{ {
@ -22,26 +28,9 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::
return *this; return *this;
} }
void etcdv3::AsyncRangeResponse::waitForResponse() void etcdv3::AsyncRangeResponse::ParseResponse()
{
void* got_tag;
bool ok = false;
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
{ {
index = reply.header().revision(); index = reply.header().revision();
if(!status.ok())
{
error_code = status.error_code();
error_message = status.error_message();
}
else
{
if(reply.kvs_size() == 0) if(reply.kvs_size() == 0)
{ {
error_code=100; error_code=100;
@ -53,6 +42,3 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
values.push_back(reply.kvs(index)); values.push_back(reply.kvs(index));
} }
} }
index = reply.header().revision();
return *this;
}

57
v3/src/AsyncSetAction.cpp Normal file
View File

@ -0,0 +1,57 @@
#include "v3/include/AsyncSetAction.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncSetAction::AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create)
{
etcdv3::Transaction transaction(key);
isCreate = create;
if(create)
{
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_basic_create_sequence(key, value);
}
else
{
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_set_failure_operation(key, value);
transaction.setup_basic_create_sequence(key, value);
}
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
}
else
{
txn_resp.ParseResponse();
txn_resp.action = isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION;
if(!reply.succeeded() && txn_resp.action == etcdv3::CREATE_ACTION)
{
txn_resp.error_code=105;
txn_resp.error_message="Key already exists";
}
}
return txn_resp;
}

View File

@ -1,8 +1,13 @@
#include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::ResponseOp; using etcdserverpb::ResponseOp;
etcdv3::AsyncTxnResponse::AsyncTxnResponse(TxnResponse& resp)
{
reply = resp;
}
etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other) etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other)
{ {
@ -12,7 +17,6 @@ etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other
action = other.action; action = other.action;
values = other.values; values = other.values;
prev_values = other.prev_values; prev_values = other.prev_values;
} }
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other) etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other)
@ -26,26 +30,9 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn
return *this; return *this;
} }
void etcdv3::AsyncTxnResponse::waitForResponse() void etcdv3::AsyncTxnResponse::ParseResponse()
{ {
void* got_tag;
bool ok = false;
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
{
index = reply.header().revision(); index = reply.header().revision();
if(!status.ok())
{
error_code = status.error_code();
error_message = status.error_message();
}
else
{
std::vector<mvccpb::KeyValue> range_kvs; std::vector<mvccpb::KeyValue> range_kvs;
std::vector<mvccpb::KeyValue> prev_range_kvs; std::vector<mvccpb::KeyValue> prev_range_kvs;
for(int index=0; index < reply.responses_size(); index++) for(int index=0; index < reply.responses_size(); index++)
@ -53,51 +40,20 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
auto resp = reply.responses(index); auto resp = reply.responses(index);
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{ {
AsyncRangeResponse response; AsyncRangeResponse response(*(resp.mutable_response_range()));
response.reply = resp.response_range(); response.ParseResponse();
auto v3resp = response.ParseResponse();
error_code = v3resp.error_code; error_code = response.error_code;
error_message = v3resp.error_message; error_message = response.error_message;
if(!v3resp.values.empty()) if(!response.values.empty())
{ {
prev_range_kvs=range_kvs; prev_range_kvs=range_kvs;
range_kvs = v3resp.values; range_kvs = response.values;
}
}
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
{
//do nothing yet
}
}
if(!reply.succeeded())
{
if(action == "create")
{
error_code=105;
error_message="Key already exists";
}
else if(action == "compareAndSwap" || action == "compareAndDelete")
{
if(!error_code)
{
error_code=101;
error_message="Compare failed";
} }
} }
} }
prev_values = prev_range_kvs; prev_values = prev_range_kvs;
values = range_kvs; values = range_kvs;
if(action == "delete" || action == "compareAndDelete")
{
prev_values = values;
}
}
return *this;
} }

View File

@ -0,0 +1,41 @@
#include "v3/include/AsyncUpdateAction.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncUpdateAction::AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse()
{
AsyncTxnResponse txn_resp(reply);
if(!status.ok())
{
txn_resp.error_code = status.error_code();
txn_resp.error_message = status.error_message();
}
else
{
txn_resp.ParseResponse();
txn_resp.action = etcdv3::UPDATE_ACTION;
}
return txn_resp;
}

View File

@ -0,0 +1,91 @@
#include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
{
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this);
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)this);
stub_ = stub_;
}
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
{
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this);
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_create_req.set_start_revision(fromIndex);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)this);
stub_ = stub_;
}
void etcdv3::AsyncWatchAction::waitForResponse()
{
void* got_tag;
bool ok = false;
stream->Read(&reply, (void*)3);
while(cq_.Next(&got_tag, &ok))
{
if(got_tag == (void*)3)
{
if(reply.events_size())
{
stream->WritesDone((void*)this);
cq_.Next(&got_tag, &ok);
break;
}
else
{
stream->Read(&reply, (void*)3);
}
}
}
}
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
{
AsyncWatchResponse watch_resp(reply);
if(!status.ok())
{
watch_resp.error_code = status.error_code();
watch_resp.error_message = status.error_message();
}
else
{
watch_resp.ParseResponse();
}
return watch_resp;
}

View File

@ -1,8 +1,13 @@
#include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse; using etcdserverpb::RangeResponse;
etcdv3::AsyncWatchResponse::AsyncWatchResponse(WatchResponse& resp)
{
reply = resp;
}
etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other) etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other)
{ {
@ -26,38 +31,10 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::
return *this; return *this;
} }
void etcdv3::AsyncWatchResponse::waitForResponse() void etcdv3::AsyncWatchResponse::ParseResponse()
{
void* got_tag;
bool ok = false;
stream->Read(&reply, (void*)3);
while(cq_.Next(&got_tag, &ok))
{
if(got_tag == (void*)3)
{
if(reply.events_size())
{
stream->WritesDone((void*)this);
cq_.Next(&got_tag, &ok);
break;
}
else
{
stream->Read(&reply, (void*)3);
}
}
}
}
etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
{ {
index = reply.header().revision(); index = reply.header().revision();
mvccpb::KeyValue kv;
std::map<std::string, mvccpb::KeyValue> mapValue; std::map<std::string, mvccpb::KeyValue> mapValue;
std::map<std::string, mvccpb::KeyValue> prev_mapValue;
for(int cnt =0; cnt < reply.events_size(); cnt++) for(int cnt =0; cnt < reply.events_size(); cnt++)
{ {
auto event = reply.events(cnt); auto event = reply.events(cnt);
@ -65,48 +42,26 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
{ {
if(event.has_kv()) if(event.has_kv())
{ {
kv = event.kv(); auto kv = event.kv();
if(kv.version() == 1) if(kv.version() == 1)
{ {
action = "create"; action = etcdv3::CREATE_ACTION;
} }
else else
{ {
action = "set"; action = etcdv3::SET_ACTION;
} }
mapValue.emplace(kv.key(), kv); mapValue.emplace(kv.key(), kv);
} }
} }
else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type())
{ {
action = "delete"; action = etcdv3::DELETE_ACTION;
}
//get previous value index - 1
RangeRequest get_request;
get_request.set_key(kv.key());
get_request.set_revision((fromIndex >=0)?fromIndex - 1:index-1);
RangeResponse response;
ClientContext ctx;
Status result = stub_->Range(&ctx, get_request, &response);
if (result.ok())
{
for(int cnt=0; cnt < response.kvs_size(); cnt++)
{
prev_mapValue.emplace(response.kvs(cnt).key(),response.kvs(cnt));
}
}
}
for(auto x: prev_mapValue)
{
prev_values.push_back(x.second);
} }
}
for(auto x: mapValue) for(auto x: mapValue)
{ {
values.push_back(x.second); values.push_back(x.second);
} }
return *this;
} }

View File

@ -0,0 +1,10 @@
#include "v3/include/action_constants.hpp"
char const * etcdv3::CREATE_ACTION = "create";
char const * etcdv3::COMPARESWAP_ACTION = "compareAndSwap";
char const * etcdv3::UPDATE_ACTION = "update";
char const * etcdv3::SET_ACTION = "set";
char const * etcdv3::GET_ACTION = "get";
char const * etcdv3::DELETE_ACTION = "delete";
char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete";