Add all missing Lease functionality

This commit is contained in:
Eric Musgrave 2020-12-03 09:15:45 -05:00
parent eee2ea2d14
commit 39d021d381
31 changed files with 1307 additions and 392 deletions

104
README.md
View File

@ -343,21 +343,6 @@ fact it just sends the asynchron request, sets up a callback for the response an
callback is executed by some thread from the pplx library's thread pool and the callback (in this callback is executed by some thread from the pplx library's thread pool and the callback (in this
case a small lambda function actually) will call ```watch_for_changes``` again from there. case a small lambda function actually) will call ```watch_for_changes``` again from there.
### Requesting for lease
Users can request for lease which is governed by a time-to-live(TTL) value given by the user.
Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```,
```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd
server will be indicated in ```ttl()```.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl();
```
### Watcher Class ### Watcher Class
Users can watch a key indefinitely or until user cancels the watch. This can be done by Users can watch a key indefinitely or until user cancels the watch. This can be done by
@ -375,11 +360,96 @@ either by user implicitly calling ```Cancel()``` or when watcher class is destro
} }
``` ```
### Leases
#### Create a lease
Users can request a lease which is governed by a time-to-live(TTL) value given by the user.
Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```,
```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd
server will be indicated in ```ttl()```.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl();
```
When the lease is revoked or expires, any key attached to the lease will be removed from
the etcd database.
#### Revoke a lease
To revoke a lease, call `leaserevoke(lease_id)`.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl();
etcd.leaserevoke(resp.value().lease()); // revokes the lease and deletes '/test/key2'
```
#### Lease TTL
To get information on an existing lease, you can call `leasettl(lease_id, keys)`.
The `keys` parameter specifies whether or not to return a list of the keys that the
lease is attached to.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl() << std::endl;
etcd::Response leaseinfo_resp = etcd.leasettl(resp.value().lease(), true);
std::cout << "Lease Info: " << std::endl;
std::cout << " Granted TTL: " << leaseinfo_resp.leaseinfo().get_grantedttl() << std::endl;
std::cout << " Current TTL: " << leaseinfo_resp.leaseinfo().get_ttl() << std::endl;
std::cout << " Keys: " << std::endl;
for (auto k : leaseinfo_resp.leaseinfo().get_keys()) {
LOG(INFO) << " " << k << std::endl;
}
```
#### List all Leases
To get a list of all existing leases, call `listleases()`.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Response resp = etcd.leasegrant(60).get();
etcd.set("/test/key2", "bar", resp.value().lease());
std::cout <<"ttl" << resp.value().ttl() << std::endl;
etcd::Response leases_resp = etcd.listleases().get();
for (auto l : leases_resp.leases()) {
LOG(INFO) << "Lease: " << std::hex << l;
}
```
#### Lease KeepAlive
To keep a lease alive so that it does not expire and delete the attached keys, you must
send a periodic refresh to the server. This can be done using a one time command
called `leasekeepalive(lease_id)`. However, that is inefficient since it creates and
destroys a bidirectional stream on each call. Instead, it is recommend to use the
`etcd::KeepAlive` service, which is a long running task that will periodically
refresh the leases that you specify. You can create multiple `etcd::KeepAlive` instances
if you need to have different refresh intervals for different leases.
```c++
etcd::Client etcd("http://127.0.0.1:4001");
etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service, but does not start it
keepalive.start(2000); // Starts the service with a 2s refresh interval.
etcd::Response resp = etcd.leasegrant(5).get(); // create a lease
int64_t lease_id = resp.value().lease();
etcd.set("/test/key2", "bar", lease_id); // attach lease to the key
std::cout << "ttl" << resp.value().ttl() << std::endl;
// Tell the KeepAlive service to refresh this lease every 2 seconds
keepalive.add(resp.value().lease());
// ...
// Remove the lease from the KeepAlive, and also immediately revoke it
keepalive.remove(lease_id, true);
// '/test/key2' is now deleted
```
### TODO ### TODO
1. Cancellation of asynchronous calls(except for watch) 1. Cancellation of asynchronous calls(except for watch)
2. LeaseKeepAlive 2. Authentication
3. Authentication
## License ## License

View File

@ -5,246 +5,288 @@
#include <string> #include <string>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h" #include "proto/v3lock.grpc.pb.h"
#include <grpc++/grpc++.h>
using etcdserverpb::Auth; using etcdserverpb::Auth;
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch;
using etcdserverpb::Lease; using etcdserverpb::Lease;
using etcdserverpb::Watch;
using v3lockpb::Lock; using v3lockpb::Lock;
namespace etcdv3 { namespace etcdv3 {
class Transaction; class Transaction;
} }
namespace etcd namespace etcd {
{ class Watcher;
class Watcher;
/**
* Client is responsible for maintaining a connection towards an etcd server.
* Etcd operations can be reached via the methods of the client.
*/
class Client {
public:
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like
* "http://127.0.0.1:2379", or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of
* round_robin/pick_first/grpclb/xds.
*/
Client(std::string const &etcd_url,
std::string const &load_balancer = "round_robin");
/** /**
* Client is responsible for maintaining a connection towards an etcd server. * Constructs an etcd client object.
* Etcd operations can be reached via the methods of the client. *
* @param etcd_url is the url of the etcd server to connect to, like
* "http://127.0.0.1:2379", or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth
* @param load_balancer is the load balance strategy, can be one of
* round_robin/pick_first/grpclb/xds.
*/ */
class Client Client(std::string const &etcd_url, std::string const &username,
{ std::string const &password,
public: std::string const &load_balancer = "round_robin");
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
Client(std::string const & etcd_url,
std::string const & load_balancer = "round_robin");
/** /**
* Constructs an etcd client object. * Sends a get request to the etcd server
* * @param key is the key to be read
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", */
* or multiple url, seperated by ',' or ';'. pplx::task<Response> get(std::string const &key);
* @param username username of etcd auth
* @param password password of etcd auth
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
Client(std::string const & etcd_url,
std::string const & username,
std::string const & password,
std::string const & load_balancer = "round_robin");
/** /**
* Sends a get request to the etcd server * Sets the value of a key. The key will be modified if already exists or
* @param key is the key to be read * created if it does not exists.
*/ * @param key is the key to be created or modified
pplx::task<Response> get(std::string const & key); * @param value is the new value to be set
*/
pplx::task<Response> set(std::string const &key, std::string const &value,
int ttl = 0);
/** /**
* Sets the value of a key. The key will be modified if already exists or created * Sets the value of a key. The key will be modified if already exists or
* if it does not exists. * created if it does not exists.
* @param key is the key to be created or modified * @param key is the key to be created or modified
* @param value is the new value to be set * @param value is the new value to be set
*/ * @param leaseId is the lease attached to the key
pplx::task<Response> set(std::string const & key, std::string const & value, int ttl = 0); */
pplx::task<Response> set(std::string const &key, std::string const &value,
int64_t leaseId);
/** /**
* Sets the value of a key. The key will be modified if already exists or created * Creates a new key and sets it's value. Fails if the key already exists.
* if it does not exists. * @param key is the key to be created
* @param key is the key to be created or modified * @param value is the value to be set
* @param value is the new value to be set */
* @param leaseId is the lease attached to the key pplx::task<Response> add(std::string const &key, std::string const &value,
*/ int ttl = 0);
pplx::task<Response> set(std::string const & key, std::string const & value, int64_t leaseId);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
* @param value is the value to be set
* @param leaseId is the lease attached to the key
*/
pplx::task<Response> add(std::string const &key, std::string const &value,
int64_t leaseId);
/** /**
* Creates a new key and sets it's value. Fails if the key already exists. * Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be created * @param key is the key to be modified
* @param value is the value to be set * @param value is the new value to be set
*/ */
pplx::task<Response> add(std::string const & key, std::string const & value, int ttl = 0); pplx::task<Response> modify(std::string const &key, std::string const &value,
int ttl = 0);
/** /**
* Creates a new key and sets it's value. Fails if the key already exists. * Modifies an existing key. Fails if the key does not exists.
* @param key is the key to be created * @param key is the key to be modified
* @param value is the value to be set * @param value is the new value to be set
* @param leaseId is the lease attached to the key * @param leaseId is the lease attached to the key
*/ */
pplx::task<Response> add(std::string const & key, std::string const & value, int64_t leaseId); pplx::task<Response> modify(std::string const &key, std::string const &value,
int64_t leaseId);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key only if it has a specific value. Fails if the key
* @param key is the key to be modified * does not exists or the original value differs from the expected one.
* @param value is the new value to be set * @param key is the key to be modified
*/ * @param value is the new value to be set
pplx::task<Response> modify(std::string const & key, std::string const & value, int ttl = 0); * @param old_value is the value to be replaced
*/
pplx::task<Response> modify_if(std::string const &key,
std::string const &value,
std::string const &old_value, int ttl = 0);
/** /**
* Modifies an existing key. Fails if the key does not exists. * Modifies an existing key only if it has a specific value. Fails if the key
* @param key is the key to be modified * does not exists or the original value differs from the expected one.
* @param value is the new value to be set * @param key is the key to be modified
* @param leaseId is the lease attached to the key * @param value is the new value to be set
*/ * @param old_value is the value to be replaced
pplx::task<Response> modify(std::string const & key, std::string const & value, int64_t leaseId); * @param leaseId is the lease attached to the key
*/
pplx::task<Response> modify_if(std::string const &key,
std::string const &value,
std::string const &old_value, int64_t leaseId);
/** /**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists * Modifies an existing key only if it has a specific modification index
* or the original value differs from the expected one. * value. Fails if the key does not exists or the modification index of the
* @param key is the key to be modified * previous value differs from the expected one.
* @param value is the new value to be set * @param key is the key to be modified
* @param old_value is the value to be replaced * @param value is the new value to be set
*/ * @param old_index is the expected index of the original value
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); */
pplx::task<Response> modify_if(std::string const &key,
std::string const &value, int old_index,
int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific value. Fails if the key does not exists * Modifies an existing key only if it has a specific modification index
* or the original value differs from the expected one. * value. Fails if the key does not exists or the modification index of the
* @param key is the key to be modified * previous value differs from the expected one.
* @param value is the new value to be set * @param key is the key to be modified
* @param old_value is the value to be replaced * @param value is the new value to be set
* @param leaseId is the lease attached to the key * @param old_index is the expected index of the original value
*/ * @param leaseId is the lease attached to the key
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId); */
pplx::task<Response> modify_if(std::string const &key,
std::string const &value, int old_index,
int64_t leaseId);
/** /**
* Modifies an existing key only if it has a specific modification index value. Fails if the key * Removes a single key. The key has to point to a plain, non directory entry.
* does not exists or the modification index of the previous value differs from the expected one. * @param key is the key to be deleted
* @param key is the key to be modified */
* @param value is the new value to be set pplx::task<Response> rm(std::string const &key);
* @param old_index is the expected index of the original value
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0);
/** /**
* Modifies an existing key only if it has a specific modification index value. Fails if the key * Removes a single key but only if it has a specific value. Fails if the key
* does not exists or the modification index of the previous value differs from the expected one. * does not exists or the its value differs from the expected one.
* @param key is the key to be modified * @param key is the key to be deleted
* @param value is the new value to be set */
* @param old_index is the expected index of the original value pplx::task<Response> rm_if(std::string const &key,
* @param leaseId is the lease attached to the key std::string const &old_value);
*/
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId);
/** /**
* Removes a single key. The key has to point to a plain, non directory entry. * Removes an existing key only if it has a specific modification index value.
* @param key is the key to be deleted * Fails if the key does not exists or the modification index of it differs
*/ * from the expected one.
pplx::task<Response> rm(std::string const & key); * @param key is the key to be deleted
* @param old_index is the expected index of the existing value
*/
pplx::task<Response> rm_if(std::string const &key, int old_index);
/** /**
* Removes a single key but only if it has a specific value. Fails if the key does not exists * Gets a directory listing of the directory identified by the key.
* or the its value differs from the expected one. * @param key is the key to be listed
* @param key is the key to be deleted */
*/ pplx::task<Response> ls(std::string const &key);
pplx::task<Response> rm_if(std::string const & key, std::string const & old_value);
/** /**
* Removes an existing key only if it has a specific modification index value. Fails if the key * Gets a directory listing of the directory identified by the key.
* does not exists or the modification index of it differs from the expected one. * @param key is the key to be listed
* @param key is the key to be deleted * @param limit is the size limit of results to be listed, we don't use
* @param old_index is the expected index of the existing value * default parameters to ensure backwards binary compatibility.
*/ */
pplx::task<Response> rm_if(std::string const & key, int old_index); pplx::task<Response> ls(std::string const &key, size_t const limit);
/** /**
* Gets a directory listing of the directory identified by the key. * Removes a directory node. Fails if the parent directory dos not exists or
* @param key is the key to be listed * not a directory.
*/ * @param key is the directory to be created to be listed
pplx::task<Response> ls(std::string const & key); * @param recursive if true then delete a whole subtree, otherwise deletes
* only an empty directory.
*/
pplx::task<Response> rmdir(std::string const &key, bool recursive = false);
/**
* Watches for changes of a key or a subtree. Please note that if you watch
* e.g. "/testdir" and a new key is created, like "/testdir/newkey" then no
* change happened in the value of
* "/testdir" so your watch will not detect this. If you want to detect
* addition and deletion of directory entries then you have to do a recursive
* watch.
* @param key is the value or directory to be watched
* @param recursive if true watch a whole subtree
*/
pplx::task<Response> watch(std::string const &key, bool recursive = false);
/** /**
* Gets a directory listing of the directory identified by the key. * Watches for changes of a key or a subtree from a specific index. The index
* @param key is the key to be listed * value can be in the "past".
* @param limit is the size limit of results to be listed, we don't use default parameters * @param key is the value or directory to be watched
* to ensure backwards binary compatibility. * @param fromIndex the first index we are interested in
*/ * @param recursive if true watch a whole subtree
pplx::task<Response> ls(std::string const & key, size_t const limit); */
pplx::task<Response> watch(std::string const &key, int fromIndex,
bool recursive = false);
/**
* Grants a lease.
* @param ttl is the time to live of the lease (in seconds)
*/
pplx::task<Response> leasegrant(int ttl);
/** /**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory. * Gets the list of existing leases
* @param key is the directory to be created to be listed */
* @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory. pplx::task<Response> listleases();
*/
pplx::task<Response> rmdir(std::string const & key, bool recursive = false);
/** /**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * Renews a lease.
* a new key is created, like "/testdir/newkey" then no change happened in the value of * @param lease_id is the id of of the lease
* "/testdir" so your watch will not detect this. If you want to detect addition and deletion of */
* directory entries then you have to do a recursive watch. pplx::task<Response> leasekeepalive(int64_t lease_id);
* @param key is the value or directory to be watched
* @param recursive if true watch a whole subtree
*/
pplx::task<Response> watch(std::string const & key, bool recursive = false);
/** /**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". * Revokes a lease.
* @param key is the value or directory to be watched * @param lease_id is the id of of the lease
* @param fromIndex the first index we are interested in */
* @param recursive if true watch a whole subtree pplx::task<Response> leaserevoke(int64_t lease_id);
*/
pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false);
/** /**
* Grants a lease. * Gets the TTL of an existing lease
* @param ttl is the time to live of the lease * @param lease_id is the id of of the lease
*/ */
pplx::task<Response> leasegrant(int ttl); pplx::task<Response> leasettl(int64_t lease_id, bool keys);
/** /**
* Gains a lock at a key. * Gains a lock at a key.
* @param key is the key to be used to request the lock. * @param key is the key to be used to request the lock.
*/ */
pplx::task<Response> lock(std::string const &key); pplx::task<Response> lock(std::string const &key);
/** /**
* Releases a lock at a key. * Releases a lock at a key.
* @param key is the lock key to release. * @param key is the lock key to release.
*/ */
pplx::task<Response> unlock(std::string const &key); pplx::task<Response> unlock(std::string const &key);
/** /**
* Execute a etcd transaction. * Execute a etcd transaction.
* @param txn is the transaction object to be executed. * @param txn is the transaction object to be executed.
*/ */
pplx::task<Response> txn(etcdv3::Transaction const &txn); pplx::task<Response> txn(etcdv3::Transaction const &txn);
private: private:
std::shared_ptr<grpc::Channel> channel; std::shared_ptr<grpc::Channel> channel;
std::string auth_token; std::string auth_token;
std::unique_ptr<KV::Stub> kvServiceStub; std::unique_ptr<KV::Stub> kvServiceStub;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub; std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub; std::unique_ptr<Lock::Stub> lockServiceStub;
friend class Watcher; friend class Watcher;
friend class KeepAlive;
}; };
} // namespace etcd
}
#endif #endif

110
etcd/KeepAlive.hpp Normal file
View File

@ -0,0 +1,110 @@
#pragma once
#include <agents.h>
#include <concurrent_unordered_map.h>
#include <grpc++/grpc++.h>
#include <string>
#include "etcd/Client.hpp"
#include "etcd/Response.hpp"
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncKeepAliveAction;
}
using etcdserverpb::KV;
using grpc::Channel;
namespace etcd {
class KeepAlive {
enum class Type { READ = 1, WRITE = 2, CONNECT = 3, WRITES_DONE = 4, FINISH = 5 };
public:
/**
* Create an instance of the KeepAlive service
* Call start() to being the timer that automatically sends keepalives
* Call add(leaseid) to add a leaseid to be kept alive by this service
* @param client the client etcd connection to use
*/
KeepAlive(Client& client);
/**
* Start processing keepalive's
* The refresh time must be less than the granted TTL of your
* leases, otherwise they will expire. Create multiple KeepAlive
* services with different refreshes if you have many leases with varied
* TTL's
* @param refresh_in_ms the time between refreshes (default: 5000ms)
*/
pplx::task<void> start(int refresh_in_ms = 5000);
/**
* Add a lease to be kept alive by this service
* @param leaseid the id of the lease
* @param ttl the ttl of the lease (in seconds) <reserved for future use>
*/
void add(int64_t leaseid, int ttl = 5);
/**
* Remove a lease from being kept alive, and optionally revoke it immediately
* @param leaseid the id of the lease
* @param revoke true to immediatley revoke the lease (defaults to false)
*/
void remove(int64_t leaseid, bool revoke = false);
~KeepAlive();
private:
/**
* Sends a keep alive for the next lease in the queue
*/
void sendNextKeepAlive();
/**
*
*/
void readNextMessage();
// The timer that is triggering refreshes
std::unique_ptr<pplx::timer<int>> timer_;
// The map of leases that have been registered with this service to be kept alive
pplx::concurrent_unordered_map<int64_t, int> leases_;
// The current queue of leases that still need to be refreshed on this pass of the timer
pplx::concurrent_queue<std::pair<int64_t, int>> leaseQueue_;
// The long running task for this service
pplx::task<void> currentTask_;
// The client that we are attached to.
Client& client_;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
grpc::ClientContext context_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
grpc::CompletionQueue cq_;
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<etcdserverpb::Lease::Stub> stub_;
// The bidirectional, asynchronous stream for sending/receiving messages.
std::unique_ptr<
grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest, etcdserverpb::LeaseKeepAliveResponse>>
stream_;
// Allocated protobuf that holds the response. In real clients and servers,
// the memory management would a bit more complex as the thread that fills
// in the response should take care of concurrency as well as memory
// management.
etcdserverpb::LeaseKeepAliveResponse response_;
// Finish status when the client is done with the stream.
grpc::Status finish_status_ = grpc::Status::OK;
};
} // namespace etcd

View File

@ -5,6 +5,7 @@
#include <vector> #include <vector>
#include "etcd/Value.hpp" #include "etcd/Value.hpp"
#include "etcd/v3/LeaseInfo.hpp"
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/kv.pb.h" #include "proto/kv.pb.h"
@ -12,6 +13,7 @@
namespace etcdv3 { namespace etcdv3 {
class AsyncWatchAction; class AsyncWatchAction;
class AsyncLeaseKeepAliveAction;
class V3Response; class V3Response;
} }
@ -122,6 +124,16 @@ namespace etcd
*/ */
std::chrono::microseconds const & duration() const; std::chrono::microseconds const & duration() const;
/**
* Returns the list of leases
*/
std::vector<int64_t> const& leases() const;
/**
* Returns detailed info for the lease
*/
etcdv3::LeaseInfo const& leaseinfo() const;
protected: protected:
Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration); Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration);
Response(int error_code, char const * error_message); Response(int error_code, char const * error_message);
@ -133,12 +145,15 @@ namespace etcd
Value _value; Value _value;
Value _prev_value; Value _prev_value;
Values _values; Values _values;
std::vector<int64_t> _leases;
etcdv3::LeaseInfo _leaseinfo;
Keys _keys; Keys _keys;
std::string _lock_key; // for lock std::string _lock_key; // for lock
std::vector<mvccpb::Event> _events; // for watch std::vector<mvccpb::Event> _events; // for watch
std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed
friend class SyncClient; friend class SyncClient;
friend class etcdv3::AsyncWatchAction; friend class etcdv3::AsyncWatchAction;
friend class etcdv3::AsyncLeaseKeepAliveAction;
friend class Client; friend class Client;
}; };
} }

View File

@ -48,6 +48,9 @@ namespace etcd
*/ */
int ttl() const; int ttl() const;
/**
* Returns the id of the lease
*/
int64_t lease() const; int64_t lease() const;
protected: protected:

View File

@ -31,6 +31,7 @@ namespace etcdv3
int revision; int revision;
int old_revision; int old_revision;
int64_t lease_id; int64_t lease_id;
bool keys;
int ttl; int ttl;
int limit; int limit;
std::string key; std::string key;

View File

@ -0,0 +1,32 @@
#ifndef __ASYNC_LEASEKEEPALIVEACTION_HPP__
#define __ASYNC_LEASEKEEPALIVEACTION_HPP__
#include <grpc++/grpc++.h>
#include "etcd/Response.hpp"
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncleaseKeepAliveResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcd::Response;
using etcdserverpb::LeaseKeepAliveRequest;
using etcdserverpb::LeaseKeepAliveResponse;
using grpc::ClientAsyncReaderWriter;
using grpc::ClientAsyncResponseReader;
namespace etcdv3 {
class AsyncLeaseKeepAliveAction : public etcdv3::Action {
public:
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param);
AsyncLeaseKeepAliveResponse ParseResponse();
void waitForResponse();
private:
LeaseKeepAliveResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<LeaseKeepAliveRequest, LeaseKeepAliveResponse>> stream;
char* doneTag = "writes done";
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,23 @@
#ifndef __ASYNC_LEASEKEEPALIVERESPONSE_HPP__
#define __ASYNC_LEASEKEEPALIVERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/V3Response.hpp"
#include "proto/rpc.grpc.pb.h"
#include "proto/rpc.pb.h"
using etcdserverpb::KV;
using etcdserverpb::LeaseKeepAliveRequest;
using etcdserverpb::LeaseKeepAliveResponse;
namespace etcdv3 {
class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response {
public:
AsyncLeaseKeepAliveResponse(){};
void ParseResponse(LeaseKeepAliveResponse& resp);
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,26 @@
#ifndef __ASYNC_LEASELEASESACTION_HPP__
#define __ASYNC_LEASELEASESACTION_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseLeasesResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseLeasesRequest;
using etcdserverpb::LeaseLeasesResponse;
using grpc::ClientAsyncResponseReader;
namespace etcdv3 {
class AsyncLeaseLeasesAction : public etcdv3::Action {
public:
AsyncLeaseLeasesAction(etcdv3::ActionParameters param);
AsyncLeaseLeasesResponse ParseResponse();
private:
LeaseLeasesResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseLeasesResponse>> response_reader;
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,19 @@
#ifndef __ASYNC_LEASELEASESRESPONSE_HPP__
#define __ASYNC_LEASELEASESRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/V3Response.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseLeasesResponse;
namespace etcdv3 {
class AsyncLeaseLeasesResponse : public etcdv3::V3Response {
public:
AsyncLeaseLeasesResponse(){};
void ParseResponse(LeaseLeasesResponse& resp);
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,27 @@
#ifndef __ASYNC_LEASEREVOKEACTION_HPP__
#define __ASYNC_LEASEREVOKEACTION_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseRevokeResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseRevokeRequest;
using etcdserverpb::LeaseRevokeResponse;
using grpc::ClientAsyncResponseReader;
namespace etcdv3 {
class AsyncLeaseRevokeAction : public etcdv3::Action {
public:
AsyncLeaseRevokeAction(etcdv3::ActionParameters param);
AsyncLeaseRevokeResponse ParseResponse();
private:
LeaseRevokeResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseRevokeResponse>> response_reader;
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,20 @@
#ifndef __ASYNC_LEASEREVOKERESPONSE_HPP__
#define __ASYNC_LEASEREVOKERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/V3Response.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseRevokeResponse;
namespace etcdv3 {
class AsyncLeaseRevokeResponse : public etcdv3::V3Response {
public:
AsyncLeaseRevokeResponse(){};
void ParseResponse(LeaseRevokeResponse& resp);
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,27 @@
#ifndef __ASYNC_LEASETIMETOLIVEACTION_HPP__
#define __ASYNC_LEASETIMETOLIVEACTION_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncLeaseTimeToLiveResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseTimeToLiveRequest;
using etcdserverpb::LeaseTimeToLiveResponse;
using grpc::ClientAsyncResponseReader;
namespace etcdv3 {
class AsyncLeaseTimeToLiveAction : public etcdv3::Action {
public:
AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param);
AsyncLeaseTimeToLiveResponse ParseResponse();
private:
LeaseTimeToLiveResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LeaseTimeToLiveResponse>> response_reader;
};
} // namespace etcdv3
#endif

View File

@ -0,0 +1,20 @@
#ifndef __ASYNC_LEASETIMETOLIVERESPONSE_HPP__
#define __ASYNC_LEASETIMETOLIVERESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "etcd/v3/V3Response.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::LeaseTimeToLiveResponse;
namespace etcdv3 {
class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response {
public:
AsyncLeaseTimeToLiveResponse(){};
void ParseResponse(LeaseTimeToLiveResponse& resp);
};
} // namespace etcdv3
#endif

27
etcd/v3/LeaseInfo.hpp Normal file
View File

@ -0,0 +1,27 @@
#ifndef __V3_ETCDV3LEASEINFO_HPP__
#define __V3_ETCDV3LEASEINFO_HPP__
#include "proto/kv.pb.h"
namespace etcdv3 {
class LeaseInfo {
public:
LeaseInfo();
// mvccpb::KeyValue kvs;
void set_lease(int64_t leaseid);
void set_ttl(int ttl);
void set_grantedttl(int ttl);
void add_key(std::string key);
int64_t get_lease() const;
int get_ttl() const;
int get_grantedttl() const;
std::vector<std::string> get_keys() const;
private:
int64_t leaseid_;
int ttl_;
int grantedttl_;
std::vector<std::string> keys_;
};
} // namespace etcdv3
#endif

View File

@ -25,6 +25,10 @@ public:
void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive); void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive);
void setup_compare_and_delete_operation(std::string const& key); void setup_compare_and_delete_operation(std::string const& key);
void setup_lease_grant_operation(int ttl); void setup_lease_grant_operation(int ttl);
void setup_lease_leases_operation();
void setup_lease_keepalive_operation(int64_t lease_id);
void setup_lease_revoke_operation(int64_t lease_id);
void setup_lease_timetolive_operation(int64_t lease_id, bool keys);
// update without `get` and no `prev_kv` returned // update without `get` and no `prev_kv` returned
void setup_put(std::string const &key, std::string const &value); void setup_put(std::string const &key, std::string const &value);
@ -32,6 +36,10 @@ public:
etcdserverpb::TxnRequest txn_request; etcdserverpb::TxnRequest txn_request;
etcdserverpb::LeaseGrantRequest leasegrant_request; etcdserverpb::LeaseGrantRequest leasegrant_request;
etcdserverpb::LeaseLeasesRequest leaseleases_request;
etcdserverpb::LeaseKeepAliveRequest leasekeepalive_request;
etcdserverpb::LeaseRevokeRequest leaserevoke_request;
etcdserverpb::LeaseTimeToLiveRequest leasetimetolive_request;
private: private:
std::string key; std::string key;

View File

@ -5,6 +5,7 @@
#include "proto/kv.pb.h" #include "proto/kv.pb.h"
#include "etcd/v3/KeyValue.hpp" #include "etcd/v3/KeyValue.hpp"
#include "etcd/v3/LeaseInfo.hpp"
namespace etcdv3 namespace etcdv3
{ {
@ -21,9 +22,12 @@ namespace etcdv3
std::string const & get_action() const; std::string const & get_action() const;
std::vector<etcdv3::KeyValue> const & get_values() const; std::vector<etcdv3::KeyValue> const & get_values() const;
std::vector<etcdv3::KeyValue> const & get_prev_values() const; std::vector<etcdv3::KeyValue> const & get_prev_values() const;
std::vector<int64_t> const& get_leases() const;
etcdv3::KeyValue const & get_value() const; etcdv3::KeyValue const & get_value() const;
etcdv3::KeyValue const & get_prev_value() const; etcdv3::KeyValue const & get_prev_value() const;
etcdv3::LeaseInfo const& get_leaseinfo() const;
bool has_values() const; bool has_values() const;
bool has_leases() const;
void set_lock_key(std::string const &key); void set_lock_key(std::string const &key);
std::string const &get_lock_key() const; std::string const &get_lock_key() const;
std::vector<mvccpb::Event> const & get_events() const; std::vector<mvccpb::Event> const & get_events() const;
@ -38,6 +42,8 @@ namespace etcdv3
std::vector<etcdv3::KeyValue> prev_values; std::vector<etcdv3::KeyValue> prev_values;
std::string lock_key; // for lock std::string lock_key; // for lock
std::vector<mvccpb::Event> events; // for watch std::vector<mvccpb::Event> events; // for watch
std::vector<int64_t> leases; // for list leases
LeaseInfo leaseinfo; // for lease ttl information
}; };
} }
#endif #endif

View File

@ -1,32 +1,36 @@
#include <netdb.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <sys/types.h>
#include <limits>
#include <memory>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/v3/action_constants.hpp"
#include "etcd/v3/Action.hpp" #include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncDeleteRangeResponse.hpp"
#include "etcd/v3/AsyncLockResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp"
#include "etcd/v3/AsyncRangeResponse.hpp"
#include "etcd/v3/AsyncTxnResponse.hpp"
#include "etcd/v3/AsyncWatchResponse.hpp"
#include "etcd/v3/Transaction.hpp" #include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
#include <iostream> #include <iostream>
#include <limits>
#include <memory>
#include "etcd/v3/AsyncSetAction.hpp"
#include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "etcd/v3/AsyncCompareAndDeleteAction.hpp" #include "etcd/v3/AsyncCompareAndDeleteAction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp" #include "etcd/v3/AsyncCompareAndSwapAction.hpp"
#include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/AsyncDeleteAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp" #include "etcd/v3/AsyncGetAction.hpp"
#include "etcd/v3/AsyncLeaseGrantAction.hpp" #include "etcd/v3/AsyncLeaseGrantAction.hpp"
#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp"
#include "etcd/v3/AsyncLeaseLeasesAction.hpp"
#include "etcd/v3/AsyncLeaseRevokeAction.hpp"
#include "etcd/v3/AsyncLeaseTimeToLiveAction.hpp"
#include "etcd/v3/AsyncLockAction.hpp" #include "etcd/v3/AsyncLockAction.hpp"
#include "etcd/v3/AsyncSetAction.hpp"
#include "etcd/v3/AsyncTxnAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp"
#include "etcd/v3/AsyncUpdateAction.hpp"
#include "etcd/v3/AsyncWatchAction.hpp"
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
@ -35,7 +39,8 @@ using grpc::Channel;
namespace etcd { namespace etcd {
namespace detail { namespace detail {
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) { static bool dns_resolve(std::string const &target,
std::vector<std::string> &endpoints) {
struct addrinfo hints = {}, *addrs; struct addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET; hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
@ -47,15 +52,18 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
std::cerr << "warn: invalid URL: " << target << std::endl; std::cerr << "warn: invalid URL: " << target << std::endl;
return false; return false;
} }
if (getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs) != 0) { if (getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints,
std::cerr << "warn: getaddrinfo() failed for endpoint " << target << std::endl; &addrs) != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target
<< std::endl;
return false; return false;
} }
char host[16] = {'\0'}; char host[16] = {'\0'};
for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) { for (struct addrinfo *addr = addrs; addr != nullptr; addr = addr->ai_next) {
memset(host, '\0', sizeof(host)); memset(host, '\0', sizeof(host));
getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST); getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0,
NI_NUMERICHOST);
endpoints.emplace_back(std::string(host) + ":" + target_parts[1]); endpoints.emplace_back(std::string(host) + ":" + target_parts[1]);
} }
freeaddrinfo(addrs); freeaddrinfo(addrs);
@ -64,14 +72,16 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
const std::string strip_and_resolve_addresses(std::string const &address) { const std::string strip_and_resolve_addresses(std::string const &address) {
std::vector<std::string> addresses; std::vector<std::string> addresses;
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); boost::algorithm::split(addresses, address,
boost::algorithm::is_any_of(",;"));
std::string stripped_address; std::string stripped_address;
{ {
std::vector<std::string> stripped_addresses; std::vector<std::string> stripped_addresses;
std::string substr("://"); std::string substr("://");
for (auto const &addr: addresses) { for (auto const &addr : addresses) {
std::string::size_type idx = addr.find(substr); std::string::size_type idx = addr.find(substr);
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); std::string target =
idx == std::string::npos ? addr : addr.substr(idx + substr.length());
etcd::detail::dns_resolve(target, stripped_addresses); etcd::detail::dns_resolve(target, stripped_addresses);
} }
stripped_address = boost::algorithm::join(stripped_addresses, ","); stripped_address = boost::algorithm::join(stripped_addresses, ",");
@ -100,370 +110,416 @@ const bool authenticate(std::shared_ptr<grpc::Channel> const &channel,
} }
} }
} } // namespace detail
} } // namespace etcd
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const &address,
std::string const & load_balancer) std::string const &load_balancer) {
{
// create channels // create channels
std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args; grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max()); grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials(); std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer); grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
// create stubs // create stubs
kvServiceStub = KV::NewStub(this->channel); kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub = Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub = Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
} }
etcd::Client::Client(std::string const & address, etcd::Client::Client(std::string const &address, std::string const &username,
std::string const & username, std::string const &password,
std::string const & password, std::string const &load_balancer) {
std::string const & load_balancer)
{
// create channels // create channels
std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args; grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max()); grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds = grpc::InsecureChannelCredentials(); std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer); grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
// auth // auth
std::string token_or_message; std::string token_or_message;
if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { if (!etcd::detail::authenticate(this->channel, username, password,
throw std::invalid_argument("Etcd authentication failed: " + token_or_message); token_or_message)) {
throw std::invalid_argument("Etcd authentication failed: " +
token_or_message);
} }
this->auth_token = token_or_message; this->auth_token = token_or_message;
// setup stubs // setup stubs
kvServiceStub = KV::NewStub(this->channel); kvServiceStub = KV::NewStub(this->channel);
watchServiceStub= Watch::NewStub(this->channel); watchServiceStub = Watch::NewStub(this->channel);
leaseServiceStub= Lease::NewStub(this->channel); leaseServiceStub = Lease::NewStub(this->channel);
lockServiceStub = Lock::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel);
} }
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const &key) {
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(
new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int ttl) pplx::task<etcd::Response>
{ etcd::Client::set(std::string const &key, std::string const &value, int ttl) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
if(ttl > 0) if (ttl > 0) {
{
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
if(!res.is_ok()) if (!res.is_ok()) {
{ return pplx::task<etcd::Response>([res]() {
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str()); return etcd::Response(res.error_code(), res.error_message().c_str());
}); });
} } else {
else
{
params.lease_id = res.value().lease(); params.lease_id = res.value().lease();
} }
} }
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params)); std::shared_ptr<etcdv3::AsyncSetAction> call(
new etcdv3::AsyncSetAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::set(std::string const &key,
{ std::string const &value,
int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params)); std::shared_ptr<etcdv3::AsyncSetAction> call(
new etcdv3::AsyncSetAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response>
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int ttl) etcd::Client::add(std::string const &key, std::string const &value, int ttl) {
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
if(ttl > 0) if (ttl > 0) {
{
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
if(!res.is_ok()) if (!res.is_ok()) {
{ return pplx::task<etcd::Response>([res]() {
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str()); return etcd::Response(res.error_code(), res.error_message().c_str());
}); });
} } else {
else
{
params.lease_id = res.value().lease(); params.lease_id = res.value().lease();
} }
} }
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true)); std::shared_ptr<etcdv3::AsyncSetAction> call(
new etcdv3::AsyncSetAction(params, true));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::add(std::string const &key,
{ std::string const &value,
int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncSetAction> call(new etcdv3::AsyncSetAction(params,true)); std::shared_ptr<etcdv3::AsyncSetAction> call(
new etcdv3::AsyncSetAction(params, true));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const &key,
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int ttl) std::string const &value,
{ int ttl) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
if(ttl > 0) if (ttl > 0) {
{
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
if(!res.is_ok()) if (!res.is_ok()) {
{ return pplx::task<etcd::Response>([res]() {
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str()); return etcd::Response(res.error_code(), res.error_message().c_str());
}); });
} } else {
else
{
params.lease_id = res.value().lease(); params.lease_id = res.value().lease();
} }
} }
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params)); std::shared_ptr<etcdv3::AsyncUpdateAction> call(
new etcdv3::AsyncUpdateAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify(std::string const &key,
{ std::string const &value,
int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncUpdateAction> call(new etcdv3::AsyncUpdateAction(params)); std::shared_ptr<etcdv3::AsyncUpdateAction> call(
new etcdv3::AsyncUpdateAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const &key,
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) std::string const &value,
{ std::string const &old_value,
int ttl) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
if(ttl > 0) if (ttl > 0) {
{
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
if(!res.is_ok()) if (!res.is_ok()) {
{ return pplx::task<etcd::Response>([res]() {
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str()); return etcd::Response(res.error_code(), res.error_message().c_str());
}); });
} } else {
else
{
params.lease_id = res.value().lease(); params.lease_id = res.value().lease();
} }
} }
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(
new etcdv3::AsyncCompareAndSwapAction(
params, etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const &key,
{ std::string const &value,
std::string const &old_value,
int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(
new etcdv3::AsyncCompareAndSwapAction(
params, etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const &key,
{ std::string const &value,
int old_index, int ttl) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
if(ttl > 0) if (ttl > 0) {
{
auto res = leasegrant(ttl).get(); auto res = leasegrant(ttl).get();
if(!res.is_ok()) if (!res.is_ok()) {
{ return pplx::task<etcd::Response>([res]() {
return pplx::task<etcd::Response>([res]()
{
return etcd::Response(res.error_code(), res.error_message().c_str()); return etcd::Response(res.error_code(), res.error_message().c_str());
}); });
} } else {
else
{
params.lease_id = res.value().lease(); params.lease_id = res.value().lease();
} }
} }
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(
new etcdv3::AsyncCompareAndSwapAction(
params, etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) pplx::task<etcd::Response> etcd::Client::modify_if(std::string const &key,
{ std::string const &value,
int old_index,
int64_t leaseid) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.value.assign(value); params.value.assign(value);
params.lease_id = leaseid; params.lease_id = leaseid;
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); std::shared_ptr<etcdv3::AsyncCompareAndSwapAction> call(
new etcdv3::AsyncCompareAndSwapAction(
params, etcdv3::Atomicity_Type::PREV_INDEX));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm(std::string const &key) {
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = false; params.withPrefix = false;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(
new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const &key,
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value) std::string const &old_value) {
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.old_value.assign(old_value); params.old_value.assign(old_value);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(
new etcdv3::AsyncCompareAndDeleteAction(
params, etcdv3::Atomicity_Type::PREV_VALUE));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const &key,
{ int old_index) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.old_revision = old_index; params.old_revision = old_index;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; std::shared_ptr<etcdv3::AsyncCompareAndDeleteAction> call(
new etcdv3::AsyncCompareAndDeleteAction(
params, etcdv3::Atomicity_Type::PREV_INDEX));
;
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::rmdir(std::string const &key,
{ bool recursive) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params)); std::shared_ptr<etcdv3::AsyncDeleteAction> call(
new etcdv3::AsyncDeleteAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const &key) {
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = true; params.withPrefix = true;
params.limit = 0; // default no limit. params.limit = 0; // default no limit.
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(
new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t const limit) pplx::task<etcd::Response> etcd::Client::ls(std::string const &key,
{ size_t const limit) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = true; params.withPrefix = true;
params.limit = limit; params.limit = limit;
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params)); std::shared_ptr<etcdv3::AsyncGetAction> call(
new etcdv3::AsyncGetAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const &key,
{ bool recursive) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params)); std::shared_ptr<etcdv3::AsyncWatchAction> call(
new etcdv3::AsyncWatchAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const &key,
{ int fromIndex, bool recursive) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key.assign(key); params.key.assign(key);
params.withPrefix = recursive; params.withPrefix = recursive;
params.revision = fromIndex; params.revision = fromIndex;
params.watch_stub = watchServiceStub.get(); params.watch_stub = watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params)); std::shared_ptr<etcdv3::AsyncWatchAction> call(
new etcdv3::AsyncWatchAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl) pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl) {
{
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.ttl = ttl; params.ttl = ttl;
params.lease_stub = leaseServiceStub.get(); params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params)); std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(
new etcdv3::AsyncLeaseGrantAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::listleases() {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseLeasesAction> call(
new etcdv3::AsyncLeaseLeasesAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasekeepalive(int64_t lease_id) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseKeepAliveAction> call(
new etcdv3::AsyncLeaseKeepAliveAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseRevokeAction> call(
new etcdv3::AsyncLeaseRevokeAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasettl(int64_t lease_id, bool keys) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.lease_id = lease_id;
params.keys = keys;
params.lease_stub = leaseServiceStub.get();
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> call(
new etcdv3::AsyncLeaseTimeToLiveAction(params));
return Response::create(call); return Response::create(call);
} }
@ -472,7 +528,8 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key = key; params.key = key;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params)); std::shared_ptr<etcdv3::AsyncLockAction> call(
new etcdv3::AsyncLockAction(params));
return Response::create(call); return Response::create(call);
} }
@ -481,14 +538,16 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) {
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.key = key; params.key = key;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params)); std::shared_ptr<etcdv3::AsyncUnlockAction> call(
new etcdv3::AsyncUnlockAction(params));
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) { pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.kv_stub = kvServiceStub .get(); params.kv_stub = kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn)); std::shared_ptr<etcdv3::AsyncTxnAction> call(
new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call); return Response::create(call);
} }

127
src/KeepAlive.cpp Normal file
View File

@ -0,0 +1,127 @@
#include "etcd/KeepAlive.hpp"
#include <agents.h>
#include <ppl.h>
#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp"
#include "etcd/v3/AsyncLeaseRevokeAction.hpp"
etcd::KeepAlive::KeepAlive(Client& client) : client_(client) { stub_ = etcdserverpb::Lease::NewStub(client.channel); }
pplx::task<void> etcd::KeepAlive::start(int refresh_in_ms) {
pplx::task_completion_event<void> tce;
// Start a repetetive timer that will trigger our lease keepalive refresh
timer_ = std::unique_ptr<pplx::timer<int>>(new pplx::timer<int>(refresh_in_ms, 0, nullptr, true));
// Open the stream
stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_, reinterpret_cast<void*>(Type::CONNECT));
// Define the callback that is going to be repeatedly called to create a queue of lease id's
// that need to be refreshed
auto queueKeepAlivesCallback = new pplx::call<int>([this](int) {
std::cout << " ** Queueing keepalives: " << std::endl;
// Copy the lease id's to a queue for processing
pplx::concurrent_unordered_map<int64_t, int>::iterator itr;
for (itr = leases_.begin(); itr != leases_.end(); ++itr) {
leaseQueue_.push(*itr);
}
// Kick off the process by sending the first keepalive command
// You can have at most one write or at most one read at any given time,
// so we are not sending another keepalive on the stream until the previous one
// responds
sendNextKeepAlive();
// tce.set();
});
// Create the long running task that will monitor the Completion Queue and
// send/receive data on the stream
// On a CONNECT, this will kick off the timer which then triggers the keepalive's
currentTask_ = pplx::task<void>([this, tce, queueKeepAlivesCallback]() {
while (true) {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue
// "cq". The return value of Next should always be checked. This
// return value tells us whether there is any kind of event or the cq_
// is shutting down.
std::cout << "Waiting for next event..." << std::endl;
if (!cq_.Next(&got_tag, &ok)) {
std::cerr << "Client stream closed. Quitting" << std::endl;
break;
}
// It's important to process all tags even if the ok is false. One
// might want to deallocate memory that has be reinterpret_cast'ed to
// void* when the tag got initialized. For our example, we cast an int
// to a void*, so we don't have extra memory management to take care
// of.
if (ok) {
std::cout << std::endl << "**** Processing completion queue tag " << got_tag << std::endl;
switch (static_cast<Type>(reinterpret_cast<int64_t>(got_tag))) {
case Type::READ:
std::cout << "Read a new message, sending next." << std::endl;
sendNextKeepAlive();
break;
case Type::WRITE:
std::cout << "Sent message (async), attempting to read response." << std::endl;
readNextMessage();
break;
case Type::CONNECT:
std::cout << "Server connected." << std::endl;
tce.set();
timer_->link_target(queueKeepAlivesCallback);
timer_->start();
break;
case Type::WRITES_DONE:
std::cout << "Server disconnecting." << std::endl;
timer_->stop();
break;
case Type::FINISH:
std::cout << "Client finish; status = " << (finish_status_.ok() ? "ok" : "cancelled") << std::endl;
context_.TryCancel();
cq_.Shutdown();
break;
default:
std::cerr << "Unexpected tag " << got_tag << std::endl;
}
}
}
});
// Returning a task that completes once the CONNECT completes, so that the
// client can perform anything they need to at that time.
pplx::task<void> start_completed(tce);
return start_completed;
}
void etcd::KeepAlive::add(int64_t leaseid, int ttl) { leases_.insert(std::make_pair(leaseid, ttl)); }
void etcd::KeepAlive::remove(int64_t leaseid, bool revoke) {
leases_.unsafe_erase(leaseid);
if (revoke) {
client_.leaserevoke(leaseid);
}
}
void etcd::KeepAlive::sendNextKeepAlive() {
std::pair<int64_t, int> lease;
if (leaseQueue_.try_pop(lease)) {
std::cout << "sending keepalive for " << lease.first << std::endl;
LeaseKeepAliveRequest request;
request.set_id(lease.first);
stream_->Write(request, reinterpret_cast<void*>(Type::WRITE));
} else {
std::cout << "no keepalives left" << std::endl;
}
}
void etcd::KeepAlive::readNextMessage() {
std::cout << " ** Got response: " << response_.ttl() << std::endl;
stream_->Read(&response_, reinterpret_cast<void*>(Type::READ));
}
etcd::KeepAlive::~KeepAlive() { cq_.Shutdown(); }

View File

@ -31,6 +31,9 @@ etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseco
// duration // duration
_duration = duration; _duration = duration;
_leases = reply.get_leases();
_leaseinfo = reply.get_leaseinfo();
} }
@ -118,3 +121,7 @@ std::vector<mvccpb::Event> const & etcd::Response::events() const {
std::chrono::microseconds const& etcd::Response::duration() const { std::chrono::microseconds const& etcd::Response::duration() const {
return this->_duration; return this->_duration;
} }
std::vector<int64_t> const& etcd::Response::leases() const { return _leases; }
etcdv3::LeaseInfo const& etcd::Response::leaseinfo() const { return _leaseinfo; }

View File

@ -0,0 +1,69 @@
#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp"
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::LeaseKeepAliveRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param) : etcdv3::Action(param) {
char* createTag = "create";
char* writeTag = "write";
stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)createTag);
LeaseKeepAliveRequest request;
request.set_id(parameters.lease_id);
// wait "create" success (the stream becomes ready)
void* got_tag;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)createTag) {
stream->Write(request, (void*)writeTag);
} else {
throw std::runtime_error("failed to create a keepalive connection");
}
// wait "write" (LeaseKeepAliveRequest) success, and start to read the first
// reply
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)writeTag) {
stream->Read(&reply, (void*)this);
} else {
throw std::runtime_error("failed to write LeaseKeepAliveRequest to server");
}
}
void etcdv3::AsyncLeaseKeepAliveAction::waitForResponse() {
void* got_tag;
bool ok = false;
while (cq_.Next(&got_tag, &ok)) {
if (ok == false) {
break;
}
if (got_tag == (void*)doneTag) {
cq_.Shutdown();
break;
}
if (got_tag == (void*)this) // read tag
{
if (reply.ByteSize()) {
stream->WritesDone((void*)doneTag);
} else {
stream->Read(&reply, (void*)this);
}
}
}
}
etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse() {
AsyncLeaseKeepAliveResponse resp;
if (!status.ok()) {
resp.set_error_code(status.error_code());
resp.set_error_message(status.error_message());
} else {
resp.ParseResponse(reply);
}
return resp;
}

View File

@ -0,0 +1,13 @@
#include "etcd/v3/AsyncLeaseKeepAliveResponse.hpp"
#include <iostream>
#include "etcd/v3/action_constants.hpp"
using namespace std;
void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& reply) {
index = reply.header().revision();
value.kvs.set_lease(reply.id());
value.set_ttl(reply.ttl());
return;
}

View File

@ -0,0 +1,26 @@
#include "etcd/v3/AsyncLeaseLeasesAction.hpp"
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::LeaseLeasesRequest;
etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(etcdv3::ActionParameters param) : etcdv3::Action(param) {
etcdv3::Transaction transaction;
transaction.setup_lease_leases_operation();
response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, transaction.leaseleases_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse() {
AsyncLeaseLeasesResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -0,0 +1,11 @@
#include "etcd/v3/AsyncLeaseLeasesResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(
LeaseLeasesResponse &resp) {
index = resp.header().revision();
for (int index = 0; index < resp.leases_size(); index++) {
leases.push_back(resp.leases(index).id());
}
}

View File

@ -0,0 +1,28 @@
#include "etcd/v3/AsyncLeaseRevokeAction.hpp"
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::LeaseRevokeRequest;
etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction(
etcdv3::ActionParameters param)
: etcdv3::Action(param) {
etcdv3::Transaction transaction;
transaction.setup_lease_revoke_operation(parameters.lease_id);
response_reader = parameters.lease_stub->AsyncLeaseRevoke(
&context, transaction.leaserevoke_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncLeaseRevokeResponse
etcdv3::AsyncLeaseRevokeAction::ParseResponse() {
AsyncLeaseRevokeResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -0,0 +1,7 @@
#include "etcd/v3/AsyncLeaseRevokeResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseRevokeResponse::ParseResponse(
LeaseRevokeResponse &resp) {
index = resp.header().revision();
}

View File

@ -0,0 +1,29 @@
#include "etcd/v3/AsyncLeaseTimeToLiveAction.hpp"
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
using etcdserverpb::LeaseTimeToLiveRequest;
etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(
etcdv3::ActionParameters param)
: etcdv3::Action(param) {
etcdv3::Transaction transaction;
transaction.setup_lease_timetolive_operation(parameters.lease_id,
parameters.keys);
response_reader = parameters.lease_stub->AsyncLeaseTimeToLive(
&context, transaction.leasetimetolive_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncLeaseTimeToLiveResponse
etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse() {
AsyncLeaseTimeToLiveResponse lease_resp;
if (!status.ok()) {
lease_resp.set_error_code(status.error_code());
lease_resp.set_error_message(status.error_message());
} else {
lease_resp.ParseResponse(reply);
}
return lease_resp;
}

View File

@ -0,0 +1,15 @@
#include "etcd/v3/AsyncLeaseTimeToLiveResponse.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse(
LeaseTimeToLiveResponse &resp) {
index = resp.header().revision();
leaseinfo.set_lease(resp.id());
leaseinfo.set_ttl(resp.ttl());
leaseinfo.set_grantedttl(resp.grantedttl());
for (int index = 0; index < resp.keys_size(); index++) {
leaseinfo.add_key(resp.keys(index));
}
}

18
src/v3/LeaseInfo.cpp Normal file
View File

@ -0,0 +1,18 @@
#include "etcd/v3/LeaseInfo.hpp"
etcdv3::LeaseInfo::LeaseInfo() {
leaseid_ = 0;
ttl_ = 0;
grantedttl_ = 0;
keys_.clear();
}
void etcdv3::LeaseInfo::set_lease(int64_t id) { leaseid_ = id; }
void etcdv3::LeaseInfo::set_ttl(int ttl) { ttl_ = ttl; }
void etcdv3::LeaseInfo::set_grantedttl(int ttl) { grantedttl_ = ttl; }
void etcdv3::LeaseInfo::add_key(std::string key) { keys_.push_back(key); }
int64_t etcdv3::LeaseInfo::get_lease() const { return leaseid_; }
int etcdv3::LeaseInfo::get_ttl() const { return ttl_; }
int etcdv3::LeaseInfo::get_grantedttl() const { return grantedttl_; }
std::vector<std::string> etcdv3::LeaseInfo::get_keys() const { return keys_; }

View File

@ -160,6 +160,24 @@ void etcdv3::Transaction::setup_lease_grant_operation(int ttl)
leasegrant_request.set_ttl(ttl); leasegrant_request.set_ttl(ttl);
} }
void etcdv3::Transaction::setup_lease_leases_operation() {
// leaseleases_request.set_id(lease_id);
}
void etcdv3::Transaction::setup_lease_keepalive_operation(int64_t lease_id) {
leasekeepalive_request.set_id(lease_id);
}
void etcdv3::Transaction::setup_lease_revoke_operation(int64_t lease_id) {
leaserevoke_request.set_id(lease_id);
}
void etcdv3::Transaction::setup_lease_timetolive_operation(int64_t lease_id,
bool keys) {
leasetimetolive_request.set_id(lease_id);
leasetimetolive_request.set_keys(keys);
}
void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) { void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) {
std::unique_ptr<PutRequest> put_request(new PutRequest()); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);

View File

@ -46,6 +46,10 @@ std::vector<etcdv3::KeyValue> const & etcdv3::V3Response::get_prev_values() cons
return prev_values; return prev_values;
} }
std::vector<int64_t> const& etcdv3::V3Response::get_leases() const {
return leases;
}
etcdv3::KeyValue const & etcdv3::V3Response::get_value() const etcdv3::KeyValue const & etcdv3::V3Response::get_value() const
{ {
return value; return value;
@ -56,11 +60,19 @@ etcdv3::KeyValue const & etcdv3::V3Response::get_prev_value() const
return prev_value; return prev_value;
} }
etcdv3::LeaseInfo const& etcdv3::V3Response::get_leaseinfo() const {
return leaseinfo;
}
bool etcdv3::V3Response::has_values() const bool etcdv3::V3Response::has_values() const
{ {
return values.size() > 0; return values.size() > 0;
} }
bool etcdv3::V3Response::has_leases() const {
return leases.size() > 0;
}
void etcdv3::V3Response::set_lock_key(std::string const &key) { void etcdv3::V3Response::set_lock_key(std::string const &key) {
this->lock_key = key; this->lock_key = key;
} }