Implemented TTL and Lease

This commit is contained in:
arches 2016-07-12 11:57:28 -04:00
parent 658eaf7190
commit 3c52636028
25 changed files with 529 additions and 61 deletions

View File

@ -13,6 +13,7 @@
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
using etcdserverpb::Lease;
namespace etcd namespace etcd
{ {
@ -41,21 +42,47 @@ namespace etcd
* @param key is the key to be created or modified * @param key is the key to be created or modified
* @param value is the new value to be set * @param value is the new value to be set
*/ */
pplx::task<Response> set(std::string const & key, std::string const & value); pplx::task<Response> set(std::string const & key, std::string const & value, int ttl = 0);
/**
* Sets the value of a key. The key will be modified if already exists or created
* if it does not exists.
* @param key is the key to be created or modified
* @param value is the new value to be set
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> set(std::string const & key, std::string const & value, int64_t leaseId);
/** /**
* Creates a new key and sets it's value. Fails if the key already exists. * Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created * @param key is the key to be created
* @param value is the value to be set * @param value is the value to be set
*/ */
pplx::task<Response> add(std::string const & key, std::string const & value); pplx::task<Response> add(std::string const & key, std::string const & value, int ttl = 0);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> add(std::string const & key, std::string const & value, int64_t leaseId);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified * @param key is the key to be modified
* @param value is the new value to be set * @param value is the new value to be set
*/ */
pplx::task<Response> modify(std::string const & key, std::string const & value); pplx::task<Response> modify(std::string const & key, std::string const & value, int ttl = 0);
/**
* Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be modified
* @param value is the new value to be set
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify(std::string const & key, std::string const & value, int64_t leaseId);
/** /**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists * Modifies an existing key only if it has a specific value. Fails if the key does not exists
@ -64,7 +91,17 @@ namespace etcd
* @param value is the new value to be set * @param value is the new value to be set
* @param old_value is the value to be replaced * @param old_value is the value to be replaced
*/ */
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value); pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0);
/**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists
* or the original value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_value is the value to be replaced
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId);
/** /**
* Modifies an existing key only if it has a specific modification index value. Fails if the key * Modifies an existing key only if it has a specific modification index value. Fails if the key
@ -73,7 +110,17 @@ namespace etcd
* @param value is the new value to be set * @param value is the new value to be set
* @param old_index is the expected index of the original value * @param old_index is the expected index of the original value
*/ */
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index); pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0);
/**
* Modifies an existing key only if it has a specific modification index value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one.
* @param key is the key to be modified
* @param value is the new value to be set
* @param old_index is the expected index of the original value
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId);
/** /**
* Removes a single key. The key has to point to a plain, non directory entry. * Removes a single key. The key has to point to a plain, non directory entry.
@ -128,10 +175,17 @@ namespace etcd
*/ */
pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false); pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false);
protected: /**
* Grants a lease.
* @param ttl is the time to live of the lease
*/
pplx::task<Response> leasegrant(int ttl);
private:
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub;
}; };

View File

@ -112,6 +112,7 @@ namespace etcd
Keys _keys; Keys _keys;
friend class SyncClient; friend class SyncClient;
friend class etcdv3::AsyncWatchAction; friend class etcdv3::AsyncWatchAction;
friend class Client;
}; };
} }

View File

