Updated to use boost for keepalive timers

This commit is contained in:
Eric Musgrave 2020-12-11 10:10:10 -05:00
parent 4e780a11f1
commit bc19624fdf
4 changed files with 378 additions and 140 deletions

View File

@ -423,27 +423,34 @@ send a periodic refresh to the server. This can be done using a one time command
called `leasekeepalive(lease_id)`. However, that is inefficient since it creates and called `leasekeepalive(lease_id)`. However, that is inefficient since it creates and
destroys a bidirectional stream on each call. Instead, it is recommended to use the destroys a bidirectional stream on each call. Instead, it is recommended to use the
`etcd::KeepAlive` service, which is a long running task that will periodically `etcd::KeepAlive` service, which is a long running task that will periodically
refresh the leases that you specify. You can create multiple `etcd::KeepAlive` instances refresh the leases that you specify. You can add multiple leases with different
if you need to have different refresh intervals for different leases. refresh intervals to the KeepAlive service, and it will start up timers for each.
You should set the refresh interval to something less than your lease TTL, so that
any small delays in processing do not cause your lease to expire. The first refresh
request is sent immediately after adding the lease to the service, and then at
each `refresh_interval` thereafter.
```c++ ```c++
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:4001");
etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service, but does not start it {
keepalive.start(2000); // Starts the service with a 2s refresh interval. etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service
etcd::Response resp = etcd.leasegrant(5).get(); // create a lease etcd::Response resp = etcd.leasegrant(5).get(); // create a lease
int64_t lease_id = resp.value().lease(); int64_t lease_id = resp.value().lease();
etcd.set("/test/key2", "bar", lease_id); // attach lease to the key etcd.set("/test/key2", "bar", lease_id); // attach lease to the key
std::cout << "ttl" << resp.value().ttl() << std::endl; std::cout << "ttl" << resp.value().ttl() << std::endl;
// Tell the KeepAlive service to refresh this lease every 2 seconds // Tell the KeepAlive service to refresh this lease every 1.5 seconds
keepalive.add(resp.value().lease()); keepalive.add(resp.value().lease(), std::chrono::milliseconds(1500));
// ... // ...
// Remove the lease from the KeepAlive, and also immediately revoke it // Remove the lease from the KeepAlive, and also immediately revoke it
keepalive.remove(lease_id, true); keepalive.remove(lease_id, true);
// '/test/key2' is now deleted // '/test/key2' is now deleted
}
// When the KeepAlive service object goes out of scope and is deleted,
// all leases are removed but are not immediately revoked.
``` ```
### TODO ### TODO

View File

