diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 54eca77..f0f2dea 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -132,10 +132,6 @@ namespace etcd std::unique_ptr stub_; std::unique_ptr watchServiceStub; - -private: - std::shared_ptr initiate_transaction(const std::string &operation, - etcdv3::Transaction& transaction); }; diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index e21a28b..097c2d8 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -7,6 +7,7 @@ #include +using etcdserverpb::KV; using etcdserverpb::Watch; using grpc::Channel; @@ -17,7 +18,6 @@ namespace etcd public: Watcher(std::string const & etcd_url, std::string const & key, std::function callback); void Cancel(); - void AddKey(std::string const & key); ~Watcher(); protected: @@ -27,6 +27,7 @@ namespace etcd std::function callback; pplx::task currentTask; std::unique_ptr watchServiceStub; + std::unique_ptr stub_; std::unique_ptr call; }; } diff --git a/proto/kv.proto b/proto/kv.proto index 9a8004c..4e64b50 100644 --- a/proto/kv.proto +++ b/proto/kv.proto @@ -35,4 +35,7 @@ message Event { // A DELETE/EXPIRE event contains the deleted key with // its modification revision set to the revision of deletion. KeyValue kv = 2; + + // prev_kv holds the key-value pair before the event happens. + KeyValue prev_kv = 3; } diff --git a/proto/rpc.proto b/proto/rpc.proto index 4a0616e..d4cfcbd 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -103,9 +103,12 @@ service Auth { // UserAdd adds a new user. rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {} - // UserGet gets detailed user information or lists all users. + // UserGet gets detailed user information. rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) {} + // UserList gets a list of all users. + rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) {} + // UserDelete deletes a specified user. rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) {} @@ -121,9 +124,12 @@ service Auth { // RoleAdd adds a new role. rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {} - // RoleGet gets detailed role information or lists all roles. + // RoleGet gets detailed role information. rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) {} + // RoleList gets lists of all roles. + rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) {} + // RoleDelete deletes a specified role. rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) {} @@ -171,7 +177,7 @@ message RangeRequest { int64 limit = 3; // revision is the point-in-time of the key-value store to use for the range. // If revision is less or equal to zero, the range is over the newest key-value store. - // If the revision has been compacted, ErrCompaction is returned as a response. + // If the revision has been compacted, ErrCompacted is returned as a response. int64 revision = 4; // sort_order is the order for returned sorted results. @@ -187,14 +193,23 @@ message RangeRequest { // a serializable range request is served locally without needing to reach consensus // with other nodes in the cluster. bool serializable = 7; + + // keys_only when set returns only the keys and not the values. + bool keys_only = 8; + + // count_only when set returns only the count of the keys in the range. + bool count_only = 9; } message RangeResponse { ResponseHeader header = 1; // kvs is the list of key-value pairs matched by the range request. + // kvs is empty when count is requested. repeated mvccpb.KeyValue kvs = 2; // more indicates if there are more keys to return in the requested range. bool more = 3; + // count is set to the number of keys within the range when requested. + int64 count = 4; } message PutRequest { @@ -205,10 +220,16 @@ message PutRequest { // lease is the lease ID to associate with the key in the key-value store. A lease // value of 0 indicates no lease. int64 lease = 3; + + // If prev_kv is set, etcd gets the previous key-value pair before changing it. + // The previous key-value pair will be returned in the put response. + bool prev_kv = 4; } message PutResponse { ResponseHeader header = 1; + // if prev_kv is set in the request, the previous key-value pair will be returned. + mvccpb.KeyValue prev_kv = 2; } message DeleteRangeRequest { @@ -218,12 +239,18 @@ message DeleteRangeRequest { // If range_end is not given, the range is defined to contain only the key argument. // If range_end is '\0', the range is all keys greater than or equal to the key argument. bytes range_end = 2; + + // If prev_kv is set, etcd gets the previous key-value pairs before deleting it. + // The previous key-value pairs will be returned in the delte response. + bool prev_kv = 3; } message DeleteRangeResponse { ResponseHeader header = 1; // deleted is the number of keys deleted by the delete range request. int64 deleted = 2; + // if prev_kv is set in the request, the previous key-value pairs will be returned. + repeated mvccpb.KeyValue prev_kvs = 3; } message RequestOp { @@ -372,6 +399,19 @@ message WatchCreateRequest { // wish to recover a disconnected watcher starting from a recent known revision. // The etcd server may decide how often it will send notifications based on current load. bool progress_notify = 4; + + enum FilterType { + // filter out put event. + NOPUT = 0; + // filter out delete event. + NODELETE = 1; + } + // filters filter the events at server side before it sends back to the watcher. + repeated FilterType filters = 5; + + // If prev_kv is set, created watcher gets the previous KV before the event happens. + // If the previous KV is already compacted, nothing will be returned. + bool prev_kv = 6; } message WatchCancelRequest { @@ -605,6 +645,12 @@ message AuthRoleGetRequest { string role = 1; } +message AuthUserListRequest { +} + +message AuthRoleListRequest { +} + message AuthRoleDeleteRequest { string role = 1; } @@ -619,6 +665,7 @@ message AuthRoleGrantPermissionRequest { message AuthRoleRevokePermissionRequest { string role = 1; string key = 2; + string range_end = 3; } message AuthEnableResponse { @@ -671,6 +718,18 @@ message AuthRoleGetResponse { repeated authpb.Permission perm = 2; } +message AuthRoleListResponse { + ResponseHeader header = 1; + + repeated string roles = 2; +} + +message AuthUserListResponse { + ResponseHeader header = 1; + + repeated string users = 2; +} + message AuthRoleDeleteResponse { ResponseHeader header = 1; } diff --git a/src/Client.cpp b/src/Client.cpp index 4069958..66aec19 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -41,85 +41,140 @@ etcd::Client::Client(std::string const & address) pplx::task etcd::Client::get(std::string const & key) { - std::shared_ptr call(new etcdv3::AsyncGetAction(key,stub_.get())); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = false; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - std::shared_ptr call(new etcdv3::AsyncSetAction(key, value, stub_.get())); + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncSetAction(params)); return Response::create(call);; } pplx::task etcd::Client::add(std::string const & key, std::string const & value) { - std::shared_ptr call(new etcdv3::AsyncSetAction(key, value, stub_.get(), true)); - return Response::create(call);; + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); + return Response::create(call); } pplx::task etcd::Client::modify(std::string const & key, std::string const & value) { - std::shared_ptr call(new etcdv3::AsyncUpdateAction(key,value,stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) { - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_value, stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.old_value.assign(old_value); + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_index, stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.value.assign(value); + params.old_revision = old_index; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));; return Response::create(call); } pplx::task etcd::Client::rm(std::string const & key) { - std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = false; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(key,old_value,stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.old_value.assign(old_value); + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));; return Response::create(call); } pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(key,old_index,stub_.get()));; + etcdv3::ActionParameters params; + params.key.assign(key); + params.old_revision = old_index; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; return Response::create(call); } pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { - std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true)); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = true; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } pplx::task etcd::Client::ls(std::string const & key) { - - std::shared_ptr call(new etcdv3::AsyncGetAction(key,stub_.get(),true)); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = true; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); return Response::create(call); } pplx::task etcd::Client::watch(std::string const & key, bool recursive) { - std::shared_ptr call(new etcdv3::AsyncWatchAction(key,recursive,stub_.get(),watchServiceStub.get())); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = recursive; + params.watch_stub = watchServiceStub.get(); + params.revision = 0; + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { - std::shared_ptr call(new etcdv3::AsyncWatchAction(key,fromIndex,recursive,stub_.get(),watchServiceStub.get())); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = recursive; + params.revision = fromIndex; + params.watch_stub = watchServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); return Response::create(call); } diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 97bc5f2..600083d 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -15,6 +15,7 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key, std doWatch(key, callback); } + etcd::Watcher::~Watcher() { call->CancelWatch(); @@ -27,42 +28,17 @@ void etcd::Watcher::Cancel() currentTask.wait(); } -void etcd::Watcher::AddKey(std::string const & key) -{ - call->WatchReq(key); -} - void etcd::Watcher::doWatch(std::string const & key, std::function callback) { - - call.reset(new etcdv3::AsyncWatchAction(key,true,NULL,watchServiceStub.get())); + etcdv3::ActionParameters params; + params.key.assign(key); + params.withPrefix = true; + params.watch_stub = watchServiceStub.get(); + params.revision = 0; + call.reset(new etcdv3::AsyncWatchAction(params)); currentTask = pplx::task([this, callback]() { return call->waitForResponse(callback); }); - - - //return Response::create(call); - - /*currentTask = client.request(web::http::methods::GET, uri.to_string(), cancellation_source.get_token()) - .then([this](pplx::task response_task) - { - try - { - auto http_response = response_task.get(); - auto json_task = http_response.extract_json(); - auto json_value = json_task.get(); - callback(etcd::Response(http_response, json_value)); - } - catch (std::exception const & ex) - { - if (pplx::is_task_cancellation_requested() || (ex.what() == std::string("Operation canceled"))) - return; - - if(ex.what() != std::string("Retrieving message chunk header")) - throw; - } - doWatch(); - });*/ } diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index b7de9e9..df784e5 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -234,6 +234,7 @@ TEST_CASE("wait for a value change") REQUIRE(res.is_done()); REQUIRE("set" == res.get().action()); CHECK("43" == res.get().value().as_string()); + CHECK("42" == res.get().prev_value().as_string()); } TEST_CASE("wait for a directory change") @@ -266,7 +267,7 @@ TEST_CASE("wait for a directory change") TEST_CASE("watch changes in the past") { etcd::Client etcd("http://127.0.0.1:2379"); - + REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); int index = etcd.set("/test/key1", "42").get().index(); etcd.set("/test/key1", "43").wait(); @@ -276,6 +277,7 @@ TEST_CASE("watch changes in the past") etcd::Response res = etcd.watch("/test/key1", ++index).get(); CHECK("set" == res.action()); CHECK("43" == res.value().as_string()); + CHECK("42" == res.prev_value().as_string()); res = etcd.watch("/test/key1", ++index).get(); CHECK("set" == res.action()); diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index a34ca0e..1ed3bda 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -18,6 +18,29 @@ void printResponse(etcd::Response const & resp) } } +TEST_CASE("create watcher with cancel") +{ + + etcd::SyncClient etcd(etcd_uri); + etcd.rmdir("/test", true); + + watcher_called = 0; + etcd::Watcher watcher(etcd_uri, "/test", printResponse); + sleep(1); + etcd.set("/test/key", "42"); + etcd.set("/test/key", "43"); + sleep(1); + CHECK(2 == watcher_called); + watcher.Cancel(); + etcd.set("/test/key", "50"); + etcd.set("/test/key", "51"); + sleep(1); + CHECK(2 == watcher_called); + + etcd.rmdir("/test", true); + +} + TEST_CASE("create watcher") { @@ -25,17 +48,13 @@ TEST_CASE("create watcher") etcd.rmdir("/test", true); watcher_called = 0; - //{ - std::cout << "watch started" << std::endl; + { etcd::Watcher watcher(etcd_uri, "/test", printResponse); sleep(1); etcd.set("/test/key", "42"); - std::cout << "first set finished" << std::endl; etcd.set("/test/key", "43"); - std::cout << "second set finished" << std::endl; - //} + } - sleep(1); CHECK(2 == watcher_called); // TEST_CASE("wait for a value change") // { @@ -121,7 +140,5 @@ TEST_CASE("create watcher") // std::cout << "std::exception: " << ex.what() << "\n"; // } // } - std::cout << "start rmdir" << std::endl; etcd.rmdir("/test", true).error_code(); - std::cout << "end rmdir" << std::endl; } diff --git a/v3/include/Action.hpp b/v3/include/Action.hpp index 3be3038..2f9b948 100644 --- a/v3/include/Action.hpp +++ b/v3/include/Action.hpp @@ -2,13 +2,35 @@ #define __V3_ACTION_HPP__ #include +#include "proto/rpc.grpc.pb.h" using grpc::ClientContext; using grpc::CompletionQueue; using grpc::Status; +using etcdserverpb::KV; +using etcdserverpb::Watch; + namespace etcdv3 { + enum Atomicity_Type + { + PREV_INDEX = 0, + PREV_VALUE = 1 + }; + + struct ActionParameters + { + bool withPrefix; + int revision; + int old_revision; + std::string key; + std::string value; + std::string old_value; + KV::Stub* kv_stub; + Watch::Stub* watch_stub; + }; + class Action { public: @@ -17,5 +39,16 @@ namespace etcdv3 CompletionQueue cq_; void waitForResponse(); }; + + class Actionv2 + { + public: + Actionv2(etcdv3::ActionParameters params); + Status status; + ClientContext context; + CompletionQueue cq_; + etcdv3::ActionParameters parameters; + void waitForResponse(); + }; } #endif diff --git a/v3/include/AsyncCompareAndDeleteAction.hpp b/v3/include/AsyncCompareAndDeleteAction.hpp index ea1114f..3f1e259 100644 --- a/v3/include/AsyncCompareAndDeleteAction.hpp +++ b/v3/include/AsyncCompareAndDeleteAction.hpp @@ -13,11 +13,10 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncCompareAndDeleteAction : public etcdv3::Action + class AsyncCompareAndDeleteAction : public etcdv3::Actionv2 { public: - AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_); - AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_); + AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type); AsyncTxnResponse ParseResponse(); TxnResponse reply; std::unique_ptr> response_reader; diff --git a/v3/include/AsyncCompareAndSwapAction.hpp b/v3/include/AsyncCompareAndSwapAction.hpp index c83ec58..2c97fbd 100644 --- a/v3/include/AsyncCompareAndSwapAction.hpp +++ b/v3/include/AsyncCompareAndSwapAction.hpp @@ -13,11 +13,10 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncCompareAndSwapAction : public etcdv3::Action + class AsyncCompareAndSwapAction : public etcdv3::Actionv2 { public: - AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_); - AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_); + AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type); AsyncTxnResponse ParseResponse(); TxnResponse reply; std::unique_ptr> response_reader; diff --git a/v3/include/AsyncDeleteAction.hpp b/v3/include/AsyncDeleteAction.hpp index 4c8daf9..fd182a1 100644 --- a/v3/include/AsyncDeleteAction.hpp +++ b/v3/include/AsyncDeleteAction.hpp @@ -13,10 +13,10 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncDeleteAction : public etcdv3::Action + class AsyncDeleteAction : public etcdv3::Actionv2 { public: - AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive=false); + AsyncDeleteAction(etcdv3::ActionParameters param); AsyncTxnResponse ParseResponse(); TxnResponse reply; std::unique_ptr> response_reader; diff --git a/v3/include/AsyncGetAction.hpp b/v3/include/AsyncGetAction.hpp index b9f8555..421d54c 100644 --- a/v3/include/AsyncGetAction.hpp +++ b/v3/include/AsyncGetAction.hpp @@ -13,14 +13,13 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncGetAction : public etcdv3::Action + class AsyncGetAction : public etcdv3::Actionv2 { public: - AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix=false); + AsyncGetAction(etcdv3::ActionParameters param); AsyncRangeResponse ParseResponse(); RangeResponse reply; std::unique_ptr> response_reader; - bool prefix; }; } diff --git a/v3/include/AsyncSetAction.hpp b/v3/include/AsyncSetAction.hpp index 57f6ccb..db36654 100644 --- a/v3/include/AsyncSetAction.hpp +++ b/v3/include/AsyncSetAction.hpp @@ -13,10 +13,10 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncSetAction : public etcdv3::Action + class AsyncSetAction : public etcdv3::Actionv2 { public: - AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create=false); + AsyncSetAction(etcdv3::ActionParameters param, bool isCreate=false); AsyncTxnResponse ParseResponse(); TxnResponse reply; std::unique_ptr> response_reader; diff --git a/v3/include/AsyncUpdateAction.hpp b/v3/include/AsyncUpdateAction.hpp index 15b4094..6d40d57 100644 --- a/v3/include/AsyncUpdateAction.hpp +++ b/v3/include/AsyncUpdateAction.hpp @@ -13,10 +13,10 @@ using etcdserverpb::KV; namespace etcdv3 { - class AsyncUpdateAction : public etcdv3::Action + class AsyncUpdateAction : public etcdv3::Actionv2 { public: - AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_); + AsyncUpdateAction(etcdv3::ActionParameters param); AsyncTxnResponse ParseResponse(); TxnResponse reply; std::unique_ptr> response_reader; diff --git a/v3/include/AsyncWatchAction.hpp b/v3/include/AsyncWatchAction.hpp index d929253..09456ff 100644 --- a/v3/include/AsyncWatchAction.hpp +++ b/v3/include/AsyncWatchAction.hpp @@ -11,26 +11,23 @@ using grpc::ClientAsyncReaderWriter; using etcdserverpb::WatchRequest; using etcdserverpb::WatchResponse; -using etcdserverpb::KV; -using etcdserverpb::Watch; + namespace etcdv3 { - class AsyncWatchAction : public etcdv3::Action + class AsyncWatchAction : public etcdv3::Actionv2 { public: - AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub); - AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub); + AsyncWatchAction(etcdv3::ActionParameters param); AsyncWatchResponse ParseResponse(); void waitForResponse(); void waitForResponse(std::function callback); void CancelWatch(); void WatchReq(std::string const & key); WatchResponse reply; - KV::Stub* stub_; - std::unique_ptr> stream; - bool prefix; - + KV::Stub* kv_stub; + std::unique_ptr> stream; + bool isCancelled; }; } diff --git a/v3/src/Action.cpp b/v3/src/Action.cpp index de6ee82..43dbf19 100644 --- a/v3/src/Action.cpp +++ b/v3/src/Action.cpp @@ -8,3 +8,17 @@ void etcdv3::Action::waitForResponse() cq_.Next(&got_tag, &ok); GPR_ASSERT(got_tag == (void*)this); } + +void etcdv3::Actionv2::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + +etcdv3::Actionv2::Actionv2(etcdv3::ActionParameters params) +{ + parameters = params; +} diff --git a/v3/src/AsyncCompareAndDeleteAction.cpp b/v3/src/AsyncCompareAndDeleteAction.cpp index 0cec36f..5e509f4 100644 --- a/v3/src/AsyncCompareAndDeleteAction.cpp +++ b/v3/src/AsyncCompareAndDeleteAction.cpp @@ -9,28 +9,25 @@ using etcdserverpb::RequestOp; using etcdserverpb::ResponseOp; using etcdserverpb::TxnRequest; -etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_) +etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type) + :etcdv3::Actionv2(param) { - etcdv3::Transaction transaction(key); - transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VALUE); + etcdv3::Transaction transaction(parameters.key); + if(type == etcdv3::Atomicity_Type::PREV_VALUE) + { + transaction.init_compare(parameters.old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VALUE); + } + else if (type == etcdv3::Atomicity_Type::PREV_INDEX) + { + transaction.init_compare(parameters.old_revision, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_MOD); + } - transaction.setup_compare_and_delete_operation(key); - transaction.setup_basic_failure_operation(key); + transaction.setup_compare_and_delete_operation(parameters.key); + transaction.setup_basic_failure_operation(parameters.key); - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_MOD); - transaction.setup_compare_and_delete_operation(key); - transaction.setup_basic_failure_operation(key); - - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/v3/src/AsyncCompareAndSwapAction.cpp b/v3/src/AsyncCompareAndSwapAction.cpp index 809e8fe..61f9830 100644 --- a/v3/src/AsyncCompareAndSwapAction.cpp +++ b/v3/src/AsyncCompareAndSwapAction.cpp @@ -9,29 +9,25 @@ using etcdserverpb::RequestOp; using etcdserverpb::ResponseOp; using etcdserverpb::TxnRequest; -etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_) +etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type) + : etcdv3::Actionv2(param) { - etcdv3::Transaction transaction(key); - transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VALUE); + etcdv3::Transaction transaction(parameters.key); + if(type == etcdv3::Atomicity_Type::PREV_VALUE) + { + transaction.init_compare(parameters.old_value, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_VALUE); + } + else if (type == etcdv3::Atomicity_Type::PREV_INDEX) + { + transaction.init_compare(parameters.old_revision, Compare::CompareResult::Compare_CompareResult_EQUAL, + Compare::CompareTarget::Compare_CompareTarget_MOD); + } - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); + transaction.setup_basic_failure_operation(parameters.key); + transaction.setup_compare_and_swap_sequence(parameters.value); - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); -} - -etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_) -{ - etcdv3::Transaction transaction(key); - transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_MOD); - - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); - - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/v3/src/AsyncDeleteAction.cpp b/v3/src/AsyncDeleteAction.cpp index a257171..ea63356 100644 --- a/v3/src/AsyncDeleteAction.cpp +++ b/v3/src/AsyncDeleteAction.cpp @@ -4,22 +4,23 @@ using etcdserverpb::Compare; -etcdv3::AsyncDeleteAction::AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive) +etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param) + : etcdv3::Actionv2(param) { - etcdv3::Transaction transaction(key); + etcdv3::Transaction transaction(parameters.key); transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, Compare::CompareTarget::Compare_CompareTarget_VERSION); - std::string range_end(key); - if(recursive) + std::string range_end(parameters.key); + if(parameters.withPrefix) { int ascii = (int)range_end[range_end.length()-1]; range_end.back() = ascii+1; } - transaction.setup_delete_sequence(key, range_end, recursive); - transaction.setup_delete_failure_operation(key, range_end, recursive); + transaction.setup_delete_sequence(parameters.key, range_end, parameters.withPrefix); + transaction.setup_delete_failure_operation(parameters.key, range_end, parameters.withPrefix); - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/v3/src/AsyncGetAction.cpp b/v3/src/AsyncGetAction.cpp index 30fd2d3..dc19889 100644 --- a/v3/src/AsyncGetAction.cpp +++ b/v3/src/AsyncGetAction.cpp @@ -3,14 +3,14 @@ using etcdserverpb::RangeRequest; -etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix) +etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) + : etcdv3::Actionv2(param) { RangeRequest get_request; - get_request.set_key(key); - prefix = withPrefix; - if(withPrefix) + get_request.set_key(parameters.key); + if(parameters.withPrefix) { - std::string range_end(key); + std::string range_end(parameters.key); int ascii = (int)range_end[range_end.length()-1]; range_end.back() = ascii+1; @@ -19,7 +19,7 @@ etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_, get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); } - response_reader = stub_->AsyncRange(&context,get_request,&cq_); + response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); response_reader->Finish(&reply, &status, (void*)this); } @@ -36,8 +36,7 @@ etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse() { range_resp.ParseResponse(); range_resp.action = etcdv3::GET_ACTION; - range_resp.isPrefix = prefix; + range_resp.isPrefix = parameters.withPrefix; } - return range_resp; } diff --git a/v3/src/AsyncSetAction.cpp b/v3/src/AsyncSetAction.cpp index d0d8276..e4707ce 100644 --- a/v3/src/AsyncSetAction.cpp +++ b/v3/src/AsyncSetAction.cpp @@ -10,25 +10,26 @@ using etcdserverpb::RequestOp; using etcdserverpb::ResponseOp; using etcdserverpb::TxnRequest; -etcdv3::AsyncSetAction::AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create) +etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool create) + : etcdv3::Actionv2(param) { - etcdv3::Transaction transaction(key); + etcdv3::Transaction transaction(parameters.key); isCreate = create; - if(create) + if(isCreate) { transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, Compare::CompareTarget::Compare_CompareTarget_VERSION); - transaction.setup_basic_failure_operation(key); - transaction.setup_basic_create_sequence(key, value); + transaction.setup_basic_failure_operation(parameters.key); + transaction.setup_basic_create_sequence(parameters.key, parameters.value); } else { transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, Compare::CompareTarget::Compare_CompareTarget_VERSION); - transaction.setup_set_failure_operation(key, value); - transaction.setup_basic_create_sequence(key, value); + transaction.setup_set_failure_operation(parameters.key, parameters.value); + transaction.setup_basic_create_sequence(parameters.key, parameters.value); } - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/v3/src/AsyncUpdateAction.cpp b/v3/src/AsyncUpdateAction.cpp index 80190b7..a9121fb 100644 --- a/v3/src/AsyncUpdateAction.cpp +++ b/v3/src/AsyncUpdateAction.cpp @@ -10,16 +10,17 @@ using etcdserverpb::RequestOp; using etcdserverpb::ResponseOp; using etcdserverpb::TxnRequest; -etcdv3::AsyncUpdateAction::AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_) +etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param) + : etcdv3::Actionv2(param) { - etcdv3::Transaction transaction(key); + etcdv3::Transaction transaction(parameters.key); transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, - Compare::CompareTarget::Compare_CompareTarget_VERSION); + Compare::CompareTarget::Compare_CompareTarget_VERSION); - transaction.setup_basic_failure_operation(key); - transaction.setup_compare_and_swap_sequence(value); + transaction.setup_basic_failure_operation(parameters.key); + transaction.setup_compare_and_swap_sequence(parameters.value); - response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_); + response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/v3/src/AsyncWatchAction.cpp b/v3/src/AsyncWatchAction.cpp index 621668b..8b2e3ee 100644 --- a/v3/src/AsyncWatchAction.cpp +++ b/v3/src/AsyncWatchAction.cpp @@ -6,19 +6,20 @@ using etcdserverpb::RangeRequest; using etcdserverpb::RangeResponse; using etcdserverpb::WatchCreateRequest; -etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub) +etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) + : etcdv3::Actionv2(param) { - std::cout << "AsyncWatchAction create start" << std::endl; - stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)"create"); + stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create"); WatchRequest watch_req; WatchCreateRequest watch_create_req; - watch_create_req.set_key(key); + watch_create_req.set_key(parameters.key); + watch_create_req.set_prev_kv(true); + watch_create_req.set_start_revision(parameters.revision); - std::string range_end(key); - prefix = recursive; - if(recursive) + if(parameters.withPrefix) { + std::string range_end(parameters.key); int ascii = (int)range_end[range_end.length()-1]; range_end.back() = ascii+1; watch_create_req.set_range_end(range_end); @@ -27,43 +28,6 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursi watch_req.mutable_create_request()->CopyFrom(watch_create_req); stream->Write(watch_req, (void*)"write"); stream->Read(&reply, (void*)this); - stub_ = stub_; - std::cout << "AsyncWatchAction create end" << std::endl; - -} - -etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub) -{ - stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)1); - - WatchRequest watch_req; - WatchCreateRequest watch_create_req; - watch_create_req.set_key(key); - watch_create_req.set_start_revision(fromIndex); - - std::string range_end(key); - if(recursive) - { - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - watch_create_req.set_range_end(range_end); - } - - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - stream->Write(watch_req, (void*)1); - stream->Read(&reply, (void*)this); - stub_ = stub_; -} - -void etcdv3::AsyncWatchAction::WatchReq(std::string const & key) -{ - WatchRequest watch_req; - WatchCreateRequest watch_create_req; - watch_create_req.set_key(key); - - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - stream->Write(watch_req, (void*)1); - stream->Read(&reply, (void*)this); } @@ -92,13 +56,14 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { - std::cout << "cancel watch"<< std::endl; - stream->WritesDone((void*)"writes done"); + if(isCancelled == false) + { + stream->WritesDone((void*)"writes done"); + } } void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) { - std::cout << "waitForResponse start" << std::endl; void* got_tag; bool ok = false; @@ -108,22 +73,17 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::functionRead(&reply, (void*)this); } } diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp index 92a0f66..c0497b7 100644 --- a/v3/src/AsyncWatchResponse.cpp +++ b/v3/src/AsyncWatchResponse.cpp @@ -35,30 +35,51 @@ void etcdv3::AsyncWatchResponse::ParseResponse() { index = reply.header().revision(); std::map mapValue; + std::map mapPrevValue; + std::cout << "events size: " << reply.events_size() <