Complete implementation for modify with index and delete with index

functionalities. AsyncModifyResponse is needed because handling of
indeces are different between create and mod and delete, as per
requirements.

remaining TODOs:
1) Watch functionality
This commit is contained in:
lampayan 2016-06-09 16:57:25 +02:00
parent 6fb775218d
commit c28d955b22
9 changed files with 147 additions and 18 deletions

View File

@ -168,7 +168,7 @@ private:
pplx::task<Response> removeEntryWithKey(std::string const &entryKey); pplx::task<Response> removeEntryWithKey(std::string const &entryKey);
pplx::task<Response> removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); pplx::task<Response> removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue);
pplx::task<Response> removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); pplx::task<Response> removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex);
pplx::task<Response> modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index);
}; };

View File

@ -45,7 +45,6 @@ namespace etcd
{ {
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();
resp = etcd::Response(v3resp); resp = etcd::Response(v3resp);
resp._index = call->reply.header().revision();
} }
else else
{ {

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -2,6 +2,7 @@
#include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncPutResponse.hpp" #include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncDelResponse.hpp"
#include "v3/include/AsyncModifyResponse.hpp"
#include "v3/include/Utils.hpp" #include "v3/include/Utils.hpp"
#include <iostream> #include <iostream>
@ -54,11 +55,44 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
return send_asyncmodify_if(key, value, old_value); return send_asyncmodify_if(key, value, old_value);
} }
pplx::task<etcd::Response> etcd::Client::modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index) {
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient);
if(!resp->reply.kvs_size())
{
resp->error_code=100;
resp->error_message="Key not found";
return Response::createResponse(*resp);
}
else if(resp->reply.kvs(0).mod_revision() != old_index)
{
resp->error_code = 101;
resp->error_message = "Compare failed";
return Response::createResponse(*resp);
}
PutRequest put_request;
put_request.set_key(key);
put_request.set_value(value);
etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap");
//below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient;
call->key = key;
call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call);
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
{ {
web::http::uri_builder uri("/v2/keys" + key); return modifyEntryWithValueAndOldIndex(key, value, old_index);
uri.append_query("prevIndex", old_index);
return send_put_request(uri, "value", value);
} }
//note: this one seems to not need the parseResponse() method //note: this one seems to not need the parseResponse() method
@ -133,7 +167,6 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) { pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) {
etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient);
std::cout << "found revision in item is: " << searchResult->reply.kvs(0).create_revision() << std::endl;
if(!searchResult->reply.kvs_size()) { if(!searchResult->reply.kvs_size()) {
searchResult->error_code = 100; searchResult->error_code = 100;
@ -262,16 +295,16 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
put_request.set_key(key); put_request.set_key(key);
put_request.set_value(value); put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap"); etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap");
//below 2 lines can be removed once we are able to use Txn //below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0); call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient; call->client = &grpcClient;
call->key = key; call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call);
return Response::create(call); return Response::create(call);
@ -293,16 +326,16 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
put_request.set_key(key); put_request.set_key(key);
put_request.set_value(value); put_request.set_value(value);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("update"); etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("update");
//below 2 lines can be removed once we are able to use Txn //below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0); call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient; call->client = &grpcClient;
call->key = key; call->key = key;
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call);
return Response::create(call); return Response::create(call);

View File

@ -135,8 +135,6 @@ TEST_CASE("atomic compare-and-delete based on prevIndex")
CHECK("42" == res.prev_value().as_string()); CHECK("42" == res.prev_value().as_string());
} }
#if 0
TEST_CASE("deep atomic compare-and-swap") TEST_CASE("deep atomic compare-and-swap")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:4001");
@ -145,7 +143,6 @@ TEST_CASE("deep atomic compare-and-swap")
// modify success // modify success
etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get(); etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get();
int index = res.index(); int index = res.index();
std::cout << "index to use: " << index << std::endl;
REQUIRE(res.is_ok()); REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action()); CHECK("compareAndSwap" == res.action());
CHECK("43" == res.value().as_string()); CHECK("43" == res.value().as_string());
@ -169,6 +166,8 @@ TEST_CASE("deep atomic compare-and-swap")
CHECK("Compare failed" == res.error_message()); CHECK("Compare failed" == res.error_message());
} }
#if 0
TEST_CASE("create a directory") TEST_CASE("create a directory")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:4001");

