etcd-cpp-apiv3/etcd/KeepAlive.hpp

183 lines
5.7 KiB
C++

#pragma once
#include <grpc++/grpc++.h>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include "etcd/Client.hpp"
#include "etcd/Response.hpp"
#include "proto/rpc.grpc.pb.h"
// Forward declaration only; the class is defined elsewhere. It is not
// referenced in the part of this header visible here.
namespace etcdv3 {
class AsyncKeepAliveAction;
}
// NOTE(review): these using-declarations sit at global scope in a header, so
// `KV` and `Channel` become visible unqualified in every translation unit that
// includes this file. Neither name is used in the visible part of this header;
// confirm no includer depends on them before removing.
using etcdserverpb::KV;
using grpc::Channel;
namespace etcd {
/**
* Wrapper for a boost::asio timer that provides simple start/stop/repeat
* functions
*/
class KeepAliveTimer {
public:
typedef boost::function<void(boost::system::error_code, KeepAliveTimer *)>
handler_function;
KeepAliveTimer(boost::asio::io_context &io,
boost::asio::chrono::milliseconds interval)
: interval_(interval), timer_(boost::asio::steady_timer(io, interval)) {}
~KeepAliveTimer() { timer_.cancel(); }
void setCallback(handler_function handler) { handler_ = handler; }
void start() {
timer_.expires_from_now(boost::asio::chrono::milliseconds(
50) /* expire immediately; set to interval_ otherwise*/);
timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error,
this)); // boost::ref(*this)));
}
void repeat() {
timer_.expires_at(timer_.expiry() + interval_);
timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error,
this)); // boost::ref(*this)));
}
void stop() { timer_.cancel(); }
private:
boost::asio::chrono::milliseconds interval_;
boost::asio::steady_timer timer_;
handler_function handler_;
};
/**
* The main KeepAlive service class. You only need one per process since it can
* service many leases with different refresh intervals
*/
class KeepAlive {
enum class Type {
READ = 1,
WRITE = 2,
CONNECT = 3,
WRITES_DONE = 4,
FINISH = 5
};
public:
/**
* Create an instance of the KeepAlive service
* Call add(leaseid, refresh_interval) to add a leaseid to be kept alive by
* this service
* @param client the client etcd connection to use
*/
KeepAlive(Client &client);
/**
* Add a lease to be kept alive by this service
* @param leaseid the id of the lease
* @param refresh_interval the interval to refresh the lease
*/
void add(int64_t leaseid, boost::asio::chrono::milliseconds refresh_interval);
/**
* Remove a lease from being kept alive, and optionally revoke it immediately
* @param leaseid the id of the lease
* @param revoke true to immediatley revoke the lease (defaults to false)
*/
void remove(int64_t leaseid, bool revoke = false);
/**
* Remove all leases that the service is currently refreshing.
* @param revoke true to immediately revoke the leases (defaulst to false)
*/
void removeAll(bool revoke = false);
~KeepAlive();
private:
KeepAlive(KeepAlive const &rhs); // prevent copying
KeepAlive &operator=(KeepAlive const &rhs); // prevent assignment
/**
* Moves the registered leases to a processing queue, and then resets the
* timer and triggers the first lease renewal request
*/
void queueKeepAlivesCallback(const boost::system::error_code &error,
KeepAliveTimer *timer);
/**
* Sends a keep alive for the next lease in the queue
*/
void sendNextKeepAlive(std::unique_ptr<grpc::ClientAsyncReaderWriter<
etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>> &stream);
/**
* Read the response off the stream
*/
void readNextMessage(std::unique_ptr<grpc::ClientAsyncReaderWriter<
etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>> &stream);
// Boost io_context, which is responsible for doing the timer work
boost::asio::io_context io_;
// Thread for the io_context to use
std::unique_ptr<boost::thread> t_;
// A mutex to use to protect all of our lease maps/queues when manipulating
// them
std::mutex mutex_;
// The map of leases that have been registered with this service to be kept
// alive
std::map<int64_t, std::shared_ptr<KeepAliveTimer>> leases_;
// All of the timers that are currently running
std::map<boost::asio::chrono::milliseconds, std::shared_ptr<KeepAliveTimer>>
timers_;
// The current queue of keepalives that need to be sent
std::queue<int64_t> keepalive_queue_;
// The long running task for this service
pplx::task<void> current_task_;
// The client that we are attached to.
Client &client_;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
grpc::ClientContext context_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
grpc::CompletionQueue cq_;
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<etcdserverpb::Lease::Stub> stub_;
// The bidirectional, asynchronous stream for sending/receiving messages.
std::unique_ptr<
grpc::ClientAsyncReaderWriter<etcdserverpb::LeaseKeepAliveRequest,
etcdserverpb::LeaseKeepAliveResponse>>
stream_;
// Allocated protobuf that holds the response. In real clients and servers,
// the memory management would a bit more complex as the thread that fills
// in the response should take care of concurrency as well as memory
// management.
etcdserverpb::LeaseKeepAliveResponse response_;
// Finish status when the client is done with the stream.
grpc::Status finish_status_ = grpc::Status::OK;
};
} // namespace etcd