Use prev_kv

This commit is contained in:
arches 2016-07-07 05:39:52 -04:00
parent 1e046d87a0
commit 48d95e8569
6 changed files with 39 additions and 38 deletions

View File

@ -45,7 +45,9 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse()
txn_resp.ParseResponse(); txn_resp.ParseResponse();
txn_resp.action = etcdv3::COMPARESWAP_ACTION; txn_resp.action = etcdv3::COMPARESWAP_ACTION;
if(!reply.succeeded()) //if there is an error code returned by parseResponse, we must
//not overwrite it.
if(!reply.succeeded() && !txn_resp.error_code)
{ {
txn_resp.error_code=101; txn_resp.error_code=101;
txn_resp.error_message="Compare failed"; txn_resp.error_message="Compare failed";

View File

@ -15,19 +15,20 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
{ {
etcdv3::Transaction transaction(parameters.key); etcdv3::Transaction transaction(parameters.key);
isCreate = create; isCreate = create;
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_create_sequence(parameters.key, parameters.value);
if(isCreate) if(isCreate)
{ {
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(parameters.key); transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_basic_create_sequence(parameters.key, parameters.value); //transaction.setup_basic_create_sequence(parameters.key, parameters.value);
} }
else else
{ {
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_set_failure_operation(parameters.key, parameters.value); transaction.setup_set_failure_operation(parameters.key, parameters.value);
transaction.setup_basic_create_sequence(parameters.key, parameters.value); //transaction.setup_basic_create_sequence(parameters.key, parameters.value);
} }
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);

View File

@ -45,19 +45,21 @@ void etcdv3::AsyncTxnResponse::ParseResponse()
error_code = response.error_code; error_code = response.error_code;
error_message = response.error_message; error_message = response.error_message;
if(!response.values.empty()) if(!response.values.empty())
{ {
prev_range_kvs=range_kvs; values.insert(values.end(), response.values.begin(),response.values.end());
range_kvs = response.values;
} }
} }
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
{ {
std::cout << "number of deleted keys: " << resp.response_delete_range().deleted() <<std::endl; auto put_resp = resp.response_put();
if(put_resp.has_prev_kv())
{
prev_values.push_back(put_resp.prev_kv());
}
} }
} }
prev_values = prev_range_kvs;
values = range_kvs;
} }

View File

@ -17,7 +17,6 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION); Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value); transaction.setup_compare_and_swap_sequence(parameters.value);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
@ -35,8 +34,17 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse()
} }
else else
{ {
txn_resp.ParseResponse(); if(reply.succeeded())
txn_resp.action = etcdv3::UPDATE_ACTION; {
txn_resp.ParseResponse();
txn_resp.action = etcdv3::UPDATE_ACTION;
}
else
{
txn_resp.error_code = 100;
txn_resp.error_message = "Key not found";
}
} }
return txn_resp; return txn_resp;
} }

View File

@ -36,7 +36,6 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
index = reply.header().revision(); index = reply.header().revision();
std::map<std::string, mvccpb::KeyValue> mapValue; std::map<std::string, mvccpb::KeyValue> mapValue;
std::map<std::string, mvccpb::KeyValue> mapPrevValue; std::map<std::string, mvccpb::KeyValue> mapPrevValue;
std::cout << "events size: " << reply.events_size() <<std::endl;
for(int cnt =0; cnt < reply.events_size(); cnt++) for(int cnt =0; cnt < reply.events_size(); cnt++)
{ {
auto event = reply.events(cnt); auto event = reply.events(cnt);
@ -54,8 +53,7 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
// just store the first occurence of the key in values. // just store the first occurence of the key in values.
// this is done so tas client will not need to change their behaviour. // this is done so tas client will not need to change their behaviour.
// and then break immediately // and then break immediately
mapValue.emplace(kv.key(), kv); mapValue.emplace(kv.key(), kv);
break;
} }
else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type())
@ -65,16 +63,15 @@ void etcdv3::AsyncWatchResponse::ParseResponse()
// this is done so tas client will not need to change their behaviour. // this is done so tas client will not need to change their behaviour.
// break immediately // break immediately
mapValue.emplace(kv.key(), kv); mapValue.emplace(kv.key(), kv);
break;
} }
if(event.has_prev_kv()) if(event.has_prev_kv())
{ {
auto kv = event.prev_kv(); auto kv = event.prev_kv();
std::cout << "previous value of key: " << kv.key() << " is " << kv.value() << std::endl;
mapPrevValue.emplace(kv.key(),kv); mapPrevValue.emplace(kv.key(),kv);
} }
break;
} }
for(auto x: mapPrevValue) for(auto x: mapPrevValue)

View File

@ -53,18 +53,14 @@ void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key)
* get key on failure, get key before put, modify and then get updated key * get key on failure, get key before put, modify and then get updated key
*/ */
void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) { void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
req_failure = txn_request.add_failure(); put_request->set_prev_kv(true);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_put(put_request.release()); req_failure->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
req_failure = txn_request.add_failure(); req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release()); req_failure->set_allocated_request_range(get_request.release());
@ -117,19 +113,14 @@ void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, st
* get key value then modify and get new value * get key value then modify and get new value
*/ */
void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value) { void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value) {
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
req_success = txn_request.add_success(); put_request->set_prev_kv(true);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release()); req_success->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest()); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key); get_request->set_key(key);
req_success = txn_request.add_success(); req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());