@ -4,7 +4,7 @@
#include <cpprest/http_client.h> #include <cpprest/http_client.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include "proto/kv.pb.h" #include "v3/include/KeyValue.hpp"
namespace etcd namespace etcd
{ {
@ -40,6 +40,13 @@ namespace etcd
*/ */
int modified_index() const; int modified_index() const;
/**
* Returns the ttl of this value or 0 if ttl is not set
*/
int ttl() const;
int64_t lease() const;
protected: protected:
friend class Response; friend class Response;
friend class BaseResponse; //deliberately done since Value class will be removed during full V3 friend class BaseResponse; //deliberately done since Value class will be removed during full V3
@ -47,12 +54,14 @@ namespace etcd
friend class AsyncDeleteResponse; friend class AsyncDeleteResponse;
Value(); Value();
Value(web::json::value const & json_value); Value(web::json::value const & json_value);
Value(mvccpb::KeyValue const & kvs); Value(etcdv3::KeyValue const & kvs);
std::string _key; std::string _key;
bool dir; bool dir;
std::string value; std::string value;
int created; int created;
int modified; int modified;
int _ttl;
int64_t leaseId;
}; };
typedef std::vector<Value> Values; typedef std::vector<Value> Values;

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp) add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/AsyncLeaseGrantResponse.cpp ../v3/src/AsyncLeaseGrantAction.cpp ../v3/src/KeyValue.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -15,14 +15,11 @@
#include "v3/include/AsyncGetAction.hpp" #include "v3/include/AsyncGetAction.hpp"
#include "v3/include/AsyncDeleteAction.hpp" #include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/AsyncWatchAction.hpp" #include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/AsyncLeaseGrantAction.hpp"
using grpc::Channel; using grpc::Channel;
using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::TxnRequest;
using etcdserverpb::DeleteRangeRequest;
using etcdserverpb::Compare;
using etcdserverpb::RequestOp;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
@ -37,6 +34,7 @@ etcd::Client::Client(std::string const & address)
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel); stub_= KV::NewStub(channel);
watchServiceStub= Watch::NewStub(channel); watchServiceStub= Watch::NewStub(channel);
leaseServiceStub= Lease::NewStub(channel);
} }
@ -50,57 +48,197 @@ pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);; if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
} }
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value) std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int ttl)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.kv_stub = stub_.get();
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true)); std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value)
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params)); std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.old_value.assign(old_value);
params.lease_id = leaseid;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index)
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));; if(ttl > 0)
{
auto res = leasegrant(ttl).get();
if(!res.is_ok())
{
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str());
});
}
else
{
params.lease_id = res.value().lease();
}
}
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid)
{
etcdv3::ActionParameters params;
params.key.assign(key);
params.value.assign(value);
params.lease_id = leaseid;
params.old_revision = old_index;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call); return Response::create(call);
} }
@ -122,7 +260,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::str
params.key.assign(key); params.key.assign(key);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = stub_.get(); params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));; std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
@ -163,7 +301,6 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool rec
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
params.revision = 0;
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params)); std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call); return Response::create(call);
} }
@ -179,4 +316,14 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{
etcdv3::ActionParameters params;
params.ttl = ttl;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
return Response::create(call);
}

View File

@ -1,21 +1,26 @@
#include <iomanip>
#include "etcd/Value.hpp" #include "etcd/Value.hpp"
#include "proto/kv.pb.h" #include "v3/include/KeyValue.hpp"
etcd::Value::Value() etcd::Value::Value()
: dir(false), : dir(false),
created(0), created(0),
modified(0) modified(0),
_ttl(0),
leaseId(0)
{ {
} }
etcd::Value::Value(mvccpb::KeyValue const & kvs) etcd::Value::Value(etcdv3::KeyValue const & kvs)
{ {
dir=false; dir=false;
_key=kvs.key(); _key=kvs.key();
value=kvs.value(); value=kvs.value();
created=kvs.create_revision(); created=kvs.create_revision();
modified=kvs.mod_revision(); modified=kvs.mod_revision();
leaseId = kvs.lease();
_ttl = kvs.get_ttl();
} }
std::string const & etcd::Value::key() const std::string const & etcd::Value::key() const
@ -42,3 +47,15 @@ int etcd::Value::modified_index() const
{ {
return modified; return modified;
} }
int etcd::Value::ttl() const
{
return _ttl;
}
int64_t etcd::Value::lease() const
{
return leaseId;
}

View File

