From c467a239582a605adf0eb34d0fdf55a5267dd381 Mon Sep 17 00:00:00 2001 From: arches Date: Tue, 21 Jun 2016 09:36:31 -0400 Subject: [PATCH] Added Watch implementation --- v3/include/AsyncWatchResponse.hpp | 37 +++++++++++ v3/src/AsyncWatchResponse.cpp | 107 ++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 v3/include/AsyncWatchResponse.hpp create mode 100644 v3/src/AsyncWatchResponse.cpp diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp new file mode 100644 index 0000000..5305345 --- /dev/null +++ b/v3/include/AsyncWatchResponse.hpp @@ -0,0 +1,37 @@ +#ifndef __ASYNC_WATCH_HPP__ +#define __ASYNC_WATCH_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" + + +using grpc::ClientAsyncReaderWriter; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncWatchResponse : public etcdv3::V3Response + { + public: + AsyncWatchResponse(){}; + AsyncWatchResponse(const std::string act){action = act;}; + AsyncWatchResponse(const AsyncWatchResponse& other); + AsyncWatchResponse& operator=(const AsyncWatchResponse& other); + AsyncWatchResponse& ParseResponse(); + void waitForResponse(); + WatchResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> stream; + KV::Stub* stub_; + }; +} + +#endif diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp new file mode 100644 index 0000000..2f33eef --- /dev/null +++ b/v3/src/AsyncWatchResponse.cpp @@ -0,0 +1,107 @@ +#include "v3/include/AsyncWatchResponse.hpp" + +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; + + +etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + return *this; +} + +void etcdv3::AsyncWatchResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + stream->Read(&reply, (void*)3); + while(cq_.Next(&got_tag, &ok)) + { + if(got_tag == (void*)3) + { + if(reply.events_size()) + { + stream->WritesDone((void*)this); + cq_.Next(&got_tag, &ok); + break; + } + else + { + stream->Read(&reply, (void*)3); + } + } + } +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() +{ + index = reply.header().revision(); + + mvccpb::KeyValue kv; + std::map mapValue; + + for(int cnt =0; cnt < reply.events_size(); cnt++) + { + auto event = reply.events(cnt); + if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) + { + if(event.has_kv()) + { + kv = event.kv(); + if(kv.version() == 1) + { + action = "create"; + } + else + { + action = "set"; + } + //values.push_back(kv); + mapValue.emplace(kv.key(), kv); + } + } + else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) + { + action = "delete"; + } + //get previous value index - 1 + RangeRequest get_request; + get_request.set_key(kv.key()); + get_request.set_revision(index - 1); + + RangeResponse response; + ClientContext ctx; + + Status result = stub_->Range(&ctx, get_request, &response); + if (result.ok()) + { + for(int cnt=0; cnt < response.kvs_size(); cnt++) + { + prev_values.push_back(response.kvs(cnt)); + } + } + } + for(auto x: mapValue) + { + values.push_back(x.second); + } + + return *this; +}