@ -1,9 +1,14 @@
#pragma once #pragma once
#include <agents.h>
#include <concurrent_unordered_map.h>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <cstdio>
#include <iostream>
#include <mutex>
#include <string> #include <string>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
@ -18,34 +23,71 @@ using etcdserverpb::KV;
using grpc::Channel; using grpc::Channel;
namespace etcd { namespace etcd {
class KeepAlive { /**
enum class Type { READ = 1, WRITE = 2, CONNECT = 3, WRITES_DONE = 4, FINISH = 5 }; * Wrapper for a boost::asio timer that provides simple start/stop/repeat
* functions
*/
class KeepAliveTimer {
public:
typedef boost::function<void(boost::system::error_code, KeepAliveTimer *)>
handler_function;
public: KeepAliveTimer(boost::asio::io_context &io,
boost::asio::chrono::milliseconds interval)
: interval_(interval), timer_(boost::asio::steady_timer(io, interval)) {}
~KeepAliveTimer() { timer_.cancel(); }
void setCallback(handler_function handler) { handler_ = handler; }
void start() {
timer_.expires_from_now(boost::asio::chrono::milliseconds(
50) /* expire immediately; set to interval_ otherwise*/);
timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error,
this)); // boost::ref(*this)));
}
void repeat() {
timer_.expires_at(timer_.expiry() + interval_);
timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error,
this)); // boost::ref(*this)));
}
void stop() { timer_.cancel(); }
private:
boost::asio::steady_timer timer_;
boost::asio::chrono::milliseconds interval_;
handler_function handler_;
};
/**
* The main KeepAlive service class. You only need one per process since it can
* service many leases with different refresh intervals
*/
class KeepAlive {
enum class Type {
READ = 1,
WRITE = 2,
CONNECT = 3,
WRITES_DONE = 4,
FINISH = 5
};
public:
/** /**
* Create an instance of the KeepAlive service * Create an instance of the KeepAlive service
* Call start() to being the timer that automatically sends keepalives * Call add(leaseid, refresh_interval) to add a leaseid to be kept alive by
* Call add(leaseid) to add a leaseid to be kept alive by this service * this service
* @param client the client etcd connection to use * @param client the client etcd connection to use
*/ */
KeepAlive(Client& client); KeepAlive(Client &client);
/**
* Start processing keepalive's
* The refresh time must be less than the granted TTL of your
* leases, otherwise they will expire. Create multiple KeepAlive
* services with different refreshes if you have many leases with varied
* TTL's
* @param refresh_in_ms the time between refreshes (default: 5000ms)
*/
pplx::task<void> start(int refresh_in_ms = 5000);
/** /**
* Add a lease to be kept alive by this service * Add a lease to be kept alive by this service
* @param leaseid the id of the lease * @param leaseid the id of the lease
* @param ttl the ttl of the lease (in seconds) <reserved for future use> * @param refresh_interval the interval to refresh the lease
*/ */
void add(int64_t leaseid, int ttl = 5); void add(int64_t leaseid, boost::asio::chrono::milliseconds refresh_interval);
/** /**
* Remove a lease from being kept alive, and optionally revoke it immediately * Remove a lease from being kept alive, and optionally revoke it immediately
@ -54,32 +96,61 @@ class KeepAlive {
*/ */
void remove(int64_t leaseid, bool revoke = false); void remove(int64_t leaseid, bool revoke = false);
/**
* Remove all leases that the service is currently refreshing.
* @param revoke true to immediately revoke the leases (defaults to false)
*/
void removeAll(bool revoke = false);
~KeepAlive(); ~KeepAlive();
private: private:
KeepAlive(KeepAlive const &rhs); // prevent copying
KeepAlive &operator=(KeepAlive const &rhs); // prevent assignment
/**
* Moves the registered leases to a processing queue, and then resets the
* timer and triggers the first lease renewal request
*/
void queueKeepAlivesCallback(const boost::system::error_code &error,
KeepAliveTimer *timer);
/** /**
* Sends a keep alive for the next lease in the queue * Sends a keep alive for the next lease in the queue
*/ */
void sendNextKeepAlive(); void sendNextKeepAlive(std::unique_ptr<grpc::ClientAsyncReaderWriter<
etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>> &stream);
/** /**
* * Read the response off the stream
*/ */
void readNextMessage(); void readNextMessage(std::unique_ptr<grpc::ClientAsyncReaderWriter<
etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>> &stream);
// The timer that is triggering refreshes // Boost io_context, which is responsible for doing the timer work
std::unique_ptr<pplx::timer<int>> timer_; boost::asio::io_context io_;
// Thread for the io_context to use
std::unique_ptr<boost::thread> t_;
// The map of leases that have been registered with this service to be kept alive // A mutex to use to protect all of our lease maps/queues when manipulating
pplx::concurrent_unordered_map<int64_t, int> leases_; // them
std::mutex mutex_;
// The current queue of leases that still need to be refreshed on this pass of the timer // The map of leases that have been registered with this service to be kept
pplx::concurrent_queue<std::pair<int64_t, int>> leaseQueue_; // alive
std::map<int64_t, std::shared_ptr<KeepAliveTimer>> leases_;
// All of the timers that are currently running
std::map<boost::asio::chrono::milliseconds, std::shared_ptr<KeepAliveTimer>>
timers_;
// The current queue of keepalives that need to be sent
std::queue<int64_t> keepalive_queue_;
// The long running task for this service // The long running task for this service
pplx::task<void> currentTask_; pplx::task<void> current_task_;
// The client that we are attached to. // The client that we are attached to.
Client& client_; Client &client_;
// Context for the client. It could be used to convey extra information to // Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors. // the server and/or tweak certain RPC behaviors.
@ -95,7 +166,8 @@ class KeepAlive {
// The bidirectional, asynchronous stream for sending/receiving messages. // The bidirectional, asynchronous stream for sending/receiving messages.
std::unique_ptr< std::unique_ptr<
grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest, etcdserverpb::LeaseKeepAliveResponse>> grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>>
stream_; stream_;
// Allocated protobuf that holds the response. In real clients and servers, // Allocated protobuf that holds the response. In real clients and servers,

View File

@ -5,7 +5,7 @@
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include "etcd/v3/Action.hpp" #include "etcd/v3/Action.hpp"
#include "etcd/v3/AsyncleaseKeepAliveResponse.hpp" #include "etcd/v3/AsyncLeaseKeepAliveResponse.hpp"
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
using etcd::Response; using etcd::Response;

View File

@ -1,55 +1,46 @@
/**
* file: KeepAlive.cpp
* credit to https://github.com/grpc/grpc/pull/8934 for an example on using grpc
* bidi in c++
*/
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
#include <agents.h>
#include <ppl.h>
#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp" #include "etcd/v3/AsyncLeaseKeepAliveAction.hpp"
#include "etcd/v3/AsyncLeaseRevokeAction.hpp" #include "etcd/v3/AsyncLeaseRevokeAction.hpp"
etcd::KeepAlive::KeepAlive(Client& client) : client_(client) { stub_ = etcdserverpb::Lease::NewStub(client.channel); } etcd::KeepAlive::KeepAlive(Client &client)
: client_(client) { // , strand_(boost::asio::make_strand(io_)) {
stub_ = etcdserverpb::Lease::NewStub(client.channel);
pplx::task<void> etcd::KeepAlive::start(int refresh_in_ms) { // Open the grpc bidi stream
pplx::task_completion_event<void> tce; stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_,
reinterpret_cast<void *>(Type::CONNECT));
// Start a repetetive timer that will trigger our lease keepalive refresh // Start up the long-running task that will manage the stream
timer_ = std::unique_ptr<pplx::timer<int>>(new pplx::timer<int>(refresh_in_ms, 0, nullptr, true)); current_task_ = pplx::task<void>([this]() {
// Keep the io context running even without any work (otherwise it exits
// immediately before any timers are started)
boost::asio::executor_work_guard<decltype(io_.get_executor())> work{
io_.get_executor()};
// spin up a thread that runs the boost io_context for our timer
t_ = std::unique_ptr<boost::thread>(
new boost::thread(boost::bind(&boost::asio::io_context::run, &io_)));
// Open the stream bool shutdown = false;
stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_, reinterpret_cast<void*>(Type::CONNECT)); while (!shutdown) {
void *got_tag;
// Define the callback that is going to be repeatedly called to create a queue of lease id's
// that need to be refreshed
auto queueKeepAlivesCallback = new pplx::call<int>([this](int) {
// std::cout << " ** Queueing keepalives: " << std::endl;
// Copy the lease id's to a queue for processing
pplx::concurrent_unordered_map<int64_t, int>::iterator itr;
for (itr = leases_.begin(); itr != leases_.end(); ++itr) {
leaseQueue_.push(*itr);
}
// Kick off the process by sending the first keepalive command
// You can have at most one write or at most one read at any given time,
// so we are not sending another keepalive on the stream until the previous one
// responds
sendNextKeepAlive();
// tce.set();
});
// Create the long running task that will monitor the Completion Queue and
// send/receive data on the stream
// On a CONNECT, this will kick off the timer which then triggers the keepalive's
currentTask_ = pplx::task<void>([this, tce, queueKeepAlivesCallback]() {
while (true) {
void* got_tag;
bool ok = false; bool ok = false;
// Block until the next result is available in the completion queue // Block until the next result is available in the completion queue
// "cq". The return value of Next should always be checked. This // "cq". The return value of Next should always be checked. This
// return value tells us whether there is any kind of event or the cq_ // return value tells us whether there is any kind of event or the cq_
// is shutting down. // is shutting down.
// std::cout << "Waiting for next event..." << std::endl; #if DEBUG
std::cout << "Waiting for next event..." << std::endl;
#endif
if (!cq_.Next(&got_tag, &ok)) { if (!cq_.Next(&got_tag, &ok)) {
// std::cerr << "Client stream closed. Quitting" << std::endl; #if DEBUG
std::cerr << "Client stream closed. Quitting" << std::endl;
#endif
break; break;
} }
@ -59,69 +50,237 @@ pplx::task<void> etcd::KeepAlive::start(int refresh_in_ms) {
// to a void*, so we don't have extra memory management to take care // to a void*, so we don't have extra memory management to take care
// of. // of.
if (ok) { if (ok) {
// std::cout << std::endl << "**** Processing completion queue tag " << got_tag << std::endl; #if DEBUG
std::cout << std::endl
<< "**** Processing completion queue tag " << got_tag
<< std::endl;
#endif
switch (static_cast<Type>(reinterpret_cast<int64_t>(got_tag))) { switch (static_cast<Type>(reinterpret_cast<int64_t>(got_tag))) {
case Type::READ: case Type::READ:
// std::cout << "Read a new message, sending next." << std::endl; #if DEBUG
sendNextKeepAlive(); std::cout << "Read a new message, sending next." << std::endl;
#endif
sendNextKeepAlive(stream_);
break; break;
case Type::WRITE: case Type::WRITE:
// std::cout << "Sent message (async), attempting to read response." << std::endl; #if DEBUG
readNextMessage(); std::cout << "Sent message (async), attempting to read response."
<< std::endl;
#endif
readNextMessage(stream_);
break; break;
case Type::CONNECT: case Type::CONNECT:
// std::cout << "Server connected." << std::endl; #if DEBUG
tce.set(); std::cout << "Server connected." << std::endl;
timer_->link_target(queueKeepAlivesCallback); #endif
timer_->start();
break; break;
case Type::WRITES_DONE: case Type::WRITES_DONE:
// std::cout << "Server disconnecting." << std::endl; #if DEBUG
timer_->stop(); std::cout << "Server disconnecting." << std::endl;
#endif
stream_->Finish(&finish_status_,
reinterpret_cast<void *>(Type::FINISH));
break; break;
case Type::FINISH: case Type::FINISH:
// std::cout << "Client finish; status = " << (finish_status_.ok() ? "ok" : "cancelled") << std::endl; #if DEBUG
std::cout << "Client finish; status = "
<< (finish_status_.ok() ? "ok" : "cancelled") << std::endl;
#endif
context_.TryCancel(); context_.TryCancel();
cq_.Shutdown(); cq_.Shutdown();
io_.stop();
#if DEBUG
std::cout << "Cleanup complete." << std::endl;
#endif
shutdown = true;
break; break;
default: default:
// std::cerr << "Unexpected tag " << got_tag << std::endl; #if DEBUG
std::cerr << "Unexpected tag " << got_tag << std::endl;
#endif
break;
} }
} }
} }
}); });
// Returning a task that completes once the CONNECT completes, so that the
// client can perform anything they need to at that time.
pplx::task<void> start_completed(tce);
return start_completed;
} }
void etcd::KeepAlive::add(int64_t leaseid, int ttl) { leases_.insert(std::make_pair(leaseid, ttl)); } void etcd::KeepAlive::queueKeepAlivesCallback(
const boost::system::error_code &error, KeepAliveTimer *timer) {
#if DEBUG
std::time_t time =
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::cout << " ** Queueing keepalives: " << std::ctime(&time);
#endif
if (error == boost::asio::error::operation_aborted) {
#if DEBUG
std::cout << " ** Aborting" << std::endl;
#endif
return;
}
// Lock the mutex and copy the lease id's to a queue for processing
{
std::lock_guard<std::mutex> lg(mutex_);
std::map<int64_t, std::shared_ptr<KeepAliveTimer>>::iterator itr;
for (itr = leases_.begin(); itr != leases_.end(); ++itr) {
if (itr->second.get() == timer) {
#if DEBUG
std::cout << " ** Queueing keepalive lease: " << itr->first
<< std::endl;
#endif
keepalive_queue_.push(itr->first);
}
}
}
// Trigger the timer to repeat
timer->repeat();
// Kick off the queue processing by sending the first keepalive command
// You can have at most one write or at most one read at any given time,
// so we are not sending another keepalive on the stream until the previous
// one responds
sendNextKeepAlive(stream_);
}
void etcd::KeepAlive::add(int64_t leaseid,
                          boost::asio::chrono::milliseconds refresh_interval) {
  // All lease/timer bookkeeping is done under the service mutex.
  std::lock_guard<std::mutex> guard(mutex_);

  // A lease that is already registered keeps its current timer; nothing to do.
  if (leases_.find(leaseid) != leases_.end()) {
    return;
  }

  auto existing = timers_.find(refresh_interval);
  if (existing == timers_.end()) {
    // First lease with this refresh interval: create a timer for it, hook it
    // up to the keepalive queueing callback, start it, and record both the
    // timer and the lease.
    auto fresh = std::make_shared<KeepAliveTimer>(io_, refresh_interval);
    fresh->setCallback(boost::bind(&KeepAlive::queueKeepAlivesCallback, this,
                                   boost::asio::placeholders::error,
                                   fresh.get()));
    fresh->start();
    timers_.insert(std::make_pair(refresh_interval, fresh));
    leases_.insert(std::make_pair(leaseid, fresh));
    return;
  }

  // A timer with this interval is already running: share it with this lease.
  leases_.insert(std::make_pair(leaseid, existing->second));
}
void etcd::KeepAlive::remove(int64_t leaseid, bool revoke) { void etcd::KeepAlive::remove(int64_t leaseid, bool revoke) {
leases_.unsafe_erase(leaseid); {
std::lock_guard<std::mutex> lg(mutex_);
// Search for the leaseid
auto lease = leases_.find(leaseid);
if (lease != leases_.end()) {
// Lease exists, let's remove it
// Check if the timer has any other leases attached or if it needs to be
// removed
bool delete_timer = true;
for (auto timer_search = leases_.begin(); timer_search != leases_.end();
++timer_search) {
if (timer_search->second == lease->second &&
timer_search->first != lease->first) {
// Found another lease using this timer, so don't delete the timer
delete_timer = false;
break;
}
}
// If we need to delete the timer...
if (delete_timer) {
// First make sure to stop the timer.
lease->second->stop();
// Find the timer in our timers_ map
for (auto timer_search = timers_.begin(); timer_search != timers_.end();
++timer_search) {
if (timer_search->second == lease->second) {
timers_.erase(timer_search);
break;
}
}
}
// Finally, we can erase the lease entry
leases_.erase(leaseid);
} // else = no record of the lease
}
if (revoke) { if (revoke) {
client_.leaserevoke(leaseid); client_.leaserevoke(leaseid);
} }
} }
void etcd::KeepAlive::sendNextKeepAlive() { void etcd::KeepAlive::sendNextKeepAlive(
std::pair<int64_t, int> lease; std::unique_ptr<
if (leaseQueue_.try_pop(lease)) { grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest,
// std::cout << "sending keepalive for " << lease.first << std::endl; etcdserverpb::LeaseKeepAliveResponse>>
&stream) {
LeaseKeepAliveRequest request; LeaseKeepAliveRequest request;
request.set_id(lease.first);
stream_->Write(request, reinterpret_cast<void*>(Type::WRITE)); {
std::lock_guard<std::mutex> lg(mutex_);
if (!keepalive_queue_.empty()) {
int64_t leaseid = keepalive_queue_.front();
keepalive_queue_.pop();
#if DEBUG
std::cout << "sending keepalive for " << leaseid << std::endl;
#endif
request.set_id(leaseid);
} else { } else {
// std::cout << "no keepalives left" << std::endl; // std::cout << "no keepalives left" << std::endl;
return;
} }
}
stream->Write(request, reinterpret_cast<void *>(Type::WRITE));
} }
void etcd::KeepAlive::readNextMessage() { void etcd::KeepAlive::readNextMessage(
std::unique_ptr<
grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>>
&stream) {
// std::cout << " ** Got response: " << response_.ttl() << std::endl; // std::cout << " ** Got response: " << response_.ttl() << std::endl;
stream_->Read(&response_, reinterpret_cast<void*>(Type::READ)); stream->Read(&response_, reinterpret_cast<void *>(Type::READ));
} }
etcd::KeepAlive::~KeepAlive() { cq_.Shutdown(); } void etcd::KeepAlive::removeAll(bool revoke) {
{
// empty the queue
std::lock_guard<std::mutex> lg(mutex_);
std::queue<int64_t> empty;
std::swap(keepalive_queue_, empty);
}
if (revoke) {
// revoke each lease
for (auto lease : leases_) {
client_.leaserevoke(lease.first);
}
}
// Clear the maps, which will destroy the timers as the shared_ptr's go out of
// scope
std::lock_guard<std::mutex> lg(mutex_);
leases_.clear();
timers_.clear();
}
/**
 * Shut the service down: drop all leases (without revoking them), half-close
 * the keepalive stream, and wait for the background work to finish before the
 * members are destroyed.
 */
etcd::KeepAlive::~KeepAlive() {
  // Stop and discard every timer/lease; leases are not revoked here.
  removeAll();

  // Signal that no more keepalive requests will be written. The completion
  // queue loop reacts to the WRITES_DONE tag by finishing the stream.
  stream_->WritesDone(reinterpret_cast<void *>(Type::WRITES_DONE));

  // Wait for the long-running task first. It captures `this`, and it is also
  // the code that creates t_, so t_ must not be inspected (and no member may
  // be destroyed) until the task has completed.
  try {
    current_task_.wait();
  } catch (...) {
    // A destructor must not throw; any failure stored in the task is
    // deliberately dropped here.
  }

  // t_ stays null if the task never got as far as creating the thread.
  if (t_ && t_->joinable()) {
    try {
      t_->join();
    } catch (const boost::thread_interrupted &) {
      /* suppress */
    }
  }
}