@ -75,6 +75,14 @@ TEST_CASE("set a key")
CHECK("43" == etcd.set("/test/key2", "44").get().prev_value().as_string()); CHECK("43" == etcd.set("/test/key2", "44").get().prev_value().as_string());
CHECK("" == etcd.set("/test/key3", "44").get().prev_value().as_string()); CHECK("" == etcd.set("/test/key3", "44").get().prev_value().as_string());
CHECK(0 == etcd.set("/test", "42").get().error_code()); // Not a file CHECK(0 == etcd.set("/test", "42").get().error_code()); // Not a file
//set with ttl
resp = etcd.set("/test/key1", "50", 10).get();
REQUIRE(0 == resp.error_code()); // overwrite
CHECK("set" == resp.action());
CHECK("43" == resp.prev_value().as_string());
CHECK("50" == resp.value().as_string());
CHECK( 0 < resp.value().lease());
} }
TEST_CASE("atomic compare-and-swap") TEST_CASE("atomic compare-and-swap")
@ -344,6 +352,86 @@ TEST_CASE("watch changes in the past")
CHECK("45" == res.value().as_string()); CHECK("45" == res.value().as_string());
} }
TEST_CASE("lease grant")
{
etcd::Client etcd("http://127.0.0.1:2379");
etcd::Response res = etcd.leasegrant(60).get();
REQUIRE(res.is_ok());
CHECK(60 == res.value().ttl());
CHECK(0 < res.value().lease());
int64_t leaseid = res.value().lease();
res = etcd.set("/test/key1", "43", leaseid).get();
REQUIRE(0 == res.error_code()); // overwrite
CHECK("set" == res.action());
CHECK(leaseid == res.value().lease());
//change leaseid
res = etcd.leasegrant(10).get();
leaseid = res.value().lease();
res = etcd.set("/test/key1", "43", leaseid).get();
REQUIRE(0 == res.error_code()); // overwrite
CHECK("set" == res.action());
CHECK(leaseid == res.value().lease());
//failure to attach lease id
res = etcd.set("/test/key1", "43", leaseid+1).get();
REQUIRE(!res.is_ok());
REQUIRE(5 == res.error_code());
CHECK("etcdserver: requested lease not found" == res.error_message());
res = etcd.modify("/test/key1", "44", leaseid).get();
REQUIRE(0 == res.error_code()); // overwrite
CHECK("update" == res.action());
CHECK(leaseid == res.value().lease());
CHECK("44" == res.value().as_string());
//failure to attach lease id
res = etcd.modify("/test/key1", "45", leaseid+1).get();
REQUIRE(!res.is_ok());
REQUIRE(5 == res.error_code());
CHECK("etcdserver: requested lease not found" == res.error_message());
res = etcd.modify_if("/test/key1", "45", "44", leaseid).get();
int index = res.index();
REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action());
CHECK("45" == res.value().as_string());
//failure to attach lease id
res = etcd.modify_if("/test/key1", "46", "45", leaseid+1).get();
REQUIRE(!res.is_ok());
REQUIRE(5 == res.error_code());
CHECK("etcdserver: requested lease not found" == res.error_message());
// succes with the correct index
res = etcd.modify_if("/test/key1", "44", index, leaseid).get();
index = res.index();
REQUIRE(res.is_ok());
CHECK("compareAndSwap" == res.action());
CHECK("44" == res.value().as_string());
res = etcd.modify_if("/test/key1", "44", index, leaseid+1).get();
REQUIRE(!res.is_ok());
REQUIRE(5 == res.error_code());
CHECK("etcdserver: requested lease not found" == res.error_message());
res = etcd.add("/test/key11111", "43", leaseid).get();
REQUIRE(0 == res.error_code());
CHECK("create" == res.action());
CHECK(leaseid == res.value().lease());
//failure to attach lease id
res = etcd.set("/test/key11111", "43", leaseid+1).get();
REQUIRE(!res.is_ok());
REQUIRE(5 == res.error_code());
CHECK("etcdserver: requested lease not found" == res.error_message());
}
TEST_CASE("cleanup") TEST_CASE("cleanup")
{ {

Binary file not shown.

View File

@ -10,6 +10,7 @@ using grpc::Status;
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
using etcdserverpb::Lease;
namespace etcdv3 namespace etcdv3
{ {
@ -21,14 +22,18 @@ namespace etcdv3
struct ActionParameters struct ActionParameters
{ {
ActionParameters();
bool withPrefix; bool withPrefix;
int revision; int revision;
int old_revision; int old_revision;
int64_t lease_id;
int ttl;
std::string key; std::string key;
std::string value; std::string value;
std::string old_value; std::string old_value;
KV::Stub* kv_stub; KV::Stub* kv_stub;
Watch::Stub* watch_stub; Watch::Stub* watch_stub;
Lease::Stub* lease_stub;
}; };
class Action class Action

View File

@ -0,0 +1,25 @@
#ifndef __ASYNC_LEASEGRANTACTION_HPP__
#define __ASYNC_LEASEGRANTACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncLeaseGrantResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::LeaseGrantResponse;
namespace etcdv3
{
class AsyncLeaseGrantAction : public etcdv3::Action
{
public:
AsyncLeaseGrantAction(etcdv3::ActionParameters param);
AsyncLeaseGrantResponse ParseResponse();
private:
LeaseGrantResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseGrantResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,21 @@
#ifndef __ASYNC_LEASEGRANTRESPONSE_HPP__
#define __ASYNC_LEASEGRANTRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
using etcdserverpb::LeaseGrantResponse;
namespace etcdv3
{
class AsyncLeaseGrantResponse : public etcdv3::V3Response
{
public:
AsyncLeaseGrantResponse(){};
void ParseResponse(LeaseGrantResponse& resp);
};
}
#endif

20
v3/include/KeyValue.hpp Normal file
View File

@ -0,0 +1,20 @@
#ifndef __V3_ETCDV3KEYVALUE_HPP__
#define __V3_ETCDV3KEYVALUE_HPP__
#include "proto/kv.pb.h"
namespace etcdv3
{
class KeyValue : public mvccpb::KeyValue
{
public:
KeyValue();
KeyValue(const mvccpb::KeyValue& from);
void set_ttl(int ttl);
int get_ttl() const;
private:
int ttl;
};
}
#endif

View File

@ -18,14 +18,16 @@ public:
void init_compare(int, etcdserverpb::Compare::CompareResult, etcdserverpb::Compare::CompareTarget); void init_compare(int, etcdserverpb::Compare::CompareResult, etcdserverpb::Compare::CompareTarget);
void setup_basic_failure_operation(std::string const &key); void setup_basic_failure_operation(std::string const &key);
void setup_set_failure_operation(std::string const &key, std::string const &value); void setup_set_failure_operation(std::string const &key, std::string const &value, int64_t leaseid);
void setup_basic_create_sequence(std::string const &key, std::string const &value); void setup_basic_create_sequence(std::string const &key, std::string const &value, int64_t leaseid);
void setup_compare_and_swap_sequence(std::string const &valueToSwap); void setup_compare_and_swap_sequence(std::string const &valueToSwap, int64_t leaseid);
void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive); void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive);
void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive); void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive);
void setup_compare_and_delete_operation(std::string const& key); void setup_compare_and_delete_operation(std::string const& key);
void setup_lease_grant_operation(int ttl);
etcdserverpb::TxnRequest txn_request; etcdserverpb::TxnRequest txn_request;
etcdserverpb::LeaseGrantRequest leasegrant_request;
private: private:
std::string key; std::string key;

View File

@ -2,14 +2,14 @@
#define __V3_RESPONSE_HPP__ #define __V3_RESPONSE_HPP__
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/kv.pb.h" #include "v3/include/KeyValue.hpp"
namespace etcdv3 namespace etcdv3
{ {
class V3Response class V3Response
{ {
public: public:
V3Response(): error_code(0), index(0), isPrefix(false) {}; V3Response(): error_code(0), index(0){};
void set_error_code(int code); void set_error_code(int code);
int get_error_code() const; int get_error_code() const;
std::string const & get_error_message() const; std::string const & get_error_message() const;
@ -17,21 +17,20 @@ namespace etcdv3
void set_action(std::string action); void set_action(std::string action);
int get_index() const; int get_index() const;
std::string const & get_action() const; std::string const & get_action() const;
std::vector<mvccpb::KeyValue> const & get_values() const; std::vector<etcdv3::KeyValue> const & get_values() const;
std::vector<mvccpb::KeyValue> const & get_prev_values() const; std::vector<etcdv3::KeyValue> const & get_prev_values() const;
mvccpb::KeyValue const & get_value() const; etcdv3::KeyValue const & get_value() const;
mvccpb::KeyValue const & get_prev_value() const; etcdv3::KeyValue const & get_prev_value() const;
bool has_values() const; bool has_values() const;
protected: protected:
int error_code; int error_code;
int index; int index;
bool isPrefix;
std::string error_message; std::string error_message;
std::string action; std::string action;
mvccpb::KeyValue value; etcdv3::KeyValue value;
mvccpb::KeyValue prev_value; etcdv3::KeyValue prev_value;
std::vector<mvccpb::KeyValue> values; std::vector<etcdv3::KeyValue> values;
std::vector<mvccpb::KeyValue> prev_values; std::vector<etcdv3::KeyValue> prev_values;
}; };
} }
#endif #endif

View File

@ -5,6 +5,18 @@ etcdv3::Action::Action(etcdv3::ActionParameters params)
parameters = params; parameters = params;
} }
etcdv3::ActionParameters::ActionParameters()
{
withPrefix = false;
revision = 0;
old_revision = 0;
lease_id = 0;
ttl = 0;
kv_stub = NULL;
watch_stub = NULL;
lease_stub = NULL;
}
void etcdv3::Action::waitForResponse() void etcdv3::Action::waitForResponse()
{ {
void* got_tag; void* got_tag;

View File

@ -25,7 +25,7 @@ etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction(etcdv3::ActionParam
} }
transaction.setup_basic_failure_operation(parameters.key); transaction.setup_basic_failure_operation(parameters.key);
transaction.setup_compare_and_swap_sequence(parameters.value); transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);

