Use Actionv2::ActionParameters to pass arguments to AsyncActions.

This commit is contained in:
arches 2016-07-06 12:11:27 -04:00
parent 7d64447e76
commit 1e046d87a0
25 changed files with 346 additions and 220 deletions

View File

@ -132,10 +132,6 @@ namespace etcd
std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub;
private:
std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation,
etcdv3::Transaction& transaction);
};

View File

@ -7,6 +7,7 @@
#include <grpc++/grpc++.h>
using etcdserverpb::KV;
using etcdserverpb::Watch;
using grpc::Channel;
@ -17,7 +18,6 @@ namespace etcd
public:
Watcher(std::string const & etcd_url, std::string const & key, std::function<void(Response)> callback);
void Cancel();
void AddKey(std::string const & key);
~Watcher();
protected:
@ -27,6 +27,7 @@ namespace etcd
std::function<void(Response)> callback;
pplx::task<void> currentTask;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<etcdv3::AsyncWatchAction> call;
};
}

View File

@ -35,4 +35,7 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}

View File

@ -103,9 +103,12 @@ service Auth {
// UserAdd adds a new user.
rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {}
// UserGet gets detailed user information or lists all users.
// UserGet gets detailed user information.
rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) {}
// UserList gets a list of all users.
rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) {}
// UserDelete deletes a specified user.
rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) {}
@ -121,9 +124,12 @@ service Auth {
// RoleAdd adds a new role.
rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {}
// RoleGet gets detailed role information or lists all roles.
// RoleGet gets detailed role information.
rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) {}
// RoleList gets lists of all roles.
rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) {}
// RoleDelete deletes a specified role.
rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) {}
@ -171,7 +177,7 @@ message RangeRequest {
int64 limit = 3;
// revision is the point-in-time of the key-value store to use for the range.
// If revision is less or equal to zero, the range is over the newest key-value store.
// If the revision has been compacted, ErrCompaction is returned as a response.
// If the revision has been compacted, ErrCompacted is returned as a response.
int64 revision = 4;
// sort_order is the order for returned sorted results.
@ -187,14 +193,23 @@ message RangeRequest {
// a serializable range request is served locally without needing to reach consensus
// with other nodes in the cluster.
bool serializable = 7;
// keys_only when set returns only the keys and not the values.
bool keys_only = 8;
// count_only when set returns only the count of the keys in the range.
bool count_only = 9;
}
message RangeResponse {
ResponseHeader header = 1;
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
repeated mvccpb.KeyValue kvs = 2;
// more indicates if there are more keys to return in the requested range.
bool more = 3;
// count is set to the number of keys within the range when requested.
int64 count = 4;
}
message PutRequest {
@ -205,10 +220,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease.
int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
}
message PutResponse {
ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
}
message DeleteRangeRequest {
@ -218,12 +239,18 @@ message DeleteRangeRequest {
// If range_end is not given, the range is defined to contain only the key argument.
// If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting it.
// The previous key-value pairs will be returned in the delte response.
bool prev_kv = 3;
}
message DeleteRangeResponse {
ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request.
int64 deleted = 2;
// if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue prev_kvs = 3;
}
message RequestOp {
@ -372,6 +399,19 @@ message WatchCreateRequest {
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
enum FilterType {
// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
}
// filters filter the events at server side before it sends back to the watcher.
repeated FilterType filters = 5;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
}
message WatchCancelRequest {
@ -605,6 +645,12 @@ message AuthRoleGetRequest {
string role = 1;
}
message AuthUserListRequest {
}
message AuthRoleListRequest {
}
message AuthRoleDeleteRequest {
string role = 1;
}
@ -619,6 +665,7 @@ message AuthRoleGrantPermissionRequest {
message AuthRoleRevokePermissionRequest {
string role = 1;
string key = 2;
string range_end = 3;
}
message AuthEnableResponse {
@ -671,6 +718,18 @@ message AuthRoleGetResponse {
repeated authpb.Permission perm = 2;
}
message AuthRoleListResponse {
ResponseHeader header = 1;
repeated string roles = 2;
}
message AuthUserListResponse {
ResponseHeader header = 1;
repeated string users = 2;
}
message AuthRoleDeleteResponse {
ResponseHeader header = 1;
}

View File

@ -41,85 +41,140 @@ etcd::Client::Client(std::string const & address)
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(key,stub_.get()));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value)
{
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(key, value, stub_.get()));
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);;
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
{
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(key, value, stub_.get(), true));
return Response::create(call);;
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value)
{
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(key,value,stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value)
{
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_value, stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.old_value.assign(old_value);
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(key,value,old_index, stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.old_revision = old_index;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = false;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(key,old_value,stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.old_value.assign(old_value);
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));;
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
{
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(key,old_index,stub_.get()));;
etcdv3::ActionParameters params;
params.key.assign(key);
params.old_revision = old_index;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));;
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = true;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(key,stub_.get(),true));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = true;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(key,recursive,stub_.get(),watchServiceStub.get()));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get();
params.revision = 0;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
{
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(key,fromIndex,recursive,stub_.get(),watchServiceStub.get()));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = recursive;
params.revision = fromIndex;
params.watch_stub = watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}

