diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f060a13..edb6a79 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp ../v3/src/AsyncWatchResponse.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index 66aec19..fd98487 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -4,6 +4,7 @@ #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncWatchResponse.hpp" +#include "v3/include/AsyncDeleteRangeResponse.hpp" #include "v3/include/Transaction.hpp" #include diff --git a/src/Response.cpp b/src/Response.cpp index f5d074b..0b0079a 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -6,11 +6,13 @@ etcd::Response::Response(const etcdv3::V3Response& reply) { _index = reply.index; + _action = reply.action; _error_code = reply.error_code; _error_message = reply.error_message; - _action = reply.action; int size = reply.values.size(); - if(reply.isPrefix) + //with prefix means that we expect that + //values could have at least one result(e.g. ls, rmdir) + if(size) { for(int index = 0; index < size; index++) { @@ -18,15 +20,15 @@ etcd::Response::Response(const etcdv3::V3Response& reply) _keys.push_back(reply.values[index].key()); } } - else if(size == 1) + //values where we expect that + // at most one result.(e.g. set, add, modify, rm, watch) + else { - _value = Value(reply.values[0]); - } - - if(reply.prev_values.size() == 1) - { - _prev_value = Value(reply.prev_values[0]); + _value = Value(reply.value); } + + _prev_value = Value(reply.prev_value); + } diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index c6e64e1..f8ff1ea 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -107,9 +107,24 @@ TEST_CASE("atomic compare-and-swap") TEST_CASE("delete a value") { etcd::Client etcd("http://127.0.0.1:2379"); - etcd::Response resp = etcd.rm("/test/key1").get(); + etcd::Response resp = etcd.rm("/test/key11111").get(); + CHECK(!resp.is_ok()); + CHECK(100 == resp.error_code()); + CHECK("Key not found" == resp.error_message()); + + int index = etcd.get("/test/key1").get().index(); + int create_index = etcd.get("/test/key1").get().value().created_index(); + int modify_index = etcd.get("/test/key1").get().value().modified_index(); + resp = etcd.rm("/test/key1").get(); CHECK("43" == resp.prev_value().as_string()); + CHECK( "/test/key1" == resp.prev_value().key()); + CHECK( create_index == resp.prev_value().created_index()); + CHECK( modify_index == resp.prev_value().modified_index()); CHECK("delete" == resp.action()); + CHECK( resp.index() == resp.value().modified_index()); + CHECK( create_index == resp.value().created_index()); + CHECK("" == resp.value().as_string()); + CHECK( "/test/key1" == resp.value().key()); } TEST_CASE("atomic compare-and-delete based on prevValue") @@ -224,10 +239,42 @@ TEST_CASE("delete a directory") { etcd::Client etcd("http://127.0.0.1:2379"); //CHECK(108 == etcd.rmdir("/test/new_dir").get().error_code()); // Directory not empty - CHECK(0 == etcd.rmdir("/test/new_dir", true).get().error_code()); -} + etcd::Response resp = etcd.ls("/test/new_dir").get(); + //get the lowest created index + std::vector myset; + for(unsigned int cnt=0; cnt < resp.values().size(); cnt++) + { + myset.push_back(resp.value(cnt).created_index()); + } + std::sort(myset.begin(),myset.end()); + + //get the latest modified index + std::vector myset1; + for(unsigned int cnt=0; cnt < resp.values().size(); cnt++) + { + myset1.push_back(resp.value(cnt).modified_index()); + } + std::sort(myset1.begin(),myset1.end()); + resp = etcd.rmdir("/test/new_dir", true).get(); + int index = resp.index(); + + CHECK("" == resp.prev_value().as_string()); + CHECK( myset[0] == resp.prev_value().created_index()); + CHECK( myset1[myset1.size() - 1] == resp.prev_value().modified_index()); + CHECK( "/test/new_dir" == resp.prev_value().key()); + CHECK("delete" == resp.action()); + CHECK( index == resp.value().modified_index()); + CHECK( myset[0] == resp.value().created_index()); + CHECK("" == resp.value().as_string()); + + resp = etcd.rmdir("/test/dirnotfound", true).get(); + CHECK(!resp.is_ok()); + CHECK(100 == resp.error_code()); + CHECK("Key not found" == resp.error_message()); +} + TEST_CASE("wait for a value change") { etcd::Client etcd("http://127.0.0.1:2379"); diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 1ed3bda..f0d8043 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -15,6 +15,7 @@ void printResponse(etcd::Response const & resp) else { std::cout << resp.action() << " " << resp.value().as_string() << std::endl; + std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl; } } @@ -29,13 +30,15 @@ TEST_CASE("create watcher with cancel") sleep(1); etcd.set("/test/key", "42"); etcd.set("/test/key", "43"); + etcd.rm("/test/key"); + etcd.set("/test/key", "44"); sleep(1); - CHECK(2 == watcher_called); + CHECK(4 == watcher_called); watcher.Cancel(); etcd.set("/test/key", "50"); etcd.set("/test/key", "51"); sleep(1); - CHECK(2 == watcher_called); + CHECK(4 == watcher_called); etcd.rmdir("/test", true); @@ -140,5 +143,5 @@ TEST_CASE("create watcher") // std::cout << "std::exception: " << ex.what() << "\n"; // } // } - etcd.rmdir("/test", true).error_code(); + etcd.rmdir("/test", true).error_code(); } diff --git a/v3/include/Action.hpp b/v3/include/Action.hpp index 31aa09e..29ace6b 100644 --- a/v3/include/Action.hpp +++ b/v3/include/Action.hpp @@ -33,14 +33,16 @@ namespace etcdv3 class Action { - public: + public: Action(etcdv3::ActionParameters params); Action(){}; + void waitForResponse(); + protected: Status status; ClientContext context; CompletionQueue cq_; etcdv3::ActionParameters parameters; - void waitForResponse(); + }; } #endif diff --git a/v3/include/AsyncCompareAndDeleteAction.hpp b/v3/include/AsyncCompareAndDeleteAction.hpp index fceddbe..9446a9c 100644 --- a/v3/include/AsyncCompareAndDeleteAction.hpp +++ b/v3/include/AsyncCompareAndDeleteAction.hpp @@ -18,6 +18,7 @@ namespace etcdv3 public: AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type); AsyncTxnResponse ParseResponse(); + private: TxnResponse reply; std::unique_ptr> response_reader; }; diff --git a/v3/include/AsyncCompareAndSwapAction.hpp b/v3/include/AsyncCompareAndSwapAction.hpp index 41a308a..c1bc240 100644 --- a/v3/include/AsyncCompareAndSwapAction.hpp +++ b/v3/include/AsyncCompareAndSwapAction.hpp @@ -18,6 +18,7 @@ namespace etcdv3 public: AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type); AsyncTxnResponse ParseResponse(); + private: TxnResponse reply; std::unique_ptr> response_reader; }; diff --git a/v3/include/AsyncDeleteAction.hpp b/v3/include/AsyncDeleteAction.hpp index ac33d26..06339ff 100644 --- a/v3/include/AsyncDeleteAction.hpp +++ b/v3/include/AsyncDeleteAction.hpp @@ -4,12 +4,11 @@ #include #include "proto/rpc.grpc.pb.h" #include "v3/include/Action.hpp" -#include "v3/include/AsyncTxnResponse.hpp" +#include "v3/include/AsyncDeleteRangeResponse.hpp" using grpc::ClientAsyncResponseReader; -using etcdserverpb::TxnResponse; -using etcdserverpb::KV; +using etcdserverpb::DeleteRangeResponse; namespace etcdv3 { @@ -17,9 +16,10 @@ namespace etcdv3 { public: AsyncDeleteAction(etcdv3::ActionParameters param); - AsyncTxnResponse ParseResponse(); - TxnResponse reply; - std::unique_ptr> response_reader; + AsyncDeleteRangeResponse ParseResponse(); + private: + DeleteRangeResponse reply; + std::unique_ptr> response_reader; }; } diff --git a/v3/include/AsyncDeleteRangeResponse.hpp b/v3/include/AsyncDeleteRangeResponse.hpp new file mode 100644 index 0000000..3726439 --- /dev/null +++ b/v3/include/AsyncDeleteRangeResponse.hpp @@ -0,0 +1,23 @@ +#ifndef __ASYNC_DELETERESPONSE_HPP__ +#define __ASYNC_DELETERESPONSE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" +#include "v3/include/Action.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::DeleteRangeResponse; + +namespace etcdv3 +{ + class AsyncDeleteRangeResponse : public etcdv3::V3Response + { + public: + AsyncDeleteRangeResponse(){}; + void ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp); + }; +} + +#endif diff --git a/v3/include/AsyncGetAction.hpp b/v3/include/AsyncGetAction.hpp index 5c6a30b..a1f1905 100644 --- a/v3/include/AsyncGetAction.hpp +++ b/v3/include/AsyncGetAction.hpp @@ -9,7 +9,6 @@ using grpc::ClientAsyncResponseReader; using etcdserverpb::RangeResponse; -using etcdserverpb::KV; namespace etcdv3 { @@ -18,6 +17,7 @@ namespace etcdv3 public: AsyncGetAction(etcdv3::ActionParameters param); AsyncRangeResponse ParseResponse(); + private: RangeResponse reply; std::unique_ptr> response_reader; }; diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index a9ebe6b..3619671 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -14,11 +14,8 @@ namespace etcdv3 class AsyncRangeResponse : public etcdv3::V3Response { public: - AsyncRangeResponse(RangeResponse& resp); - AsyncRangeResponse(const AsyncRangeResponse& other); - AsyncRangeResponse& operator=(const AsyncRangeResponse& other); - void ParseResponse(); - RangeResponse reply; + AsyncRangeResponse(){}; + void ParseResponse(RangeResponse& resp, bool prefix=false); }; } diff --git a/v3/include/AsyncSetAction.hpp b/v3/include/AsyncSetAction.hpp index e191b2f..4f4d437 100644 --- a/v3/include/AsyncSetAction.hpp +++ b/v3/include/AsyncSetAction.hpp @@ -16,8 +16,9 @@ namespace etcdv3 class AsyncSetAction : public etcdv3::Action { public: - AsyncSetAction(etcdv3::ActionParameters param, bool isCreate=false); + AsyncSetAction(etcdv3::ActionParameters param, bool create=false); AsyncTxnResponse ParseResponse(); + private: TxnResponse reply; std::unique_ptr> response_reader; bool isCreate; diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index 7019930..4a2764f 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -11,11 +11,8 @@ namespace etcdv3 class AsyncTxnResponse : public etcdv3::V3Response { public: - AsyncTxnResponse(TxnResponse& resp); - AsyncTxnResponse& operator=(const AsyncTxnResponse& other); - AsyncTxnResponse(const AsyncTxnResponse& other); - void ParseResponse(); - TxnResponse reply; + AsyncTxnResponse(){}; + void ParseResponse(std::string const& key, bool prefix,TxnResponse& resp); }; } diff --git a/v3/include/AsyncUpdateAction.hpp b/v3/include/AsyncUpdateAction.hpp index 20eacbe..97d17fc 100644 --- a/v3/include/AsyncUpdateAction.hpp +++ b/v3/include/AsyncUpdateAction.hpp @@ -18,6 +18,7 @@ namespace etcdv3 public: AsyncUpdateAction(etcdv3::ActionParameters param); AsyncTxnResponse ParseResponse(); + private: TxnResponse reply; std::unique_ptr> response_reader; }; diff --git a/v3/include/AsyncWatchAction.hpp b/v3/include/AsyncWatchAction.hpp index fdad665..4e13ac7 100644 --- a/v3/include/AsyncWatchAction.hpp +++ b/v3/include/AsyncWatchAction.hpp @@ -24,8 +24,8 @@ namespace etcdv3 void waitForResponse(std::function callback); void CancelWatch(); void WatchReq(std::string const & key); + private: WatchResponse reply; - KV::Stub* kv_stub; std::unique_ptr> stream; bool isCancelled; }; diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp index ac3a633..32a2a71 100644 --- a/v3/include/AsyncWatchResponse.hpp +++ b/v3/include/AsyncWatchResponse.hpp @@ -16,11 +16,8 @@ namespace etcdv3 class AsyncWatchResponse : public etcdv3::V3Response { public: - AsyncWatchResponse(WatchResponse& resp); - AsyncWatchResponse(const AsyncWatchResponse& other); - AsyncWatchResponse& operator=(const AsyncWatchResponse& other); - void ParseResponse(); - WatchResponse reply; + AsyncWatchResponse(){}; + void ParseResponse(WatchResponse& resp); }; } diff --git a/v3/include/IResponse.hpp b/v3/include/IResponse.hpp deleted file mode 100644 index 0514ff6..0000000 --- a/v3/include/IResponse.hpp +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef __I_RESPONSE_HPP__ -#define __I_RESPONSE_HPP__ - -namespace etcdv3 -{ - class IResponse - { - }; -} diff --git a/v3/include/V3Response.hpp b/v3/include/V3Response.hpp index 33c39c2..45e8863 100644 --- a/v3/include/V3Response.hpp +++ b/v3/include/V3Response.hpp @@ -10,11 +10,18 @@ namespace etcdv3 { public: V3Response(): error_code(0), index(0), isPrefix(false) {}; + void set_error_code(int code); + void set_error_message(std::string msg); + void set_action(std::string action); + std::vector get_kv_values(); + std::vector get_prev_kv_values(); int error_code; int index; bool isPrefix; std::string error_message; std::string action; + mvccpb::KeyValue value; + mvccpb::KeyValue prev_value; std::vector values; std::vector prev_values; }; diff --git a/v3/src/Action.cpp b/v3/src/Action.cpp index 265798c..392954b 100644 --- a/v3/src/Action.cpp +++ b/v3/src/Action.cpp @@ -13,3 +13,4 @@ void etcdv3::Action::waitForResponse() cq_.Next(&got_tag, &ok); GPR_ASSERT(got_tag == (void*)this); } + diff --git a/v3/src/AsyncCompareAndDeleteAction.cpp b/v3/src/AsyncCompareAndDeleteAction.cpp index cf7ff8a..f1ef4fb 100644 --- a/v3/src/AsyncCompareAndDeleteAction.cpp +++ b/v3/src/AsyncCompareAndDeleteAction.cpp @@ -33,7 +33,7 @@ etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionP etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() { - AsyncTxnResponse txn_resp(reply); + AsyncTxnResponse txn_resp; if(!status.ok()) { txn_resp.error_code = status.error_code(); @@ -41,7 +41,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() } else { - txn_resp.ParseResponse(); + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.prev_values = txn_resp.values; txn_resp.action = etcdv3::COMPAREDELETE_ACTION; diff --git a/v3/src/AsyncCompareAndSwapAction.cpp b/v3/src/AsyncCompareAndSwapAction.cpp index acca2ef..9e3098c 100644 --- a/v3/src/AsyncCompareAndSwapAction.cpp +++ b/v3/src/AsyncCompareAndSwapAction.cpp @@ -33,7 +33,7 @@ etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParam etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() { - AsyncTxnResponse txn_resp(reply); + AsyncTxnResponse txn_resp; if(!status.ok()) { @@ -42,7 +42,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() } else { - txn_resp.ParseResponse(); + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.action = etcdv3::COMPARESWAP_ACTION; //if there is an error code returned by parseResponse, we must diff --git a/v3/src/AsyncDeleteAction.cpp b/v3/src/AsyncDeleteAction.cpp index 124edaa..5002c1b 100644 --- a/v3/src/AsyncDeleteAction.cpp +++ b/v3/src/AsyncDeleteAction.cpp @@ -1,44 +1,39 @@ #include "v3/include/AsyncDeleteAction.hpp" #include "v3/include/action_constants.hpp" -#include "v3/include/Transaction.hpp" -using etcdserverpb::Compare; +using etcdserverpb::DeleteRangeRequest; etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param) : etcdv3::Action(param) { - etcdv3::Transaction transaction(parameters.key); - transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, - Compare::CompareTarget::Compare_CompareTarget_VERSION); + DeleteRangeRequest del_request; + del_request.set_key(parameters.key); + del_request.set_prev_kv(true); std::string range_end(parameters.key); if(parameters.withPrefix) { int ascii = (int)range_end[range_end.length()-1]; range_end.back() = ascii+1; + del_request.set_range_end(range_end); } - transaction.setup_delete_sequence(parameters.key, range_end, parameters.withPrefix); - transaction.setup_delete_failure_operation(parameters.key, range_end, parameters.withPrefix); - - response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } -etcdv3::AsyncTxnResponse etcdv3::AsyncDeleteAction::ParseResponse() +etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse() { - AsyncTxnResponse txn_resp(reply); + AsyncDeleteRangeResponse del_resp; if(!status.ok()) { - txn_resp.error_code = status.error_code(); - txn_resp.error_message = status.error_message(); + del_resp.set_error_code(status.error_code()); + del_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(); - txn_resp.prev_values = txn_resp.values; - txn_resp.action = etcdv3::DELETE_ACTION; + del_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); } - return txn_resp; + return del_resp; } diff --git a/v3/src/AsyncDeleteRangeResponse.cpp b/v3/src/AsyncDeleteRangeResponse.cpp new file mode 100644 index 0000000..ee92b08 --- /dev/null +++ b/v3/src/AsyncDeleteRangeResponse.cpp @@ -0,0 +1,81 @@ +#include "v3/include/AsyncDeleteRangeResponse.hpp" +#include "v3/include/action_constants.hpp" + + +void etcdv3::AsyncDeleteRangeResponse::ParseResponse(std::string const& key, bool prefix, DeleteRangeResponse& resp) +{ + index = resp.header().revision(); + + if(resp.prev_kvs_size() == 0) + { + error_code=100; + error_message="Key not found"; + } + else + { + action = etcdv3::DELETE_ACTION; + //get all previous values + for(int cnt=0; cnt < resp.prev_kvs_size(); cnt++) + { + prev_values.push_back(resp.prev_kvs(cnt)); + } + + //copy previous vales to values. We will format this later + for(unsigned int cnt=0; cnt < prev_values.size(); cnt++) + { + auto temp_value = prev_values[cnt]; + temp_value.set_mod_revision(index); + temp_value.clear_value(); + values.push_back(temp_value); + } + + prev_value = prev_values[0]; + value = values[0]; + + //get all mod revisions of previous values + std::vector mod_index; + for(unsigned int cnt = 0; cnt < prev_values.size(); ++cnt) + { + mod_index.push_back(prev_values[cnt].mod_revision()); + } + //sort it ascending + std::sort(mod_index.begin(),mod_index.end()); + // use the latest mod index + prev_value.set_mod_revision(mod_index.back()); + + //get all created revision of previous values + std::vector create_index; + for(unsigned int cnt = 0; cnt < prev_values.size(); ++cnt) + { + create_index.push_back(prev_values[cnt].create_revision()); + } + //sort it ascending + std::sort(create_index.begin(),create_index.end()); + + //use earliest create index + prev_value.set_create_revision(create_index.front()); + + + + //value modified index should be the same as index + value.set_mod_revision(index); + //value created index should be the same as prev value created index + value.set_create_revision(prev_value.create_revision()); + + //set key.When prefix delete is done, we should use the prefix provided by + //client as the key + value.set_key(key); + prev_value.set_key(key); + + //clear the value + value.clear_value(); + + //if withPrefix, clear previous value also + if(prefix) + { + prev_value.clear_value(); + } + prev_values.clear(); + values.clear(); + } +} diff --git a/v3/src/AsyncGetAction.cpp b/v3/src/AsyncGetAction.cpp index 2b2b207..dcc8f59 100644 --- a/v3/src/AsyncGetAction.cpp +++ b/v3/src/AsyncGetAction.cpp @@ -17,26 +17,24 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) get_request.set_range_end(range_end); get_request.set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); - } - + } response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); response_reader->Finish(&reply, &status, (void*)this); } etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse() { - AsyncRangeResponse range_resp(reply); - + AsyncRangeResponse range_resp; if(!status.ok()) { - range_resp.error_code = status.error_code(); - range_resp.error_message = status.error_message(); + range_resp.set_error_code(status.error_code()); + range_resp.set_error_message(status.error_message()); } else { - range_resp.ParseResponse(); - range_resp.action = etcdv3::GET_ACTION; - range_resp.isPrefix = parameters.withPrefix; + range_resp.ParseResponse(reply, parameters.withPrefix); + //range_resp.set_action(etcdv3::GET_ACTION); + //range_resp.isPrefix = parameters.withPrefix; } return range_resp; } diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 62ba9bb..060f459 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -1,44 +1,28 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/action_constants.hpp" -etcdv3::AsyncRangeResponse::AsyncRangeResponse(RangeResponse& resp) -{ - reply = resp; -} -etcdv3::AsyncRangeResponse::AsyncRangeResponse(const etcdv3::AsyncRangeResponse& other) +void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix) { - error_code = other.error_code; - error_message = other.error_message; - index = other.index; - action = other.action; - values = other.values; - prev_values = other.prev_values; - -} - -etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::AsyncRangeResponse& other) -{ - error_code = other.error_code; - error_message = other.error_message; - index = other.index; - action = other.action; - values = other.values; - prev_values = other.prev_values; - return *this; -} - -void etcdv3::AsyncRangeResponse::ParseResponse() -{ - index = reply.header().revision(); - if(reply.kvs_size() == 0) + action = etcdv3::GET_ACTION; + index = resp.header().revision(); + if(resp.kvs_size() == 0) { error_code=100; error_message="Key not found"; + return; } - - for(int index=0; index < reply.kvs_size(); index++) + else { - values.push_back(reply.kvs(index)); + for(int index=0; index < resp.kvs_size(); index++) + { + values.push_back(resp.kvs(index)); + } + + if(!prefix) + { + value = values[0]; + values.clear(); + } } } diff --git a/v3/src/AsyncSetAction.cpp b/v3/src/AsyncSetAction.cpp index 6e4f027..8690ec7 100644 --- a/v3/src/AsyncSetAction.cpp +++ b/v3/src/AsyncSetAction.cpp @@ -19,16 +19,14 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea Compare::CompareTarget::Compare_CompareTarget_VERSION); transaction.setup_basic_create_sequence(parameters.key, parameters.value); + if(isCreate) { - transaction.setup_basic_failure_operation(parameters.key); - //transaction.setup_basic_create_sequence(parameters.key, parameters.value); } else { transaction.setup_set_failure_operation(parameters.key, parameters.value); - //transaction.setup_basic_create_sequence(parameters.key, parameters.value); } response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); @@ -37,17 +35,17 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() { - AsyncTxnResponse txn_resp(reply); + AsyncTxnResponse txn_resp; if(!status.ok()) { - txn_resp.error_code = status.error_code(); - txn_resp.error_message = status.error_message(); + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(); - txn_resp.action = isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION; + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.set_action(isCreate? etcdv3::CREATE_ACTION:etcdv3::SET_ACTION); if(!reply.succeeded() && txn_resp.action == etcdv3::CREATE_ACTION) { diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 90b68e1..861260c 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -1,65 +1,44 @@ #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/AsyncDeleteRangeResponse.hpp" #include "v3/include/action_constants.hpp" using etcdserverpb::ResponseOp; -etcdv3::AsyncTxnResponse::AsyncTxnResponse(TxnResponse& resp) -{ - reply = resp; -} - -etcdv3::AsyncTxnResponse::AsyncTxnResponse(const etcdv3::AsyncTxnResponse& other) -{ - error_code = other.error_code; - error_message = other.error_message; - index = other.index; - action = other.action; - values = other.values; - prev_values = other.prev_values; -} - -etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::AsyncTxnResponse& other) -{ - error_code = other.error_code; - error_message = other.error_message; - index = other.index; - action = other.action; - values = other.values; - prev_values = other.prev_values; - return *this; -} - -void etcdv3::AsyncTxnResponse::ParseResponse() +void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply) { index = reply.header().revision(); - std::vector range_kvs; - std::vector prev_range_kvs; for(int index=0; index < reply.responses_size(); index++) { auto resp = reply.responses(index); if(ResponseOp::ResponseCase::kResponseRange == resp.response_case()) { - AsyncRangeResponse response(*(resp.mutable_response_range())); - response.ParseResponse(); + AsyncRangeResponse response; + response.ParseResponse(*(resp.mutable_response_range()),prefix); error_code = response.error_code; error_message = response.error_message; - if(!response.values.empty()) - { - values.insert(values.end(), response.values.begin(),response.values.end()); - } + values = response.values; + value = response.value; } else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case()) { auto put_resp = resp.response_put(); if(put_resp.has_prev_kv()) { - prev_values.push_back(put_resp.prev_kv()); + prev_value = put_resp.prev_kv(); } } - } + else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) + { + AsyncDeleteRangeResponse response; + response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); - + prev_value = response.prev_value; + + values = response.values; + value = response.value; + } + } } diff --git a/v3/src/AsyncUpdateAction.cpp b/v3/src/AsyncUpdateAction.cpp index cd96206..c37b6bb 100644 --- a/v3/src/AsyncUpdateAction.cpp +++ b/v3/src/AsyncUpdateAction.cpp @@ -25,7 +25,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param) etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() { - AsyncTxnResponse txn_resp(reply); + AsyncTxnResponse txn_resp; if(!status.ok()) { @@ -36,7 +36,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() { if(reply.succeeded()) { - txn_resp.ParseResponse(); + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); txn_resp.action = etcdv3::UPDATE_ACTION; } else diff --git a/v3/src/AsyncWatchAction.cpp b/v3/src/AsyncWatchAction.cpp index 0263269..69cb53d 100644 --- a/v3/src/AsyncWatchAction.cpp +++ b/v3/src/AsyncWatchAction.cpp @@ -9,6 +9,7 @@ using etcdserverpb::WatchCreateRequest; etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { + isCancelled = false; stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create"); WatchRequest watch_req; @@ -92,7 +93,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function mapValue; - std::map mapPrevValue; for(int cnt =0; cnt < reply.events_size(); cnt++) { auto event = reply.events(cnt); - const mvccpb::KeyValue& kv = event.kv(); if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) { - if(kv.version() == 1) + if(event.kv().version() == 1) { action = etcdv3::CREATE_ACTION; } @@ -50,36 +17,21 @@ void etcdv3::AsyncWatchResponse::ParseResponse() { action = etcdv3::SET_ACTION; } - // just store the first occurence of the key in values. - // this is done so tas client will not need to change their behaviour. - // and then break immediately - mapValue.emplace(kv.key(), kv); + value = event.kv(); } else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) { action = etcdv3::DELETE_ACTION; - // just store the first occurence of the key in values. - // this is done so tas client will not need to change their behaviour. - // break immediately - mapValue.emplace(kv.key(), kv); + value = event.kv(); } if(event.has_prev_kv()) { - - auto kv = event.prev_kv(); - mapPrevValue.emplace(kv.key(),kv); + prev_value = event.prev_kv(); } + // just store the first occurence of the key in values. + // this is done so tas client will not need to change their behaviour. + // break immediately break; - - - } - for(auto x: mapPrevValue) - { - prev_values.push_back(x.second); - } - for(auto x: mapValue) - { - values.push_back(x.second); } } diff --git a/v3/src/Transaction.cpp b/v3/src/Transaction.cpp index 5a2451b..63b97e1 100644 --- a/v3/src/Transaction.cpp +++ b/v3/src/Transaction.cpp @@ -98,12 +98,14 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key, * add key and then get new value of key */ void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value) { - std::unique_ptr get_request(new RangeRequest()); std::unique_ptr put_request(new PutRequest()); put_request->set_key(key); put_request->set_value(value); + put_request->set_prev_kv(true); RequestOp* req_success = txn_request.add_success(); req_success->set_allocated_request_put(put_request.release()); + + std::unique_ptr get_request(new RangeRequest()); get_request->set_key(key); req_success = txn_request.add_success(); req_success->set_allocated_request_range(get_request.release()); @@ -130,38 +132,23 @@ void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& val * get key, delete */ void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) { - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); + std::unique_ptr del_request(new DeleteRangeRequest()); + del_request->set_key(key); + del_request->set_prev_kv(true); if(recursive) { - get_request->set_range_end(range_end); - get_request->set_sort_target(RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request->set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); + del_request->set_range_end(range_end); } RequestOp* req_success = txn_request.add_success(); - req_success->set_allocated_request_range(get_request.release()); - - std::unique_ptr del_request(new DeleteRangeRequest()); - del_request->set_key(key); - if(recursive) - { - del_request->set_range_end(range_end); - } - - req_success = txn_request.add_success(); req_success->set_allocated_request_delete_range(del_request.release()); } void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& key) { - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - RequestOp* req_success = txn_request.add_success(); - req_success->set_allocated_request_range(get_request.release()); - std::unique_ptr del_request(new DeleteRangeRequest()); del_request->set_key(key); - req_success = txn_request.add_success(); + del_request->set_prev_kv(true); + RequestOp* req_success = txn_request.add_success(); req_success->set_allocated_request_delete_range(del_request.release()); } diff --git a/v3/src/V3Response.cpp b/v3/src/V3Response.cpp new file mode 100644 index 0000000..38f92a1 --- /dev/null +++ b/v3/src/V3Response.cpp @@ -0,0 +1,27 @@ +#include "v3/include/V3Response.hpp" +#include "v3/include/action_constants.hpp" + +void etcdv3::V3Response::set_error_code(int code) +{ + error_code = code; +} + +void etcdv3::V3Response::set_error_message(std::string msg) +{ + error_message = msg; +} + +void etcdv3::V3Response::set_action(std::string action) +{ + this->action = action; +} + +std::vector etcdv3::V3Response::get_kv_values() +{ + return values; +} + +std::vector etcdv3::V3Response::get_prev_kv_values() +{ + return prev_values; +}