View File

@ -0,0 +1,32 @@
#include "v3/include/AsyncLeaseGrantAction.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp"
using etcdserverpb::LeaseGrantRequest;
etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
etcdv3::Transaction transaction;
transaction.setup_lease_grant_operation(parameters.ttl);
response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, transaction.leasegrant_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse()
{
AsyncLeaseGrantResponse lease_resp;
if(!status.ok())
{
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
}
else
{
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -0,0 +1,11 @@
#include "v3/include/AsyncLeaseGrantResponse.hpp"
#include "v3/include/action_constants.hpp"
void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp)
{
index = resp.header().revision();
value.set_lease(resp.id());
value.set_ttl(resp.ttl());
error_message = resp.error();
}

View File

@ -16,7 +16,8 @@ void etcdv3::AsyncRangeResponse::ParseResponse(RangeResponse& resp, bool prefix)
{ {
for(int index=0; index < resp.kvs_size(); index++) for(int index=0; index < resp.kvs_size(); index++)
{ {
values.push_back(resp.kvs(index)); etcdv3::KeyValue kvs(resp.kvs(index));
values.push_back(kvs);
} }
if(!prefix) if(!prefix)

View File

@ -1,14 +1,8 @@
#include "v3/include/AsyncSetAction.hpp" #include "v3/include/AsyncSetAction.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/action_constants.hpp" #include "v3/include/action_constants.hpp"
#include "v3/include/Transaction.hpp" #include "v3/include/Transaction.hpp"
using etcdserverpb::Compare; using etcdserverpb::Compare;
using etcdserverpb::RangeRequest;
using etcdserverpb::PutRequest;
using etcdserverpb::RequestOp;
using etcdserverpb::ResponseOp;
using etcdserverpb::TxnRequest;
etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool create) etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool create)
: etcdv3::Action(param) : etcdv3::Action(param)
@ -18,7 +12,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION); Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_create_sequence(parameters.key, parameters.value); transaction.setup_basic_create_sequence(parameters.key, parameters.value, parameters.lease_id);
if(isCreate) if(isCreate)
{ {
@ -26,7 +20,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
} }
else else
{ {
transaction.setup_set_failure_operation(parameters.key, parameters.value); transaction.setup_set_failure_operation(parameters.key, parameters.value, parameters.lease_id);
} }
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);