View File

@ -15,6 +15,7 @@ etcd::Watcher::Watcher(std::string const & address, std::string const & key, std
doWatch(key, callback);
}
etcd::Watcher::~Watcher()
{
call->CancelWatch();
@ -27,42 +28,17 @@ void etcd::Watcher::Cancel()
currentTask.wait();
}
void etcd::Watcher::AddKey(std::string const & key)
{
call->WatchReq(key);
}
void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response)> callback)
{
call.reset(new etcdv3::AsyncWatchAction(key,true,NULL,watchServiceStub.get()));
etcdv3::ActionParameters params;
params.key.assign(key);
params.withPrefix = true;
params.watch_stub = watchServiceStub.get();
params.revision = 0;
call.reset(new etcdv3::AsyncWatchAction(params));
currentTask = pplx::task<void>([this, callback]()
{
return call->waitForResponse(callback);
});
//return Response::create(call);
/*currentTask = client.request(web::http::methods::GET, uri.to_string(), cancellation_source.get_token())
.then([this](pplx::task<web::http::http_response> response_task)
{
try
{
auto http_response = response_task.get();
auto json_task = http_response.extract_json();
auto json_value = json_task.get();
callback(etcd::Response(http_response, json_value));
}
catch (std::exception const & ex)
{
if (pplx::is_task_cancellation_requested() || (ex.what() == std::string("Operation canceled")))
return;
if(ex.what() != std::string("Retrieving message chunk header"))
throw;
}
doWatch();
});*/
}

View File

@ -234,6 +234,7 @@ TEST_CASE("wait for a value change")
REQUIRE(res.is_done());
REQUIRE("set" == res.get().action());
CHECK("43" == res.get().value().as_string());
CHECK("42" == res.get().prev_value().as_string());
}
TEST_CASE("wait for a directory change")
@ -266,7 +267,7 @@ TEST_CASE("wait for a directory change")
TEST_CASE("watch changes in the past")
{
etcd::Client etcd("http://127.0.0.1:2379");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
int index = etcd.set("/test/key1", "42").get().index();
etcd.set("/test/key1", "43").wait();
@ -276,6 +277,7 @@ TEST_CASE("watch changes in the past")
etcd::Response res = etcd.watch("/test/key1", ++index).get();
CHECK("set" == res.action());
CHECK("43" == res.value().as_string());
CHECK("42" == res.prev_value().as_string());
res = etcd.watch("/test/key1", ++index).get();
CHECK("set" == res.action());

View File

@ -18,6 +18,29 @@ void printResponse(etcd::Response const & resp)
}
}
TEST_CASE("create watcher with cancel")
{
etcd::SyncClient etcd(etcd_uri);
etcd.rmdir("/test", true);
watcher_called = 0;
etcd::Watcher watcher(etcd_uri, "/test", printResponse);
sleep(1);
etcd.set("/test/key", "42");
etcd.set("/test/key", "43");
sleep(1);
CHECK(2 == watcher_called);
watcher.Cancel();
etcd.set("/test/key", "50");
etcd.set("/test/key", "51");
sleep(1);
CHECK(2 == watcher_called);
etcd.rmdir("/test", true);
}
TEST_CASE("create watcher")
{
@ -25,17 +48,13 @@ TEST_CASE("create watcher")
etcd.rmdir("/test", true);
watcher_called = 0;
//{
std::cout << "watch started" << std::endl;
{
etcd::Watcher watcher(etcd_uri, "/test", printResponse);
sleep(1);
etcd.set("/test/key", "42");
std::cout << "first set finished" << std::endl;
etcd.set("/test/key", "43");
std::cout << "second set finished" << std::endl;
//}
}
sleep(1);
CHECK(2 == watcher_called);
// TEST_CASE("wait for a value change")
// {
@ -121,7 +140,5 @@ TEST_CASE("create watcher")
// std::cout << "std::exception: " << ex.what() << "\n";
// }
// }
std::cout << "start rmdir" << std::endl;
etcd.rmdir("/test", true).error_code();
std::cout << "end rmdir" << std::endl;
}

