diff --git a/etcd/Client.hpp b/etcd/Client.hpp index e4b846f..54eca77 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -133,20 +133,6 @@ namespace etcd std::unique_ptr stub_; std::unique_ptr watchServiceStub; - pplx::task send_asyncput(const std::string& key, const std::string& value); - pplx::task send_asyncadd(std::string const & key, const std::string& value); - pplx::task send_asyncmodify(std::string const & key, std::string const & value); - pplx::task send_asyncget(std::string const & key,std::string const& range_end=""); - pplx::task send_put(const std::string& key, const std::string& value); - pplx::task send_get(std::string const & key); - pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); - pplx::task send_asyncmodify_if(std::string const & key, std::string const & value, int old_index); - pplx::task send_asyncdelete(std::string const & key, bool recursive); - pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); - pplx::task send_asyncrm_if(std::string const &key, int old_index); - pplx::task send_asyncwatch(std::string const & key, bool recursive); - pplx::task send_asyncwatch(std::string const & key, int fromIndex, bool recursive); - private: std::shared_ptr initiate_transaction(const std::string &operation, etcdv3::Transaction& transaction); diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 15c04c2..c90ce3c 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -23,7 +23,6 @@ namespace etcd class Response { public: - static pplx::task create(pplx::task response_task); templatestatic pplx::task create(std::shared_ptr call) { @@ -99,7 +98,6 @@ namespace etcd std::string const & key(int index) const; protected: - Response(web::http::http_response http_response, web::json::value json_value); Response(const etcdv3::V3Response& response); int _error_code; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 98eab76..da1a009 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) @@ -8,3 +8,4 @@ install (FILES ../etcd/Client.hpp ../etcd/Response.hpp ../etcd/Value.hpp DESTINATION include/etcd) + diff --git a/src/Client.cpp b/src/Client.cpp index 66dbbfa..836a8f3 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,11 +1,20 @@ #include #include "etcd/Client.hpp" +#include "v3/include/action_constants.hpp" #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/Transaction.hpp" #include +#include "v3/include/AsyncSetAction.hpp" +#include "v3/include/AsyncCompareAndSwapAction.hpp" +#include "v3/include/AsyncCompareAndDeleteAction.hpp" +#include "v3/include/AsyncUpdateAction.hpp" +#include "v3/include/AsyncGetAction.hpp" +#include "v3/include/AsyncDeleteAction.hpp" +#include "v3/include/AsyncWatchAction.hpp" + using grpc::Channel; using etcdserverpb::PutRequest; using etcdserverpb::RangeRequest; @@ -14,10 +23,6 @@ using etcdserverpb::DeleteRangeRequest; using etcdserverpb::Compare; using etcdserverpb::RequestOp; -using grpc::ClientReaderWriter; -using etcdserverpb::WatchRequest; -using etcdserverpb::WatchResponse; -using etcdserverpb::WatchCreateRequest; etcd::Client::Client(std::string const & address) { @@ -36,281 +41,86 @@ etcd::Client::Client(std::string const & address) pplx::task etcd::Client::get(std::string const & key) { - return send_asyncget(key); + std::shared_ptr call(new etcdv3::AsyncGetAction(key,stub_.get())); + return Response::create(call); } pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - return send_asyncput(key,value); + std::shared_ptr call(new etcdv3::AsyncSetAction(key, value, stub_.get())); + return Response::create(call);; } pplx::task etcd::Client::add(std::string const & key, std::string const & value) { - return send_asyncadd(key,value); + std::shared_ptr call(new etcdv3::AsyncSetAction(key, value, stub_.get(), true)); + return Response::create(call);; } pplx::task etcd::Client::modify(std::string const & key, std::string const & value) { - return send_asyncmodify(key,value); + std::shared_ptr call(new etcdv3::AsyncUpdateAction(key,value,stub_.get()));; + return Response::create(call); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) { - return send_asyncmodify_if(key, value, old_value); + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_value, stub_.get()));; + return Response::create(call); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { - return send_asyncmodify_if(key, value, old_index); + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_index, stub_.get()));; + return Response::create(call); } pplx::task etcd::Client::rm(std::string const & key) { - return send_asyncdelete(key,false); + std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get()));; + return Response::create(call); } pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { - return send_asyncrm_if(key, old_value); + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(key,old_value,stub_.get()));; + return Response::create(call); } - pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { - return send_asyncrm_if(key, old_index); + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(key,old_index,stub_.get()));; + return Response::create(call); } - - pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { - - return send_asyncdelete(key,recursive); + std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));; + return Response::create(call); } - pplx::task etcd::Client::ls(std::string const & key) { - std::string range_end(key); - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - - return send_asyncget(key,range_end); + std::shared_ptr call(new etcdv3::AsyncGetAction(key,stub_.get(),true)); + return Response::create(call); } - pplx::task etcd::Client::watch(std::string const & key, bool recursive) { - return send_asyncwatch(key,recursive); + std::shared_ptr call(new etcdv3::AsyncWatchAction(key,recursive,stub_.get(),watchServiceStub.get())); + return Response::create(call); } - pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { - return send_asyncwatch(key, fromIndex, recursive); -} - - -std::shared_ptr etcd::Client::initiate_transaction(const std::string &operation, - etcdv3::Transaction& transaction) -{ - std::shared_ptr call(new etcdv3::AsyncTxnResponse(operation)); - call->response_reader = stub_->AsyncTxn(&call->context, transaction.txn_request, &call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*) (call.get())); - return call; -} - - -pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VERSION); - - transaction.setup_basic_failure_operation(key); - transaction.setup_basic_create_sequence(key, value); - - std::shared_ptr call = initiate_transaction("create", transaction); + std::shared_ptr call(new etcdv3::AsyncWatchAction(key,fromIndex,recursive,stub_.get(),watchServiceStub.get())); return Response::create(call); } -pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VALUE); - - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call = initiate_transaction("compareAndSwap", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, int old_index) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_MOD); - - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call = initiate_transaction("compareAndSwap", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncmodify(std::string const & key, std::string const & value) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, - Compare::CompareTarget::Compare_CompareTarget_VERSION); - - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); - - std::shared_ptr call = initiate_transaction("update", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncget(std::string const & key, std::string const& range_end) -{ - RangeRequest get_request; - get_request.set_key(key); - if(!range_end.empty()) - { - get_request.set_range_end(range_end); - get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); - } - - std::shared_ptr call(new etcdv3::AsyncRangeResponse()); - call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); - - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncput(std::string const & key, std::string const & value) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VERSION); - - transaction.setup_set_failure_operation(key, value); - transaction.setup_basic_create_sequence(key, value); - - std::shared_ptr call = initiate_transaction("set", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncdelete(std::string const & key, bool recursive) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, - Compare::CompareTarget::Compare_CompareTarget_VERSION); - - std::string range_end(key); - if(recursive) - { - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - } - - transaction.setup_delete_sequence(key, range_end, recursive); - transaction.setup_delete_failure_operation(key, range_end, recursive); - - std::shared_ptr call = initiate_transaction("delete", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncrm_if(std::string const &key, std::string const &old_value) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VALUE); - - transaction.setup_compare_and_delete_operation(key); - transaction.setup_basic_failure_operation(key); - - std::shared_ptr call = initiate_transaction("compareAndDelete", transaction); - return Response::create(call); -} - - -pplx::task etcd::Client::send_asyncrm_if(std::string const &key, int old_index) { - etcdv3::Transaction transaction(key); - transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_MOD); - - transaction.setup_compare_and_delete_operation(key); - transaction.setup_basic_failure_operation(key); - - std::shared_ptr call = initiate_transaction("compareAndDelete", transaction); - return Response::create(call); -} - -pplx::task etcd::Client::send_asyncwatch(std::string const & key, bool recursive) -{ - std::shared_ptr call(new etcdv3::AsyncWatchResponse()); - call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); - - WatchRequest watch_req; - WatchCreateRequest watch_create_req; - watch_create_req.set_key(key); - - std::string range_end(key); - if(recursive) - { - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - watch_create_req.set_range_end(range_end); - } - - - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - call->stream->Write(watch_req, (void*)call.get()); - call->stub_ = stub_.get(); - - return Response::create(call); -} - -pplx::task etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive) -{ - std::shared_ptr call(new etcdv3::AsyncWatchResponse()); - call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); - - WatchRequest watch_req; - WatchCreateRequest watch_create_req; - watch_create_req.set_key(key); - watch_create_req.set_start_revision(fromIndex); - - std::string range_end(key); - if(recursive) - { - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - watch_create_req.set_range_end(range_end); - } - - - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - call->stream->Write(watch_req, (void*)call.get()); - call->stub_ = stub_.get(); - call->fromIndex = fromIndex; - - return Response::create(call); -} diff --git a/src/Response.cpp b/src/Response.cpp index 39ad78d..ca01f32 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -1,19 +1,7 @@ #include "etcd/Response.hpp" -#include "json_constants.hpp" #include -pplx::task etcd::Response::create(pplx::task response_task) -{ - return pplx::task ( - [response_task]() - { - auto json_task = response_task.get().extract_json(); - return etcd::Response(response_task.get(), json_task.get()); - } - ); -} - etcd::Response::Response(const etcdv3::V3Response& reply) { @@ -48,41 +36,6 @@ etcd::Response::Response() { } -etcd::Response::Response(web::http::http_response http_response, web::json::value json_value) - : _error_code(0), - _index(0) -{ - if (http_response.headers().has(JSON_ETCD_INDEX)) - _index = atoi(http_response.headers()[JSON_ETCD_INDEX].c_str()); - - if (json_value.has_field(JSON_ERROR_CODE)) - { - _error_code = json_value[JSON_ERROR_CODE].as_number().to_int64(); - _error_message = json_value[JSON_MESSAGE].as_string(); - } - - if (json_value.has_field(JSON_ACTION)) - _action = json_value[JSON_ACTION].as_string(); - - if (json_value.has_field(JSON_NODE)) - { - if (json_value[JSON_NODE].has_field(JSON_NODES)) - { - std::string prefix = json_value[JSON_NODE][JSON_KEY].as_string(); - for (auto & node : json_value[JSON_NODE][JSON_NODES].as_array()) - { - _values.push_back(Value(node)); - _keys.push_back(node[JSON_KEY].as_string().substr(prefix.length() + 1)); - } - } - else - _value = Value(json_value.at(JSON_NODE)); - } - - if (json_value.has_field(JSON_PREV_NODE)) - _prev_value = Value(json_value.at(JSON_PREV_NODE)); -} - int etcd::Response::error_code() const { return _error_code; diff --git a/src/Value.cpp b/src/Value.cpp index 618e5e1..dc16a93 100644 --- a/src/Value.cpp +++ b/src/Value.cpp @@ -1,5 +1,4 @@ #include "etcd/Value.hpp" -#include "json_constants.hpp" #include "proto/kv.pb.h" etcd::Value::Value() @@ -9,14 +8,6 @@ etcd::Value::Value() { } -etcd::Value::Value(web::json::value const & json_value) - : _key(json_value.has_field(JSON_KEY) ? json_value.at(JSON_KEY).as_string() : ""), - dir(json_value.has_field(JSON_DIR)), - value(json_value.has_field(JSON_VALUE) ? json_value.at(JSON_VALUE).as_string() : ""), - created(json_value.has_field(JSON_CREATED) ? json_value.at(JSON_CREATED).as_number().to_int64() : 0), - modified(json_value.has_field(JSON_MODIFIED) ? json_value.at(JSON_MODIFIED).as_number().to_int64() : 0) -{ -} etcd::Value::Value(mvccpb::KeyValue const & kvs) { diff --git a/src/json_constants.cpp b/src/json_constants.cpp deleted file mode 100644 index 015b3a8..0000000 --- a/src/json_constants.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "json_constants.hpp" - -char const * etcd::JSON_KEY = "key"; -char const * etcd::JSON_DIR = "dir"; -char const * etcd::JSON_VALUE = "value"; -char const * etcd::JSON_CREATED = "createdIndex"; -char const * etcd::JSON_MODIFIED = "modifiedIndex"; -char const * etcd::JSON_ERROR_CODE = "errorCode"; -char const * etcd::JSON_MESSAGE = "message"; -char const * etcd::JSON_ACTION = "action"; -char const * etcd::JSON_NODE = "node"; -char const * etcd::JSON_NODES = "nodes"; -char const * etcd::JSON_PREV_NODE = "prevNode"; -char const * etcd::JSON_ETCD_INDEX = "X-Etcd-Index"; diff --git a/src/json_constants.hpp b/src/json_constants.hpp deleted file mode 100644 index 334e2ed..0000000 --- a/src/json_constants.hpp +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef __ETCD_JSON_CONSTANTS_HPP__ -#define __ETCD_JSON_CONSTANTS_HPP__ - -namespace etcd -{ - extern char const * JSON_KEY; - extern char const * JSON_DIR; - extern char const * JSON_VALUE; - extern char const * JSON_CREATED; - extern char const * JSON_MODIFIED; - extern char const * JSON_ERROR_CODE; - extern char const * JSON_MESSAGE; - extern char const * JSON_ACTION; - extern char const * JSON_NODE; - extern char const * JSON_NODES; - extern char const * JSON_PREV_NODE; - extern char const * JSON_ETCD_INDEX; -}; - -#endif diff --git a/v3/include/Action.hpp b/v3/include/Action.hpp new file mode 100644 index 0000000..3be3038 --- /dev/null +++ b/v3/include/Action.hpp @@ -0,0 +1,21 @@ +#ifndef __V3_ACTION_HPP__ +#define __V3_ACTION_HPP__ + +#include + +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; + +namespace etcdv3 +{ + class Action + { + public: + Status status; + ClientContext context; + CompletionQueue cq_; + void waitForResponse(); + }; +} +#endif diff --git a/v3/include/AsyncCompareAndDeleteAction.hpp b/v3/include/AsyncCompareAndDeleteAction.hpp new file mode 100644 index 0000000..ea1114f --- /dev/null +++ b/v3/include/AsyncCompareAndDeleteAction.hpp @@ -0,0 +1,27 @@ +#ifndef __ASYNC_COMPAREANDDELETE_HPP__ +#define __ASYNC_COMPAREANDDELETE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncCompareAndDeleteAction : public etcdv3::Action + { + public: + AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_); + AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_); + AsyncTxnResponse ParseResponse(); + TxnResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncCompareAndSwapAction.hpp b/v3/include/AsyncCompareAndSwapAction.hpp new file mode 100644 index 0000000..c83ec58 --- /dev/null +++ b/v3/include/AsyncCompareAndSwapAction.hpp @@ -0,0 +1,27 @@ +#ifndef __ASYNC_COMPAREANDSWAP_HPP__ +#define __ASYNC_COMPAREANDSWAP_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncCompareAndSwapAction : public etcdv3::Action + { + public: + AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_); + AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_); + AsyncTxnResponse ParseResponse(); + TxnResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncDeleteAction.hpp b/v3/include/AsyncDeleteAction.hpp new file mode 100644 index 0000000..4c8daf9 --- /dev/null +++ b/v3/include/AsyncDeleteAction.hpp @@ -0,0 +1,26 @@ +#ifndef __ASYNC_DELETE_HPP__ +#define __ASYNC_DELETE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncDeleteAction : public etcdv3::Action + { + public: + AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive=false); + AsyncTxnResponse ParseResponse(); + TxnResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncGetAction.hpp b/v3/include/AsyncGetAction.hpp new file mode 100644 index 0000000..be1a3fa --- /dev/null +++ b/v3/include/AsyncGetAction.hpp @@ -0,0 +1,26 @@ +#ifndef __ASYNC_GET_HPP__ +#define __ASYNC_GET_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncRangeResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::RangeResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncGetAction : public etcdv3::Action + { + public: + AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix=false); + AsyncRangeResponse ParseResponse(); + RangeResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index 3de0294..a9ebe6b 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -7,9 +7,6 @@ using grpc::ClientAsyncResponseReader; -using grpc::ClientContext; -using grpc::CompletionQueue; -using grpc::Status; using etcdserverpb::RangeResponse; namespace etcdv3 @@ -17,16 +14,11 @@ namespace etcdv3 class AsyncRangeResponse : public etcdv3::V3Response { public: - AsyncRangeResponse(){action = "get";}; + AsyncRangeResponse(RangeResponse& resp); AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); - AsyncRangeResponse& ParseResponse(); - void waitForResponse(); + void ParseResponse(); RangeResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> response_reader; }; } diff --git a/v3/include/AsyncSetAction.hpp b/v3/include/AsyncSetAction.hpp new file mode 100644 index 0000000..57f6ccb --- /dev/null +++ b/v3/include/AsyncSetAction.hpp @@ -0,0 +1,27 @@ +#ifndef __ASYNC_SET_HPP__ +#define __ASYNC_SET_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncSetAction : public etcdv3::Action + { + public: + AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create=false); + AsyncTxnResponse ParseResponse(); + TxnResponse reply; + std::unique_ptr> response_reader; + bool isCreate; + }; +} + +#endif diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index e07607d..7019930 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -1,14 +1,9 @@ #ifndef __ASYNC_TXNRESPONSE_HPP__ #define __ASYNC_TXNRESPONSE_HPP__ -#include -#include "proto/rpc.grpc.pb.h" #include "v3/include/V3Response.hpp" +#include "proto/rpc.pb.h" -using grpc::ClientAsyncResponseReader; -using grpc::ClientContext; -using grpc::CompletionQueue; -using grpc::Status; using etcdserverpb::TxnResponse; namespace etcdv3 @@ -16,17 +11,11 @@ namespace etcdv3 class AsyncTxnResponse : public etcdv3::V3Response { public: - AsyncTxnResponse(){}; - AsyncTxnResponse(const std::string act){action = act;}; - AsyncTxnResponse(const AsyncTxnResponse& other); + AsyncTxnResponse(TxnResponse& resp); AsyncTxnResponse& operator=(const AsyncTxnResponse& other); - AsyncTxnResponse& ParseResponse(); - void waitForResponse(); + AsyncTxnResponse(const AsyncTxnResponse& other); + void ParseResponse(); TxnResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> response_reader; }; } diff --git a/v3/include/AsyncUpdateAction.hpp b/v3/include/AsyncUpdateAction.hpp new file mode 100644 index 0000000..15b4094 --- /dev/null +++ b/v3/include/AsyncUpdateAction.hpp @@ -0,0 +1,26 @@ +#ifndef __ASYNC_UPDATE_HPP__ +#define __ASYNC_UPDATE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncUpdateAction : public etcdv3::Action + { + public: + AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_); + AsyncTxnResponse ParseResponse(); + TxnResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncWatchAction.hpp b/v3/include/AsyncWatchAction.hpp new file mode 100644 index 0000000..b91c6ff --- /dev/null +++ b/v3/include/AsyncWatchAction.hpp @@ -0,0 +1,31 @@ +#ifndef __ASYNC_WATCHACTION_HPP__ +#define __ASYNC_WATCHACTION_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncWatchResponse.hpp" + + +using grpc::ClientAsyncReaderWriter; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using etcdserverpb::KV; +using etcdserverpb::Watch; + +namespace etcdv3 +{ + class AsyncWatchAction : public etcdv3::Action + { + public: + AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub); + AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub); + AsyncWatchResponse ParseResponse(); + void waitForResponse(); + WatchResponse reply; + KV::Stub* stub_; + std::unique_ptr> stream; + }; +} + +#endif diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp index 7c07cd9..ac3a633 100644 --- a/v3/include/AsyncWatchResponse.hpp +++ b/v3/include/AsyncWatchResponse.hpp @@ -3,13 +3,10 @@ #include #include "proto/rpc.grpc.pb.h" +#include "proto/rpc.pb.h" #include "v3/include/V3Response.hpp" -using grpc::ClientAsyncReaderWriter; -using grpc::ClientContext; -using grpc::CompletionQueue; -using grpc::Status; using etcdserverpb::WatchRequest; using etcdserverpb::WatchResponse; using etcdserverpb::KV; @@ -19,20 +16,13 @@ namespace etcdv3 class AsyncWatchResponse : public etcdv3::V3Response { public: - AsyncWatchResponse(){fromIndex = -1;}; - AsyncWatchResponse(const std::string act){action = act;}; + AsyncWatchResponse(WatchResponse& resp); AsyncWatchResponse(const AsyncWatchResponse& other); AsyncWatchResponse& operator=(const AsyncWatchResponse& other); - AsyncWatchResponse& ParseResponse(); - void waitForResponse(); + void ParseResponse(); WatchResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> stream; - KV::Stub* stub_; - int fromIndex; }; } #endif + diff --git a/v3/include/V3Response.hpp b/v3/include/V3Response.hpp index de17bbb..cf902ad 100644 --- a/v3/include/V3Response.hpp +++ b/v3/include/V3Response.hpp @@ -1,9 +1,9 @@ #ifndef __V3_RESPONSE_HPP__ #define __V3_RESPONSE_HPP__ +#include #include "proto/kv.pb.h" - namespace etcdv3 { class V3Response @@ -11,11 +11,11 @@ namespace etcdv3 public: V3Response(): error_code(0), index(0) {}; int error_code; - std::string error_message; int index; + std::string error_message; std::string action; std::vector values; - std::vector prev_values; + std::vector prev_values; }; } #endif diff --git a/v3/include/action_constants.hpp b/v3/include/action_constants.hpp new file mode 100644 index 0000000..00a6769 --- /dev/null +++ b/v3/include/action_constants.hpp @@ -0,0 +1,16 @@ +#ifndef __ETCD_ACTION_CONSTANTS_HPP__ +#define __ETCD_ACTION_CONSTANTS_HPP__ + +namespace etcdv3 +{ + extern char const * CREATE_ACTION; + extern char const * UPDATE_ACTION; + extern char const * SET_ACTION; + extern char const * GET_ACTION; + extern char const * DELETE_ACTION; + extern char const * COMPARESWAP_ACTION; + extern char const * COMPAREDELETE_ACTION; + +}; + +#endif diff --git a/v3/src/Action.cpp b/v3/src/Action.cpp new file mode 100644 index 0000000..de6ee82 --- /dev/null +++ b/v3/src/Action.cpp @@ -0,0 +1,10 @@ +#include "v3/include/Action.hpp" + +void etcdv3::Action::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} diff --git a/v3/src/AsyncCompareAndDeleteAction.cpp b/v3/src/AsyncCompareAndDeleteAction.cpp new file mode 100644 index 0000000..0cec36f --- /dev/null +++ b/v3/src/AsyncCompareAndDeleteAction.cpp @@ -0,0 +1,59 @@ +#include "v3/include/AsyncCompareAndDeleteAction.hpp" +#include "v3/include/action_constants.hpp" +#include "v3/include/Transaction.hpp" + +using etcdserverpb::Compare; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; + +etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VALUE); + + transaction.setup_compare_and_delete_operation(key); + transaction.setup_basic_failure_operation(key); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_MOD); + transaction.setup_compare_and_delete_operation(key); + transaction.setup_basic_failure_operation(key); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() +{ + AsyncTxnResponse txn_resp(reply); + if(!status.ok()) + { + txn_resp.error_code = status.error_code(); + txn_resp.error_message = status.error_message(); + } + else + { + txn_resp.ParseResponse(); + txn_resp.prev_values = txn_resp.values; + txn_resp.action = etcdv3::COMPAREDELETE_ACTION; + + if(!reply.succeeded()) + { + txn_resp.error_code=101; + txn_resp.error_message="Compare failed"; + } + } + + return txn_resp; +} diff --git a/v3/src/AsyncCompareAndSwapAction.cpp b/v3/src/AsyncCompareAndSwapAction.cpp new file mode 100644 index 0000000..809e8fe --- /dev/null +++ b/v3/src/AsyncCompareAndSwapAction.cpp @@ -0,0 +1,60 @@ +#include "v3/include/AsyncCompareAndSwapAction.hpp" +#include "v3/include/action_constants.hpp" +#include "v3/include/Transaction.hpp" + +using etcdserverpb::Compare; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; + +etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VALUE); + + transaction.setup_basic_failure_operation(key); + transaction.setup_compare_and_swap_sequence(value); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_MOD); + + transaction.setup_basic_failure_operation(key); + transaction.setup_compare_and_swap_sequence(value); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() +{ + AsyncTxnResponse txn_resp(reply); + + if(!status.ok()) + { + txn_resp.error_code = status.error_code(); + txn_resp.error_message = status.error_message(); + } + else + { + txn_resp.ParseResponse(); + txn_resp.action = etcdv3::COMPARESWAP_ACTION; + + if(!reply.succeeded()) + { + txn_resp.error_code=101; + txn_resp.error_message="Compare failed"; + } + } + + return txn_resp; +} diff --git a/v3/src/AsyncDeleteAction.cpp b/v3/src/AsyncDeleteAction.cpp new file mode 100644 index 0000000..a257171 --- /dev/null +++ b/v3/src/AsyncDeleteAction.cpp @@ -0,0 +1,43 @@ +#include "v3/include/AsyncDeleteAction.hpp" +#include "v3/include/action_constants.hpp" +#include "v3/include/Transaction.hpp" + +using etcdserverpb::Compare; + +etcdv3::AsyncDeleteAction::AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, + Compare::CompareTarget::Compare_CompareTarget_VERSION); + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + } + + transaction.setup_delete_sequence(key, range_end, recursive); + transaction.setup_delete_failure_operation(key, range_end, recursive); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncDeleteAction::ParseResponse() +{ + AsyncTxnResponse txn_resp(reply); + + if(!status.ok()) + { + txn_resp.error_code = status.error_code(); + txn_resp.error_message = status.error_message(); + } + else + { + txn_resp.ParseResponse(); + txn_resp.prev_values = txn_resp.values; + txn_resp.action = etcdv3::DELETE_ACTION; + } + + return txn_resp; +} diff --git a/v3/src/AsyncGetAction.cpp b/v3/src/AsyncGetAction.cpp new file mode 100644 index 0000000..c187f1d --- /dev/null +++ b/v3/src/AsyncGetAction.cpp @@ -0,0 +1,41 @@ +#include "v3/include/AsyncGetAction.hpp" +#include "v3/include/action_constants.hpp" + +using etcdserverpb::RangeRequest; + +etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix) +{ + RangeRequest get_request; + get_request.set_key(key); + if(withPrefix) + { + std::string range_end(key); + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + + get_request.set_range_end(range_end); + get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); + get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + } + + response_reader = stub_->AsyncRange(&context,get_request,&cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse() +{ + AsyncRangeResponse range_resp(reply); + + if(!status.ok()) + { + range_resp.error_code = status.error_code(); + range_resp.error_message = status.error_message(); + } + else + { + range_resp.ParseResponse(); + range_resp.action = etcdv3::GET_ACTION; + } + + return range_resp; +} diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 60d197c..62ba9bb 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -1,4 +1,10 @@ #include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/action_constants.hpp" + +etcdv3::AsyncRangeResponse::AsyncRangeResponse(RangeResponse& resp) +{ + reply = resp; +} etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other) { @@ -22,37 +28,17 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: return *this; } -void etcdv3::AsyncRangeResponse::waitForResponse() -{ - void* got_tag; - bool ok = false; - - cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)this); -} - -etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() +void etcdv3::AsyncRangeResponse::ParseResponse() { index = reply.header().revision(); - if(!status.ok()) + if(reply.kvs_size() == 0) { - error_code = status.error_code(); - error_message = status.error_message(); + error_code=100; + error_message="Key not found"; } - else + + for(int index=0; index < reply.kvs_size(); index++) { - - if(reply.kvs_size() == 0) - { - error_code=100; - error_message="Key not found"; - } - - for(int index=0; index < reply.kvs_size(); index++) - { - values.push_back(reply.kvs(index)); - } + values.push_back(reply.kvs(index)); } - index = reply.header().revision(); - return *this; } diff --git a/v3/src/AsyncSetAction.cpp b/v3/src/AsyncSetAction.cpp new file mode 100644 index 0000000..d0d8276 --- /dev/null +++ b/v3/src/AsyncSetAction.cpp @@ -0,0 +1,57 @@ +#include "v3/include/AsyncSetAction.hpp" +#include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/action_constants.hpp" +#include "v3/include/Transaction.hpp" + +using etcdserverpb::Compare; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; + +etcdv3::AsyncSetAction::AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create) +{ + etcdv3::Transaction transaction(key); + isCreate = create; + if(create) + { + transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VERSION); + transaction.setup_basic_failure_operation(key); + transaction.setup_basic_create_sequence(key, value); + } + else + { + transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VERSION); + transaction.setup_set_failure_operation(key, value); + transaction.setup_basic_create_sequence(key, value); + } + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() +{ + + AsyncTxnResponse txn_resp(reply); + + if(!status.ok()) + { + txn_resp.error_code = status.error_code(); + txn_resp.error_message = status.error_message(); + } + else + { + txn_resp.ParseResponse(); + txn_resp.action = isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION; + + if(!reply.succeeded() && txn_resp.action == etcdv3::CREATE_ACTION) + { + txn_resp.error_code=105; + txn_resp.error_message="Key already exists"; + } + } + return txn_resp; +} diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 86d813c..d27490f 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -1,8 +1,13 @@ #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/action_constants.hpp" using etcdserverpb::ResponseOp; +etcdv3::AsyncTxnResponse::AsyncTxnResponse(TxnResponse& resp) +{ + reply = resp; +} etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other) { @@ -12,7 +17,6 @@ etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other action = other.action; values = other.values; prev_values = other.prev_values; - } etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other) @@ -26,78 +30,30 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn return *this; } -void etcdv3::AsyncTxnResponse::waitForResponse() +void etcdv3::AsyncTxnResponse::ParseResponse() { - void* got_tag; - bool ok = false; - - cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)this); -} - -etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() -{ - index = reply.header().revision(); - if(!status.ok()) + std::vector range_kvs; + std::vector prev_range_kvs; + for(int index=0; index < reply.responses_size(); index++) { - error_code = status.error_code(); - error_message = status.error_message(); - } - else - { - std::vector range_kvs; - std::vector prev_range_kvs; - for(int index=0; index < reply.responses_size(); index++) + auto resp = reply.responses(index); + if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) { - auto resp = reply.responses(index); - if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) - { - AsyncRangeResponse response; - response.reply = resp.response_range(); - auto v3resp = response.ParseResponse(); + AsyncRangeResponse response(*(resp.mutable_response_range())); + response.ParseResponse(); - error_code = v3resp.error_code; - error_message = v3resp.error_message; + error_code = response.error_code; + error_message = response.error_message; - if(!v3resp.values.empty()) - { - prev_range_kvs=range_kvs; - range_kvs = v3resp.values; - } - } - else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) + if(!response.values.empty()) { - //do nothing yet + prev_range_kvs=range_kvs; + range_kvs = response.values; } } - - if(!reply.succeeded()) - { - if(action == "create") - { - error_code=105; - error_message="Key already exists"; - } - else if(action == "compareAndSwap" || action == "compareAndDelete") - { - if(!error_code) - { - error_code=101; - error_message="Compare failed"; - } - } - } - - prev_values = prev_range_kvs; - - values = range_kvs; - - if(action == "delete" || action == "compareAndDelete") - { - prev_values = values; - } - - } - return *this; + } + prev_values = prev_range_kvs; + values = range_kvs; + } diff --git a/v3/src/AsyncUpdateAction.cpp b/v3/src/AsyncUpdateAction.cpp new file mode 100644 index 0000000..80190b7 --- /dev/null +++ b/v3/src/AsyncUpdateAction.cpp @@ -0,0 +1,41 @@ +#include "v3/include/AsyncUpdateAction.hpp" +#include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/action_constants.hpp" +#include "v3/include/Transaction.hpp" + +using etcdserverpb::Compare; +using etcdserverpb::RangeRequest; +using etcdserverpb::PutRequest; +using etcdserverpb::RequestOp; +using etcdserverpb::ResponseOp; +using etcdserverpb::TxnRequest; + +etcdv3::AsyncUpdateAction::AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_) +{ + etcdv3::Transaction transaction(key); + transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, + Compare::CompareTarget::Compare_CompareTarget_VERSION); + + transaction.setup_basic_failure_operation(key); + transaction.setup_compare_and_swap_sequence(value); + + response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() +{ + AsyncTxnResponse txn_resp(reply); + + if(!status.ok()) + { + txn_resp.error_code = status.error_code(); + txn_resp.error_message = status.error_message(); + } + else + { + txn_resp.ParseResponse(); + txn_resp.action = etcdv3::UPDATE_ACTION; + } + return txn_resp; +} diff --git a/v3/src/AsyncWatchAction.cpp b/v3/src/AsyncWatchAction.cpp new file mode 100644 index 0000000..f8b9baa --- /dev/null +++ b/v3/src/AsyncWatchAction.cpp @@ -0,0 +1,91 @@ +#include "v3/include/AsyncWatchAction.hpp" +#include "v3/include/action_constants.hpp" + +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; +using etcdserverpb::WatchCreateRequest; + +etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub) +{ + stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + stream->Write(watch_req, (void*)this); + stub_ = stub_; + +} + +etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub) +{ + stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + watch_create_req.set_start_revision(fromIndex); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + stream->Write(watch_req, (void*)this); + stub_ = stub_; +} + +void etcdv3::AsyncWatchAction::waitForResponse() +{ + void* got_tag; + bool ok = false; + + stream->Read(&reply, (void*)3); + while(cq_.Next(&got_tag, &ok)) + { + if(got_tag == (void*)3) + { + if(reply.events_size()) + { + stream->WritesDone((void*)this); + cq_.Next(&got_tag, &ok); + break; + } + else + { + stream->Read(&reply, (void*)3); + } + } + } +} + +etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() +{ + + AsyncWatchResponse watch_resp(reply); + if(!status.ok()) + { + watch_resp.error_code = status.error_code(); + watch_resp.error_message = status.error_message(); + } + else + { + watch_resp.ParseResponse(); + } + return watch_resp; +} diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp index 9fc55b9..92a0f66 100644 --- a/v3/src/AsyncWatchResponse.cpp +++ b/v3/src/AsyncWatchResponse.cpp @@ -1,8 +1,13 @@ #include "v3/include/AsyncWatchResponse.hpp" +#include "v3/include/action_constants.hpp" using etcdserverpb::RangeRequest; using etcdserverpb::RangeResponse; +etcdv3::AsyncWatchResponse::AsyncWatchResponse(WatchResponse& resp) +{ + reply = resp; +} etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other) { @@ -26,38 +31,10 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3:: return *this; } -void etcdv3::AsyncWatchResponse::waitForResponse() -{ - void* got_tag; - bool ok = false; - - stream->Read(&reply, (void*)3); - while(cq_.Next(&got_tag, &ok)) - { - if(got_tag == (void*)3) - { - if(reply.events_size()) - { - stream->WritesDone((void*)this); - cq_.Next(&got_tag, &ok); - break; - } - else - { - stream->Read(&reply, (void*)3); - } - } - } -} - -etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() +void etcdv3::AsyncWatchResponse::ParseResponse() { index = reply.header().revision(); - - mvccpb::KeyValue kv; - std::map mapValue; - std::map prev_mapValue; - + std::map mapValue; for(int cnt =0; cnt < reply.events_size(); cnt++) { auto event = reply.events(cnt); @@ -65,48 +42,26 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() { if(event.has_kv()) { - kv = event.kv(); + auto kv = event.kv(); if(kv.version() == 1) { - action = "create"; + action = etcdv3::CREATE_ACTION; } else { - action = "set"; + action = etcdv3::SET_ACTION; } mapValue.emplace(kv.key(), kv); } } else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) { - action = "delete"; + action = etcdv3::DELETE_ACTION; } - //get previous value index - 1 - RangeRequest get_request; - get_request.set_key(kv.key()); - get_request.set_revision((fromIndex >=0)?fromIndex - 1:index-1); - - RangeResponse response; - ClientContext ctx; - - Status result = stub_->Range(&ctx, get_request, &response); - if (result.ok()) - { - for(int cnt=0; cnt < response.kvs_size(); cnt++) - { - prev_mapValue.emplace(response.kvs(cnt).key(),response.kvs(cnt)); - } - } + } - for(auto x: prev_mapValue) - { - prev_values.push_back(x.second); - } - for(auto x: mapValue) { values.push_back(x.second); } - - return *this; } diff --git a/v3/src/action_constants.cpp b/v3/src/action_constants.cpp new file mode 100644 index 0000000..56e43f4 --- /dev/null +++ b/v3/src/action_constants.cpp @@ -0,0 +1,10 @@ +#include "v3/include/action_constants.hpp" + +char const * etcdv3::CREATE_ACTION = "create"; +char const * etcdv3::COMPARESWAP_ACTION = "compareAndSwap"; +char const * etcdv3::UPDATE_ACTION = "update"; +char const * etcdv3::SET_ACTION = "set"; +char const * etcdv3::GET_ACTION = "get"; +char const * etcdv3::DELETE_ACTION = "delete"; +char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete"; +