View File

@ -27,7 +27,7 @@ void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix
auto put_resp = resp.response_put(); auto put_resp = resp.response_put();
if(put_resp.has_prev_kv()) if(put_resp.has_prev_kv())
{ {
prev_value = put_resp.prev_kv(); prev_value.CopyFrom(put_resp.prev_kv());
} }
} }
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
@ -35,7 +35,7 @@ void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix
AsyncDeleteRangeResponse response; AsyncDeleteRangeResponse response;
response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range())); response.ParseResponse(key,prefix,*(resp.mutable_response_delete_range()));
prev_value = response.get_prev_value(); prev_value.CopyFrom(response.get_prev_value());
values = response.get_values(); values = response.get_values();
value = response.get_value(); value = response.get_value();

View File

@ -17,7 +17,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION); Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_compare_and_swap_sequence(parameters.value); transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id);
response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_); response_reader = parameters.kv_stub->AsyncTxn(&context, transaction.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);

22
v3/src/KeyValue.cpp Normal file
View File

@ -0,0 +1,22 @@
#include "v3/include/KeyValue.hpp"
etcdv3::KeyValue::KeyValue()
{
ttl = 0;
}
etcdv3::KeyValue::KeyValue(const mvccpb::KeyValue& from)
: mvccpb::KeyValue(from)
{
ttl =0;
}
void etcdv3::KeyValue::set_ttl(int ttl)
{
this->ttl = ttl;
}
int etcdv3::KeyValue::get_ttl() const
{
return ttl;
}