View File

@ -2,13 +2,35 @@
#define __V3_ACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::KV;
using etcdserverpb::Watch;
namespace etcdv3
{
enum Atomicity_Type
{
PREV_INDEX = 0,
PREV_VALUE = 1
};
struct ActionParameters
{
bool withPrefix;
int revision;
int old_revision;
std::string key;
std::string value;
std::string old_value;
KV::Stub* kv_stub;
Watch::Stub* watch_stub;
};
class Action
{
public:
@ -17,5 +39,16 @@ namespace etcdv3
CompletionQueue cq_;
void waitForResponse();
};
class Actionv2
{
public:
Actionv2(etcdv3::ActionParameters params);
Status status;
ClientContext context;
CompletionQueue cq_;
etcdv3::ActionParameters parameters;
void waitForResponse();
};
}
#endif

View File

@ -13,11 +13,10 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncCompareAndDeleteAction : public etcdv3::Action
class AsyncCompareAndDeleteAction : public etcdv3::Actionv2
{
public:
AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_);
AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_);
AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;

View File

@ -13,11 +13,10 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncCompareAndSwapAction : public etcdv3::Action
class AsyncCompareAndSwapAction : public etcdv3::Actionv2
{
public:
AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_);
AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_);
AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;

View File

@ -13,10 +13,10 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncDeleteAction : public etcdv3::Action
class AsyncDeleteAction : public etcdv3::Actionv2
{
public:
AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive=false);
AsyncDeleteAction(etcdv3::ActionParameters param);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;

View File

@ -13,14 +13,13 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncGetAction : public etcdv3::Action
class AsyncGetAction : public etcdv3::Actionv2
{
public:
AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix=false);
AsyncGetAction(etcdv3::ActionParameters param);
AsyncRangeResponse ParseResponse();
RangeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
bool prefix;
};
}

View File

@ -13,10 +13,10 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncSetAction : public etcdv3::Action
class AsyncSetAction : public etcdv3::Actionv2
{
public:
AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create=false);
AsyncSetAction(etcdv3::ActionParameters param, bool isCreate=false);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;

View File

@ -13,10 +13,10 @@ using etcdserverpb::KV;
namespace etcdv3
{
class AsyncUpdateAction : public etcdv3::Action
class AsyncUpdateAction : public etcdv3::Actionv2
{
public:
AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_);
AsyncUpdateAction(etcdv3::ActionParameters param);
AsyncTxnResponse ParseResponse();
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;

View File

@ -11,26 +11,23 @@
using grpc::ClientAsyncReaderWriter;
using etcdserverpb::WatchRequest;
using etcdserverpb::WatchResponse;
using etcdserverpb::KV;
using etcdserverpb::Watch;
namespace etcdv3
{
class AsyncWatchAction : public etcdv3::Action
class AsyncWatchAction : public etcdv3::Actionv2
{
public:
AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub);
AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub);
AsyncWatchAction(etcdv3::ActionParameters param);
AsyncWatchResponse ParseResponse();
void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelWatch();
void WatchReq(std::string const & key);
WatchResponse reply;
KV::Stub* stub_;
KV::Stub* kv_stub;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
bool prefix;
bool isCancelled;
};
}