View File

@ -0,0 +1,33 @@
/*
* AsyncModifyResponse.h
*
* Created on: Jun 9, 2016
* Author: ubuntu
*/
#ifndef V3_SRC_ASYNCMODIFYRESPONSE_HPP_
#define V3_SRC_ASYNCMODIFYRESPONSE_HPP_
#include "v3/include/V3Response.hpp"
#include "v3/include/V3BaseResponse.hpp"
#include "v3/include/grpcClient.hpp"
namespace etcdv3 {
class AsyncModifyResponse : public etcdv3::V3Response, public etcdv3::V3BaseResponse {
public:
AsyncModifyResponse(){action="compareAndSwap";};
AsyncModifyResponse(std::string const &);
AsyncModifyResponse(const AsyncModifyResponse&);
AsyncModifyResponse& operator=(const AsyncModifyResponse&);
virtual ~AsyncModifyResponse();
etcdserverpb::PutResponse putResponse;
std::unique_ptr<grpc::ClientAsyncResponseReader<etcdserverpb::PutResponse>> rpcInstance;
AsyncModifyResponse& ParseResponse();
etcdv3::grpcClient* client;
std::string key;
};
} /* namespace etcdv3 */
#endif /* V3_SRC_ASYNCMODIFYRESPONSE_HPP_ */

View File

@ -0,0 +1,60 @@
/*
* AsyncModifyResponse.cpp
*
* Created on: Jun 9, 2016
* Author: ubuntu
*/
#include "v3/include/AsyncModifyResponse.hpp"
#include "v3/include/Utils.hpp"
namespace etcdv3 {
etcdv3::AsyncModifyResponse::AsyncModifyResponse(const etcdv3::AsyncModifyResponse& other) {
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
}
etcdv3::AsyncModifyResponse::AsyncModifyResponse(const std::string &input) {
action = input;
}
etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::operator=(const etcdv3::AsyncModifyResponse& other) {
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
prev_value.set_key(other.prev_value.key());
prev_value.set_value(other.prev_value.value());
prev_value.set_create_revision(other.prev_value.create_revision());
prev_value.set_mod_revision(other.prev_value.mod_revision());
return *this;
}
AsyncModifyResponse::~AsyncModifyResponse() {
// TODO Auto-generated destructor stub
}
etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::ParseResponse() {
etcdv3::AsyncRangeResponse* response = etcdv3::Utils::getKey(key, *client);
if(response->reply.kvs_size())
{
values.push_back(response->reply.kvs(0));
index = response->reply.kvs(0).mod_revision();
}
else{
index = response->reply.header().revision();
}
return *this;
}
} /* namespace etcdv3 */

View File

@ -1,6 +1,8 @@
#include "v3/include/AsyncPutResponse.hpp" #include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/Utils.hpp" #include "v3/include/Utils.hpp"
#include <iostream>
using etcdserverpb::PutRequest; using etcdserverpb::PutRequest;
using etcdserverpb::PutRequest; using etcdserverpb::PutRequest;
@ -38,7 +40,10 @@ etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::ParseResponse()
if(resp->reply.kvs_size()) if(resp->reply.kvs_size())
{ {
values.push_back(resp->reply.kvs(0)); values.push_back(resp->reply.kvs(0));
index = resp->reply.kvs(0).create_revision();
} }
else
index = resp->reply.header().revision();
return *this; return *this;
} }

View File

@ -42,7 +42,7 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
error_code=100; error_code=100;
error_message="Key not found"; error_message="Key not found";
} }
index = reply.header().revision();
return *this; return *this;
} }