View File

@ -52,11 +52,12 @@ void etcdv3::Transaction::setup_basic_failure_operation(std::string const& key)
/** /**
* get key on failure, get key before put, modify and then get updated key * get key on failure, get key before put, modify and then get updated key
*/ */
void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value) { void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, std::string const &value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
put_request->set_prev_kv(true); put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_failure = txn_request.add_failure(); RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_put(put_request.release()); req_failure->set_allocated_request_put(put_request.release());
@ -97,11 +98,12 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key,
/** /**
* add key and then get new value of key * add key and then get new value of key
*/ */
void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value) { void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
put_request->set_prev_kv(true); put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success(); RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release()); req_success->set_allocated_request_put(put_request.release());
@ -114,11 +116,12 @@ void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, st
/** /**
* get key value then modify and get new value * get key value then modify and get new value
*/ */
void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value) { void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
put_request->set_prev_kv(true); put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success(); RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release()); req_success->set_allocated_request_put(put_request.release());
@ -152,6 +155,11 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const&
req_success->set_allocated_request_delete_range(del_request.release()); req_success->set_allocated_request_delete_range(del_request.release());
} }
void etcdv3::Transaction::setup_lease_grant_operation(int ttl)
{
leasegrant_request.set_ttl(ttl);
}
etcdv3::Transaction::~Transaction() { etcdv3::Transaction::~Transaction() {
} }

View File

@ -36,22 +36,22 @@ void etcdv3::V3Response::set_action(std::string action)
this->action = action; this->action = action;
} }
std::vector<mvccpb::KeyValue> const & etcdv3::V3Response::get_values() const std::vector<etcdv3::KeyValue> const & etcdv3::V3Response::get_values() const
{ {
return values; return values;
} }
std::vector<mvccpb::KeyValue> const & etcdv3::V3Response::get_prev_values() const std::vector<etcdv3::KeyValue> const & etcdv3::V3Response::get_prev_values() const
{ {
return prev_values; return prev_values;
} }
mvccpb::KeyValue const & etcdv3::V3Response::get_value() const etcdv3::KeyValue const & etcdv3::V3Response::get_value() const
{ {
return value; return value;
} }
mvccpb::KeyValue const & etcdv3::V3Response::get_prev_value() const etcdv3::KeyValue const & etcdv3::V3Response::get_prev_value() const
{ {
return prev_value; return prev_value;
} }