View File

@ -8,3 +8,17 @@ void etcdv3::Action::waitForResponse()
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}
void etcdv3::Actionv2::waitForResponse()
{
void* got_tag;
bool ok = false;
cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)this);
}
etcdv3::Actionv2::Actionv2(etcdv3::ActionParameters params)
{
parameters = params;
}

View File

@ -9,28 +9,25 @@ using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, std::string const & old_value, KV::Stub* stub_)
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type)
:etcdv3::Actionv2(param)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
etcdv3::Transaction transaction(parameters.key);
if(type == etcdv3::Atomicity_Type::PREV_VALUE)
{
transaction.init_compare(parameters.old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction(std::string const & key, int old_index, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
}
else if (type == etcdv3::Atomicity_Type::PREV_INDEX)
{
transaction.init_compare(parameters.old_revision, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
transaction.setup_compare_and_delete_operation(key);
transaction.setup_basic_failure_operation(key);
}
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
transaction.setup_compare_and_delete_operation(parameters.key);
transaction.setup_basic_failure_operation(parameters.key);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -9,29 +9,25 @@ using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, std::string const & old_value, KV::Stub* stub_)
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParameters param, etcdv3::Atomicity_Type type)
: etcdv3::Actionv2(param)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
etcdv3::Transaction transaction(parameters.key);
if(type == etcdv3::Atomicity_Type::PREV_VALUE)
{
transaction.init_compare(parameters.old_value, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VALUE);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(std::string const & key, std::string const & value, int old_index, KV::Stub* stub_)
{
etcdv3::Transaction transaction(key);
transaction.init_compare(old_index, Compare::CompareResult::Compare_CompareResult_EQUAL,
}
else if (type == etcdv3::Atomicity_Type::PREV_INDEX)
{
transaction.init_compare(parameters.old_revision, Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_MOD);
}
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -4,22 +4,23 @@
using etcdserverpb::Compare;
etcdv3::AsyncDeleteAction::AsyncDeleteAction(std::string const & key, KV::Stub* stub_, bool recursive)
etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param)
: etcdv3::Actionv2(param)
{
etcdv3::Transaction transaction(key);
etcdv3::Transaction transaction(parameters.key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
std::string range_end(key);
if(recursive)
std::string range_end(parameters.key);
if(parameters.withPrefix)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
}
transaction.setup_delete_sequence(key, range_end, recursive);
transaction.setup_delete_failure_operation(key, range_end, recursive);
transaction.setup_delete_sequence(parameters.key, range_end, parameters.withPrefix);
transaction.setup_delete_failure_operation(parameters.key, range_end, parameters.withPrefix);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -3,14 +3,14 @@
using etcdserverpb::RangeRequest;
etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_, bool withPrefix)
etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
: etcdv3::Actionv2(param)
{
RangeRequest get_request;
get_request.set_key(key);
prefix = withPrefix;
if(withPrefix)
get_request.set_key(parameters.key);
if(parameters.withPrefix)
{
std::string range_end(key);
std::string range_end(parameters.key);
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
@ -19,7 +19,7 @@ etcdv3::AsyncGetAction::AsyncGetAction(std::string const & key, KV::Stub* stub_,
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
}
response_reader = stub_->AsyncRange(&context,get_request,&cq_);
response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
@ -36,8 +36,7 @@ etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse()
{
range_resp.ParseResponse();
range_resp.action = etcdv3::GET_ACTION;
range_resp.isPrefix = prefix;
range_resp.isPrefix = parameters.withPrefix;
}
return range_resp;
}

View File

@ -10,25 +10,26 @@ using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncSetAction::AsyncSetAction(std::string const & key, std::string const & value, KV::Stub* stub_, bool create)
etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool create)
: etcdv3::Actionv2(param)
{
etcdv3::Transaction transaction(key);
etcdv3::Transaction transaction(parameters.key);
isCreate = create;
if(create)
if(isCreate)
{
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_basic_create_sequence(key, value);
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_basic_create_sequence(parameters.key, parameters.value);
}
else
{
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_set_failure_operation(key, value);
transaction.setup_basic_create_sequence(key, value);
transaction.setup_set_failure_operation(parameters.key, parameters.value);
transaction.setup_basic_create_sequence(parameters.key, parameters.value);
}
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -10,16 +10,17 @@ using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncUpdateAction::AsyncUpdateAction(std::string const & key, std::string const & value, KV::Stub* stub_)
etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
: etcdv3::Actionv2(param)
{
etcdv3::Transaction transaction(key);
etcdv3::Transaction transaction(parameters.key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(key);
transaction.setup_compare_and_swap_sequence(value);
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value);
response_reader = stub_->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}

View File

@ -6,19 +6,20 @@ using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
: etcdv3::Actionv2(param)
{
std::cout << "AsyncWatchAction create start" << std::endl;
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)"create");
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create");
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_create_req.set_key(parameters.key);
watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision);
std::string range_end(key);
prefix = recursive;
if(recursive)
if(parameters.withPrefix)
{
std::string range_end(parameters.key);
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
@ -27,43 +28,6 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursi
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)"write");
stream->Read(&reply, (void*)this);
stub_ = stub_;
std::cout << "AsyncWatchAction create end" << std::endl;
}
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
{
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)1);
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_create_req.set_start_revision(fromIndex);
std::string range_end(key);
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)1);
stream->Read(&reply, (void*)this);
stub_ = stub_;
}
void etcdv3::AsyncWatchAction::WatchReq(std::string const & key)
{
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)1);
stream->Read(&reply, (void*)this);
}
@ -92,13 +56,14 @@ void etcdv3::AsyncWatchAction::waitForResponse()
void etcdv3::AsyncWatchAction::CancelWatch()
{
std::cout << "cancel watch"<< std::endl;
if(isCancelled == false)
{
stream->WritesDone((void*)"writes done");
}
}
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
{
std::cout << "waitForResponse start" << std::endl;
void* got_tag;
bool ok = false;
@ -108,22 +73,17 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
{
break;
}
std::cout << "ok status: " << ok << std::endl;
if(got_tag == (void*)"writes done")
{
std::cout << "writes done" << std::endl;
isCancelled = true;
}
else if(got_tag == (void*)this) // read tag
{
std::cout << "read tag" << std::endl;
std::cout << "events size: "<< reply.events_size() << std::endl;
if(reply.events_size())
{
auto resp = ParseResponse();
callback(resp);
std::cout << "events received try to read again" << std::endl;
}
std::cout << " read again" << std::endl;
stream->Read(&reply, (void*)this);
}
}

