Implements "KeepAlive" feature, and enhance "lock" with a lease.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-01-10 23:31:00 +08:00
parent 02d170a0cf
commit bcded542c8
21 changed files with 665 additions and 123 deletions

View File

@ -294,6 +294,25 @@ prefix will be deleted. All deleted keys will be placed in ```response.values()`
However, if recursive parameter is false, functionality will be the same as just deleting a key.
The key supplied will NOT be treated as a prefix and will be treated as a normal key name.
### Lock
Etcd lock has been supported as follows:
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd.lock("/test/lock");
```
It will create a lease and a keep-alive job behind the screen, the lease will be revoked until
the lock is unlocked.
Users can also feed their own lease directory for lock:
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd.lock("/test/lock", lease_id);
```
### Watching for changes
Watching for a change is possible with the ```watch()``` operation of the client. The watch method
@ -343,22 +362,7 @@ fact it just sends the asynchron request, sets up a callback for the response an
callback is executed by some thread from the pplx library's thread pool and the callback (in this
case a small lambda function actually) will call ```watch_for_changes``` again from there.
### Requesting for lease
Users can request for lease which is governed by a time-to-live(TTL) value given by the user.
Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```,
```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd
server will be indicated in ```ttl()```.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl();
```
### Watcher Class
#### Watcher Class
Users can watch a key indefinitely or until user cancels the watch. This can be done by
instantiating a Watcher class. The supplied callback function in Watcher class will be
@ -375,11 +379,46 @@ either by user implicitly calling ```Cancel()``` or when watcher class is destro
}
```
### Requesting for lease
Users can request for lease which is governed by a time-to-live(TTL) value given by the user.
Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```,
```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd
server will be indicated in ```ttl()```.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout << "ttl" << resp.value().ttl();
```
The lease can be revoked by
```c++
etcd.leaserevoke(resp.value().lease());
```
The remaining time-to-live of a lease can be inspected by
```c++
etcd::Response resp2 = etcd.leasetimetolive(resp.value().lease()).get();
std::cout << "ttl" << resp.value().ttl();
```
#### Keep alive
Keep alive for leases is implemented using a seperate class `KeepAlive`, which can be used as:
```c++
etcd::KeepAlive keepalive(etcd, lease_id, ttl);
```
It will perform a periodly keep-alive action before it is cancelled explicitly, or destructed implicitly.
### TODO
1. Cancellation of asynchronous calls(except for watch)
2. LeaseKeepAlive
3. Authentication
## License

View File

@ -3,6 +3,7 @@
#include "etcd/Response.hpp"
#include <map>
#include <string>
#include <grpc++/grpc++.h>
@ -21,6 +22,7 @@ namespace etcdv3 {
namespace etcd
{
class KeepAlive;
class Watcher;
/**
@ -215,16 +217,36 @@ namespace etcd
pplx::task<Response> leasegrant(int ttl);
/**
* Gains a lock at a key.
* Revoke a lease.
* @param lease_id is the id the lease
*/
pplx::task<Response> leaserevoke(int64_t lease_id);
/**
* Get time-to-live of a lease.
* @param lease_id is the id the lease
*/
pplx::task<Response> leasetimetolive(int64_t lease_id);
/**
* Gains a lock at a key, using a default created lease, using the default lease (60 seconds), with
* keeping alive has already been taken care of by the library.
* @param key is the key to be used to request the lock.
*/
pplx::task<Response> lock(std::string const &key);
/**
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
* of by the library.
* @param key is the key to be used to request the lock.
*/
pplx::task<Response> lock(std::string const &key, int64_t lease_id);
/**
* Releases a lock at a key.
* @param key is the lock key to release.
*/
pplx::task<Response> unlock(std::string const &key);
pplx::task<Response> unlock(std::string const &lock_key);
/**
* Execute a etcd transaction.
@ -240,6 +262,10 @@ namespace etcd
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub;
std::map<std::string, int64_t> leases_for_locks;
std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks;
friend class KeepAlive;
friend class Watcher;
};

63
etcd/KeepAlive.hpp Normal file
View File

@ -0,0 +1,63 @@
#ifndef __ETCD_KEEPALIVE_HPP__
#define __ETCD_KEEPALIVE_HPP__
#include <string>
#include "etcd/Client.hpp"
#include "etcd/Response.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncLeaseKeepAliveAction;
}
using etcdserverpb::KV;
using etcdserverpb::Lease;
using grpc::Channel;
namespace etcd
{
/**
* If ID is set to 0, etcd will choose an ID.
*/
class KeepAlive
{
public:
KeepAlive(Client const &client, int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address, int ttl, int64_t lease_id=0);
KeepAlive(std::string const & address,
std::string const & username, std::string const & password,
int ttl, int64_t lease_id=0);
KeepAlive(KeepAlive const &) = delete;
KeepAlive(KeepAlive &&) = delete;
/**
* Stop the keep alive action.
*/
void Cancel();
~KeepAlive();
protected:
void refresh();
pplx::task<void> currentTask;
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<etcdv3::AsyncLeaseKeepAliveAction> call;
private:
int ttl;
int64_t lease_id;
bool continue_next;
boost::asio::io_context context;
std::unique_ptr<boost::asio::steady_timer> keepalive_timer_;
};
}
#endif

View File

@ -58,6 +58,8 @@ namespace etcd
Response mkdir(std::string const & key, int ttl = 0);
Response rmdir(std::string const & key, bool recursive = false);
Response leasegrant(int ttl);
Response leaserevoke(int64_t lease_id);
Response leasetimetolive(int64_t lease_id);
/**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and

View File

@ -39,6 +39,9 @@ namespace etcd
std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false);
Watcher(Watcher const &) = delete;
Watcher(Watcher &&) = delete;
/**
* Wait util the task has been stopped, actively or passively, e.g., the watcher
* get cancelled or the server closes the connection.

View File

@ -0,0 +1,80 @@
#ifndef __ASYNC_LEASEACTION_HPP__
#define __ASYNC_LEASEACTION_HPP__
#include <mutex>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseResponse.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientAsyncReaderWriter;
using etcdserverpb::LeaseGrantResponse;
using etcdserverpb::LeaseRevokeResponse;
using etcdserverpb::LeaseCheckpoint;
using etcdserverpb::LeaseCheckpointResponse;
using etcdserverpb::LeaseKeepAliveRequest;
using etcdserverpb::LeaseKeepAliveResponse;
using etcdserverpb::LeaseTimeToLiveResponse;
using etcdserverpb::LeaseStatus;
using etcdserverpb::LeaseLeasesResponse;
namespace etcdv3
{
class AsyncLeaseGrantAction : public etcdv3::Action {
public:
AsyncLeaseGrantAction(etcdv3::ActionParameters param);
AsyncLeaseGrantResponse ParseResponse();
private:
LeaseGrantResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseGrantResponse>> response_reader;
};
class AsyncLeaseRevokeAction: public etcdv3::Action {
public:
AsyncLeaseRevokeAction(etcdv3::ActionParameters param);
AsyncLeaseRevokeResponse ParseResponse();
private:
LeaseRevokeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseRevokeResponse>> response_reader;
};
class AsyncLeaseKeepAliveAction: public etcdv3::Action {
public:
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param);
AsyncLeaseKeepAliveResponse ParseResponse();
AsyncLeaseKeepAliveResponse Refresh();
void CancelKeepAlive();
bool Cancelled() const;
private:
LeaseKeepAliveResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<LeaseKeepAliveRequest, LeaseKeepAliveResponse>> stream;
LeaseKeepAliveRequest req;
bool isCancelled;
std::mutex protect_is_cancelled;
};
class AsyncLeaseTimeToLiveAction: public etcdv3::Action {
public:
AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param);
AsyncLeaseTimeToLiveResponse ParseResponse();
private:
LeaseTimeToLiveResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseTimeToLiveResponse>> response_reader;
};
class AsyncLeaseLeasesAction: public etcdv3::Action {
public:
AsyncLeaseLeasesAction(etcdv3::ActionParameters param);
AsyncLeaseLeasesResponse ParseResponse();
private:
LeaseLeasesResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseLeasesResponse>> response_reader;
};
}
#endif

View File

@ -1,25 +0,0 @@
#ifndef __ASYNC_LEASEGRANTACTION_HPP__
#define __ASYNC_LEASEGRANTACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseGrantResponse.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::LeaseGrantResponse;
namespace etcdv3
{
class AsyncLeaseGrantAction : public etcdv3::Action
{
public:
AsyncLeaseGrantAction(etcdv3::ActionParameters param);
AsyncLeaseGrantResponse ParseResponse();
private:
LeaseGrantResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseGrantResponse>> response_reader;
};
}
#endif

View File

@ -1,21 +0,0 @@
#ifndef __ASYNC_LEASEGRANTRESPONSE_HPP__
#define __ASYNC_LEASEGRANTRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/V3Response.hpp"
using etcdserverpb::LeaseGrantResponse;
namespace etcdv3
{
class AsyncLeaseGrantResponse : public etcdv3::V3Response
{
public:
AsyncLeaseGrantResponse(){};
void ParseResponse(LeaseGrantResponse& resp);
};
}
#endif

View File

@ -0,0 +1,55 @@
#ifndef __ASYNC_LEASERESPONSE_HPP__
#define __ASYNC_LEASERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "etcd/v3/V3Response.hpp"
using etcdserverpb::LeaseGrantResponse;
using etcdserverpb::LeaseRevokeResponse;
using etcdserverpb::LeaseCheckpoint;
using etcdserverpb::LeaseCheckpointResponse;
using etcdserverpb::LeaseKeepAliveResponse;
using etcdserverpb::LeaseTimeToLiveResponse;
using etcdserverpb::LeaseStatus;
using etcdserverpb::LeaseLeasesResponse;
namespace etcdv3
{
class AsyncLeaseGrantResponse : public etcdv3::V3Response
{
public:
AsyncLeaseGrantResponse(){};
void ParseResponse(LeaseGrantResponse& resp);
};
class AsyncLeaseRevokeResponse : public etcdv3::V3Response
{
public:
AsyncLeaseRevokeResponse(){};
void ParseResponse(LeaseRevokeResponse& resp);
};
class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response
{
public:
AsyncLeaseKeepAliveResponse(){};
void ParseResponse(LeaseKeepAliveResponse& resp);
};
class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response
{
public:
AsyncLeaseTimeToLiveResponse(){};
void ParseResponse(LeaseTimeToLiveResponse& resp);
};
class AsyncLeaseLeasesResponse : public etcdv3::V3Response
{
public:
AsyncLeaseLeasesResponse(){};
void ParseResponse(LeaseLeasesResponse& resp);
};
}
#endif

View File

@ -24,15 +24,12 @@ public:
void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive);
void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive);
void setup_compare_and_delete_operation(std::string const& key);
void setup_lease_grant_operation(int ttl);
// update without `get` and no `prev_kv` returned
void setup_put(std::string const &key, std::string const &value);
void setup_delete(std::string const &key);
etcdserverpb::TxnRequest txn_request;
etcdserverpb::LeaseGrantRequest leasegrant_request;
private:
std::string key;
};

View File

@ -13,6 +13,7 @@
#include <limits>
#include <memory>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
@ -30,7 +31,7 @@
#include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
#include "etcd/v3/AsyncLeaseGrantAction.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp"
@ -491,19 +492,85 @@ pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(new etcdv3::AsyncLeaseRevokeAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasetimetolive(int64_t lease_id)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(new etcdv3::AsyncLeaseTimeToLiveAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10;
// routines in lock usually will be fast, less than 10 seconds.
//
// (base on our experiences in vineyard and GraphScope).
auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get();
int64_t lease_id = resp.value().lease();
this->keep_alive_for_locks[lease_id].reset(new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id));
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call).then(
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
auto const& resp = resp_task.get();
if (resp.is_ok()) {
this->leases_for_locks[resp.lock_key()] = lease_id;
} else {
this->keep_alive_for_locks.erase(lease_id);
}
return resp;
}
);
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
int64_t lease_id) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key;
params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) {
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
std::cout << "begin unlock" << std::endl;
// cancel the KeepAlive first, it exists
auto p_leases = this->leases_for_locks.find(lock_key);
if (p_leases != this->leases_for_locks.end()) {
std::cout << "Unlock for " << lock_key << " and revoke lease " << std::hex << p_leases->second << std::dec << std::endl;
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
}
this->leases_for_locks.erase(p_leases);
} else {
std::cout << "Unable to find lease_id for " << lock_key;
}
// issue a "unlock" request
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key;
params.key = lock_key;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call);

78
src/KeepAlive.cpp Normal file
View File

@ -0,0 +1,78 @@
#include "etcd/KeepAlive.hpp"
#include "etcd/v3/AsyncLeaseAction.hpp"
etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
ttl(ttl), lease_id(lease_id), continue_next(true) {
leaseServiceStub= Lease::NewStub(client.channel);
etcdv3::ActionParameters params;
params.auth_token.assign(client.auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() {
// start refresh
this->refresh();
context.run();
context.stop(); // clean up
});
}
etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id):
KeepAlive(Client(address), ttl, lease_id) {
}
etcd::KeepAlive::KeepAlive(std::string const & address,
std::string const & username, std::string const & password,
int ttl, int64_t lease_id):
KeepAlive(Client(address, username, password), ttl, lease_id) {
}
etcd::KeepAlive::~KeepAlive()
{
this->Cancel();
}
void etcd::KeepAlive::Cancel()
{
#ifndef NDEBUG
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Cancel keepalive for " << std::hex << lease_id << std::endl;
std::cout.flags(os_flags);
}
#endif
call->CancelKeepAlive();
if (keepalive_timer_) {
keepalive_timer_->cancel();
}
currentTask.wait();
}
void etcd::KeepAlive::refresh()
{
// minimal resolution: 1 second
int keepalive_ttl = std::max(ttl - 1, 1);
#ifndef NDEBUG
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl
<< " for " << std::hex << lease_id << std::endl;
std::cout.flags(os_flags);
}
#endif
keepalive_timer_.reset(new boost::asio::steady_timer(
context, boost::asio::chrono::seconds(keepalive_ttl)));
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
if (error) {
#ifndef NDEBUG
std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl;
#endif
} else {
this->call->Refresh();
// trigger the next round;
this->refresh();
}
});
}

View File

@ -109,6 +109,16 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
CHECK_EXCEPTIONS(client.leasegrant(ttl).get());
}
etcd::Response etcd::SyncClient::leaserevoke(int64_t lease_id)
{
CHECK_EXCEPTIONS(client.leaserevoke(lease_id).get());
}
etcd::Response etcd::SyncClient::leasetimetolive(int64_t lease_id)
{
CHECK_EXCEPTIONS(client.leasetimetolive(lease_id).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive)
{
CHECK_EXCEPTIONS(client.watch(key, recursive).get());

172
src/v3/AsyncLeaseAction.cpp Normal file
View File

@ -0,0 +1,172 @@
#include "etcd/v3/AsyncLeaseAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::LeaseGrantRequest;
using etcdserverpb::LeaseRevokeRequest;
using etcdserverpb::LeaseCheckpointRequest;
using etcdserverpb::LeaseKeepAliveRequest;
using etcdserverpb::LeaseTimeToLiveRequest;
using etcdserverpb::LeaseLeasesRequest;
etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
LeaseGrantRequest leasegrant_request;
leasegrant_request.set_ttl(parameters.ttl);
// If ID is set to 0, etcd will choose an ID.
leasegrant_request.set_id(parameters.lease_id);
response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, leasegrant_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse()
{
AsyncLeaseGrantResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}
etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
LeaseRevokeRequest leaserevoke_request;
leaserevoke_request.set_id(parameters.lease_id);
response_reader = parameters.lease_stub->AsyncLeaseRevoke(&context, leaserevoke_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse()
{
AsyncLeaseRevokeResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}
etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
isCancelled = false;
stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)"keepalive create");
void *got_tag = nullptr;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive create") {
// ok
} else {
throw std::runtime_error("Failed to create a lease keep-alive connection");
}
}
etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse()
{
AsyncLeaseKeepAliveResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}
etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
if (isCancelled) {
return ParseResponse();
}
LeaseKeepAliveRequest leasekeepalive_request;
leasekeepalive_request.set_id(parameters.lease_id);
void *got_tag = nullptr;
bool ok = false;
stream->Write(leasekeepalive_request, (void *)"keepalive write");
// wait write finish
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive write") {
stream->Read(&reply, (void*)"keepalive read");
// wait read finish
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive read") {
return ParseResponse();
}
}
throw std::runtime_error("Failed to create a lease keep-alive connection");
}
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
if(isCancelled == false)
{
isCancelled = true;
stream->WritesDone((void*)"keepalive done");
grpc::Status status;
stream->Finish(&status, (void *)this);
cq_.Shutdown();
}
}
bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const
{
return isCancelled;
}
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
LeaseTimeToLiveRequest leasetimetolive_request;
leasetimetolive_request.set_id(parameters.lease_id);
// FIXME: unsupported parameters: "keys"
// leasetimetolive_request.set_keys(parameters.keys);
response_reader = parameters.lease_stub->AsyncLeaseTimeToLive(&context, leasetimetolive_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseTimeToLiveResponse etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse()
{
AsyncLeaseTimeToLiveResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}
etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
LeaseLeasesRequest leaseleases_request;
response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, leaseleases_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse()
{
AsyncLeaseLeasesResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -1,32 +0,0 @@
#include "etcd/v3/AsyncLeaseGrantAction.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Transaction.hpp"
using etcdserverpb::LeaseGrantRequest;
etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
etcdv3::Transaction transaction;
transaction.setup_lease_grant_operation(parameters.ttl);
response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, transaction.leasegrant_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse()
{
AsyncLeaseGrantResponse lease_resp;
if(!status.ok())
{
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
}
else
{
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -1,11 +0,0 @@
#include "etcd/v3/AsyncLeaseGrantResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp)
{
index = resp.header().revision();
value.kvs.set_lease(resp.id());
value.set_ttl(resp.ttl());
error_message = resp.error();
}

View File

@ -0,0 +1,35 @@
#include "etcd/v3/AsyncLeaseResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp) {
index = resp.header().revision();
value.kvs.set_lease(resp.id());
value.set_ttl(resp.ttl());
error_message = resp.error();
}
void etcdv3::AsyncLeaseRevokeResponse::ParseResponse(LeaseRevokeResponse& resp) {
index = resp.header().revision();
}
void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& resp) {
index = resp.header().revision();
value.kvs.set_lease(resp.id());
value.set_ttl(resp.ttl());
}
void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse(LeaseTimeToLiveResponse& resp) {
index = resp.header().revision();
value.kvs.set_lease(resp.id());
value.set_ttl(resp.ttl());
// FIXME: unsupported: fields "grantedTTL" and "keys"
}
void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(LeaseLeasesResponse& resp) {
index = resp.header().revision();
// FIXME: only the first leases is recorded.
if (resp.leases_size() > 0) {
value.kvs.set_lease(resp.leases(0).id());
}
}

View File

@ -9,6 +9,7 @@ etcdv3::AsyncLockAction::AsyncLockAction(ActionParameters param)
{
LockRequest lock_request;
lock_request.set_name(parameters.key);
lock_request.set_lease(parameters.lease_id);
response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);

View File

@ -69,7 +69,11 @@ void etcdv3::AsyncWatchAction::waitForResponse()
//
// 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events.
isCancelled = true;
stream->WritesDone((void*)"writes done");
grpc::Status status;
stream->Finish(&status, (void *)this);
cq_.Shutdown();
// leave a warning if the response is too large and been fragmented
if (reply.fragment()) {
@ -90,9 +94,12 @@ void etcdv3::AsyncWatchAction::CancelWatch()
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if(isCancelled == false)
{
stream->WritesDone((void*)"writes done");
}
isCancelled = true;
stream->WritesDone((void*)"writes done");
grpc::Status status;
stream->Finish(&status, (void *)this);
cq_.Shutdown();
}
}
bool etcdv3::AsyncWatchAction::Cancelled() const {

View File

@ -155,11 +155,6 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const&
req_success->set_allocated_request_delete_range(del_request.release());
}
void etcdv3::Transaction::setup_lease_grant_operation(int ttl)
{
leasegrant_request.set_ttl(ttl);
}
void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);

View File

@ -60,7 +60,8 @@ TEST_CASE("double lock will fail")
// create a duration
first_lock_release = true;
std::this_thread::sleep_for(std::chrono::seconds(1));
// using a duration longer than default lease TTL for lock (see: DEFAULT_LEASE_TTL_FOR_LOCK)
std::this_thread::sleep_for(std::chrono::seconds(15));
// unlock the first lock
etcd::Response resp4 = etcd.unlock(lock_key).get();