diff --git a/etcd/Client.hpp b/etcd/Client.hpp index a6005cb..6ece300 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -146,6 +146,7 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; + std::unique_ptr stub1_; pplx::task send_asyncput(const std::string& key, const std::string& value); std::unique_ptr watchServiceStub; pplx::task send_asyncadd(std::string const & key, const std::string& value); diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 51db387..bdabbb8 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -27,25 +27,22 @@ namespace etcd static pplx::task createResponse(const etcdv3::V3Response& response); - templatestatic pplx::task create(T call) + templatestatic pplx::task create(std::shared_ptr call) { return pplx::task([call]() { void* got_tag; bool ok = false; - etcd::Response resp; + etcd::Response resp; //blocking call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call); - - T call = static_cast(got_tag); + GPR_ASSERT(got_tag == (void*)call.get()); auto v3resp = call->ParseResponse(); resp = etcd::Response(v3resp); - delete call; //todo:make this a smart pointer return resp; }); }; diff --git a/src/Client.cpp b/src/Client.cpp index 7f983f6..5459628 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -14,6 +14,10 @@ using etcdserverpb::DeleteRangeRequest; using etcdserverpb::Compare; using etcdserverpb::RequestOp; +using grpc::ClientReaderWriter; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using etcdserverpb::WatchCreateRequest; etcd::Client::Client(std::string const & address) : client(address) @@ -27,6 +31,7 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); + stub1_= Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -295,11 +300,11 @@ pplx::task etcd::Client::send_asyncadd(std::string const & key, req_success->set_allocated_request_range(get_request.release()); - etcdv3::AsyncTxnResponse* call(new etcdv3::AsyncTxnResponse("create")); + std::shared_ptr call(new etcdv3::AsyncTxnResponse("create")); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); @@ -349,11 +354,11 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("compareAndSwap"); + std::shared_ptr call(new etcdv3::AsyncTxnResponse("compareAndSwap")); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); - call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); @@ -397,7 +402,7 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke req_success->set_allocated_request_range(get_request.release()); - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("update"); + std::shared_ptr call(new etcdv3::AsyncTxnResponse("update")); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); @@ -419,11 +424,11 @@ pplx::task etcd::Client::send_asyncget(std::string const & key, get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); } - etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); + std::shared_ptr call(new etcdv3::AsyncRangeResponse()); call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); } @@ -431,6 +436,39 @@ pplx::task etcd::Client::send_asyncget(std::string const & key, pplx::task etcd::Client::send_asyncput(std::string const & key, std::string const & value) { +#if 0 + //try watch here: + ClientContext context; + std::shared_ptr> stream(stub1_->Watch(&context)); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + + stream->Write(watch_req); + + std::cout<< "write finished" << std::endl; + + WatchResponse server_resp; + while(stream->Read(&server_resp)) + { + std::cout<< "read...watch id: "<< server_resp.watch_id()<< std::endl; + if(server_resp.events_size()) + { + std::cout << "event type: " << server_resp.events(0).type() << std::endl; + std::cout << "key: " << server_resp.events(0).kv().key() << std::endl; + std::cout << "value: " << server_resp.events(0).kv().value() << std::endl; + stream->WritesDone(); + + } + } + Status status = stream->Finish(); + if(!status.ok()) + { + std::cout << "rpc failed" << std::endl; + } +#endif //check if key is not present TxnRequest txn_request; Compare* compare = txn_request.add_compare(); @@ -469,11 +507,11 @@ pplx::task etcd::Client::send_asyncput(std::string const & key, req_success = txn_request.add_success(); req_success->set_allocated_request_range(get_request.release()); - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("set"); + std::shared_ptr call(new etcdv3::AsyncTxnResponse("set")); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); } @@ -543,13 +581,11 @@ pplx::task etcd::Client::send_asyncdelete(std::string const & ke req_failure->set_allocated_request_delete_range(del_request.release()); - - - etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("delete"); + std::shared_ptr call(new etcdv3::AsyncTxnResponse("delete")); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); return Response::create(call); }