View File

@ -35,14 +35,14 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
{
index = reply.header().revision();
std::map<std::string, mvccpb::KeyValue> mapValue;
std::map<std::string, mvccpb::KeyValue> mapPrevValue;
std::cout << "events size: " << reply.events_size() <<std::endl;
for(int cnt =0; cnt < reply.events_size(); cnt++)
{
auto event = reply.events(cnt);
const mvccpb::KeyValue& kv = event.kv();
if(mvccpb::Event::EventType::Event_EventType_PUT == event.type())
{
if(event.has_kv())
{
auto kv = event.kv();
if(kv.version() == 1)
{
action = etcdv3::CREATE_ACTION;
@ -51,14 +51,35 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
{
action = etcdv3::SET_ACTION;
}
// just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour.
// and then break immediately
mapValue.emplace(kv.key(), kv);
}
break;
}
else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type())
{
action = etcdv3::DELETE_ACTION;
// just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour.
// break immediately
mapValue.emplace(kv.key(), kv);
break;
}
if(event.has_prev_kv())
{
auto kv = event.prev_kv();
std::cout << "previous value of key: " << kv.key() << " is " << kv.value() << std::endl;
mapPrevValue.emplace(kv.key(),kv);
}
}
for(auto x: mapPrevValue)
{
prev_values.push_back(x.second);
}
for(auto x: mapValue)
{