diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2223cfc --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build/ +compile_commands.json +proto/**/*.pb.cc +proto/**/*.pb.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c917983..e584331 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,18 +1,43 @@ cmake_minimum_required (VERSION 3.1.3 FATAL_ERROR) project (etcd-cpp-api) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +set (etcd-cpp-api_VERSION_MAJOR 0) +set (etcd-cpp-api_VERSION_MINOR 1) + find_library(CPPREST_LIB NAMES cpprest) find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h) find_package(Boost REQUIRED COMPONENTS system thread locale random) +find_package(OpenSSL REQUIRED) +find_package(Protobuf REQUIRED) -set (etcd-cpp-api_VERSION_MAJOR 0) -set (etcd-cpp-api_VERSION_MINOR 1) +set(GRPC_LIBRARY_PATH /usr/lib + /usr/lib64 + /usr/local/lib + /usr/local/lib64 + /usr/local/opt/grpc) +find_library(GPR_LIBRARY NAMES gpr PATHS ${GRPC_LIBRARY_PATH}) +find_library(GRPC_LIBRARY NAMES grpc PATHS ${GRPC_LIBRARY_PATH}) +find_library(GRPC++_LIBRARY NAMES grpc++ PATHS ${GRPC_LIBRARY_PATH}) +set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC++_LIBRARY}) + +file(GLOB_RECURSE PROTO_SRC RELATIVE "${CMAKE_SOURCE_DIR}/proto" "proto/*.proto") + +add_custom_target(proto-gen + COMMAND protoc -I . --cpp_out=. ${PROTO_SRC} + COMMAND protoc -I . --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./rpc.proto ./v3lock.proto + COMMENT "Generate protobuf stuffs" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/proto) enable_testing() -include_directories(SYSTEM ${CPPREST_INCLUDE_DIR} ${Boost_INCLUDE_DIR}) +include_directories(SYSTEM ${CPPREST_INCLUDE_DIR} + ${Boost_INCLUDE_DIR} + ${PROTOBUF_INCLUDE_DIRS} + ${OPENSSL_INCLUDE_DIR}) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -std=c++11") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -Wno-string-compare -std=c++11") add_subdirectory(src) add_subdirectory(tst) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 55afe8c..63792be 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -2,18 +2,21 @@ #define __ETCD_CLIENT_HPP__ #include "etcd/Response.hpp" -#include "v3/include/Transaction.hpp" -#include "v3/include/AsyncTxnResponse.hpp" -#include "v3/include/Action.hpp" #include #include #include "proto/rpc.grpc.pb.h" +#include "proto/v3lock.grpc.pb.h" using etcdserverpb::KV; using etcdserverpb::Watch; using etcdserverpb::Lease; +using v3lockpb::Lock; + +namespace etcdv3 { + class Transaction; +} namespace etcd { @@ -181,11 +184,29 @@ namespace etcd */ pplx::task leasegrant(int ttl); - private: + /** + * Gains a lock at a key. + * @param key is the key to be used to request the lock. + */ + pplx::task lock(std::string const &key); + /** + * Releases a lock at a key. + * @param key is the lock key to release. + */ + pplx::task unlock(std::string const &key); + + /** + * Execute a etcd transaction. + * @param txn is the transaction object to be executed. + */ + pplx::task txn(etcdv3::Transaction const &txn); + + private: std::unique_ptr stub_; std::unique_ptr watchServiceStub; std::unique_ptr leaseServiceStub; + std::unique_ptr lockServiceStub; }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 1b945f1..70e35b8 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -6,12 +6,13 @@ #include "etcd/Value.hpp" #include - -#include "v3/include/V3Response.hpp" +#include "proto/kv.pb.h" #include + namespace etcdv3 { class AsyncWatchAction; + class V3Response; } namespace etcd @@ -98,7 +99,17 @@ namespace etcd */ std::string const & key(int index) const; - protected: + /** + * Returns the lock key. + */ + std::string const & lock_key() const; + + /** + * Returns the watched events. + */ + std::vector const & events() const; + + protected: Response(const etcdv3::V3Response& response); Response(int error_code, char const * error_message); @@ -110,6 +121,8 @@ namespace etcd Value _prev_value; Values _values; Keys _keys; + std::string _lock_key; // for lock + std::vector _events; // for watch friend class SyncClient; friend class etcdv3::AsyncWatchAction; friend class Client; diff --git a/etcd/Value.hpp b/etcd/Value.hpp index f1e5b3b..0d0eacc 100644 --- a/etcd/Value.hpp +++ b/etcd/Value.hpp @@ -4,7 +4,10 @@ #include #include #include -#include "v3/include/KeyValue.hpp" + +namespace etcdv3 { + class KeyValue; +} namespace etcd { diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 097c2d8..f308e19 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -3,9 +3,13 @@ #include #include "etcd/Response.hpp" -#include "v3/include/AsyncWatchAction.hpp" #include +#include "proto/rpc.grpc.pb.h" + +namespace etcdv3 { + class AsyncWatchAction; +} using etcdserverpb::KV; using etcdserverpb::Watch; diff --git a/proto/auth.proto b/proto/auth.proto index 50f9c33..8f82b7c 100644 --- a/proto/auth.proto +++ b/proto/auth.proto @@ -1,23 +1,37 @@ syntax = "proto3"; package authpb; +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; + +message UserAddOptions { + bool no_password = 1; +}; + // User is a single entry in the bucket authUsers message User { bytes name = 1; bytes password = 2; repeated string roles = 3; + UserAddOptions options = 4; } // Permission is a single entity message Permission { - bytes key = 1; - enum Type { READ = 0; WRITE = 1; READWRITE = 2; } - Type permType = 2; + Type permType = 1; + + bytes key = 2; + bytes range_end = 3; } // Role is a single entry in the bucket authRoles diff --git a/proto/etcdserver.proto b/proto/etcdserver.proto index f51ec09..25e0aca 100644 --- a/proto/etcdserver.proto +++ b/proto/etcdserver.proto @@ -1,27 +1,34 @@ syntax = "proto2"; package etcdserverpb; +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + message Request { - optional uint64 ID = 1; - optional string Method = 2; - optional string Path = 3; - optional string Val = 4; - optional bool Dir = 5; - optional string PrevValue = 6; - optional uint64 PrevIndex = 7; - optional bool PrevExist = 8; - optional int64 Expiration = 9; - optional bool Wait = 10; - optional uint64 Since = 11; - optional bool Recursive = 12; - optional bool Sorted = 13; - optional bool Quorum = 14; - optional int64 Time = 15; - optional bool Stream = 16; - optional bool Refresh = 17; + optional uint64 ID = 1 [(gogoproto.nullable) = false]; + optional string Method = 2 [(gogoproto.nullable) = false]; + optional string Path = 3 [(gogoproto.nullable) = false]; + optional string Val = 4 [(gogoproto.nullable) = false]; + optional bool Dir = 5 [(gogoproto.nullable) = false]; + optional string PrevValue = 6 [(gogoproto.nullable) = false]; + optional uint64 PrevIndex = 7 [(gogoproto.nullable) = false]; + optional bool PrevExist = 8 [(gogoproto.nullable) = true]; + optional int64 Expiration = 9 [(gogoproto.nullable) = false]; + optional bool Wait = 10 [(gogoproto.nullable) = false]; + optional uint64 Since = 11 [(gogoproto.nullable) = false]; + optional bool Recursive = 12 [(gogoproto.nullable) = false]; + optional bool Sorted = 13 [(gogoproto.nullable) = false]; + optional bool Quorum = 14 [(gogoproto.nullable) = false]; + optional int64 Time = 15 [(gogoproto.nullable) = false]; + optional bool Stream = 16 [(gogoproto.nullable) = false]; + optional bool Refresh = 17 [(gogoproto.nullable) = true]; } message Metadata { - optional uint64 NodeID = 1; - optional uint64 ClusterID = 2; + optional uint64 NodeID = 1 [(gogoproto.nullable) = false]; + optional uint64 ClusterID = 2 [(gogoproto.nullable) = false]; } diff --git a/proto/gogoproto/gogo.proto b/proto/gogoproto/gogo.proto new file mode 100644 index 0000000..b80c856 --- /dev/null +++ b/proto/gogoproto/gogo.proto @@ -0,0 +1,144 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto2"; +package gogoproto; + +import "google/protobuf/descriptor.proto"; + +option java_package = "com.google.protobuf"; +option java_outer_classname = "GoGoProtos"; +option go_package = "github.com/gogo/protobuf/gogoproto"; + +extend google.protobuf.EnumOptions { + optional bool goproto_enum_prefix = 62001; + optional bool goproto_enum_stringer = 62021; + optional bool enum_stringer = 62022; + optional string enum_customname = 62023; + optional bool enumdecl = 62024; +} + +extend google.protobuf.EnumValueOptions { + optional string enumvalue_customname = 66001; +} + +extend google.protobuf.FileOptions { + optional bool goproto_getters_all = 63001; + optional bool goproto_enum_prefix_all = 63002; + optional bool goproto_stringer_all = 63003; + optional bool verbose_equal_all = 63004; + optional bool face_all = 63005; + optional bool gostring_all = 63006; + optional bool populate_all = 63007; + optional bool stringer_all = 63008; + optional bool onlyone_all = 63009; + + optional bool equal_all = 63013; + optional bool description_all = 63014; + optional bool testgen_all = 63015; + optional bool benchgen_all = 63016; + optional bool marshaler_all = 63017; + optional bool unmarshaler_all = 63018; + optional bool stable_marshaler_all = 63019; + + optional bool sizer_all = 63020; + + optional bool goproto_enum_stringer_all = 63021; + optional bool enum_stringer_all = 63022; + + optional bool unsafe_marshaler_all = 63023; + optional bool unsafe_unmarshaler_all = 63024; + + optional bool goproto_extensions_map_all = 63025; + optional bool goproto_unrecognized_all = 63026; + optional bool gogoproto_import = 63027; + optional bool protosizer_all = 63028; + optional bool compare_all = 63029; + optional bool typedecl_all = 63030; + optional bool enumdecl_all = 63031; + + optional bool goproto_registration = 63032; + optional bool messagename_all = 63033; + + optional bool goproto_sizecache_all = 63034; + optional bool goproto_unkeyed_all = 63035; +} + +extend google.protobuf.MessageOptions { + optional bool goproto_getters = 64001; + optional bool goproto_stringer = 64003; + optional bool verbose_equal = 64004; + optional bool face = 64005; + optional bool gostring = 64006; + optional bool populate = 64007; + optional bool stringer = 67008; + optional bool onlyone = 64009; + + optional bool equal = 64013; + optional bool description = 64014; + optional bool testgen = 64015; + optional bool benchgen = 64016; + optional bool marshaler = 64017; + optional bool unmarshaler = 64018; + optional bool stable_marshaler = 64019; + + optional bool sizer = 64020; + + optional bool unsafe_marshaler = 64023; + optional bool unsafe_unmarshaler = 64024; + + optional bool goproto_extensions_map = 64025; + optional bool goproto_unrecognized = 64026; + + optional bool protosizer = 64028; + optional bool compare = 64029; + + optional bool typedecl = 64030; + + optional bool messagename = 64033; + + optional bool goproto_sizecache = 64034; + optional bool goproto_unkeyed = 64035; +} + +extend google.protobuf.FieldOptions { + optional bool nullable = 65001; + optional bool embed = 65002; + optional string customtype = 65003; + optional string customname = 65004; + optional string jsontag = 65005; + optional string moretags = 65006; + optional string casttype = 65007; + optional string castkey = 65008; + optional string castvalue = 65009; + + optional bool stdtime = 65010; + optional bool stdduration = 65011; + optional bool wktpointer = 65012; + +} diff --git a/proto/google/api/annotations.proto b/proto/google/api/annotations.proto new file mode 100644 index 0000000..85c361b --- /dev/null +++ b/proto/google/api/annotations.proto @@ -0,0 +1,31 @@ +// Copyright (c) 2015, Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.api; + +import "google/api/http.proto"; +import "google/protobuf/descriptor.proto"; + +option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations"; +option java_multiple_files = true; +option java_outer_classname = "AnnotationsProto"; +option java_package = "com.google.api"; +option objc_class_prefix = "GAPI"; + +extend google.protobuf.MethodOptions { + // See `HttpRule`. + HttpRule http = 72295728; +} diff --git a/proto/google/api/http.proto b/proto/google/api/http.proto new file mode 100644 index 0000000..b2977f5 --- /dev/null +++ b/proto/google/api/http.proto @@ -0,0 +1,376 @@ +// Copyright 2019 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +package google.api; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations"; +option java_multiple_files = true; +option java_outer_classname = "HttpProto"; +option java_package = "com.google.api"; +option objc_class_prefix = "GAPI"; + +// Defines the HTTP configuration for an API service. It contains a list of +// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method +// to one or more HTTP REST API methods. +message Http { + // A list of HTTP configuration rules that apply to individual API methods. + // + // **NOTE:** All service configuration rules follow "last one wins" order. + repeated HttpRule rules = 1; + + // When set to true, URL path parameters will be fully URI-decoded except in + // cases of single segment matches in reserved expansion, where "%2F" will be + // left encoded. + // + // The default behavior is to not decode RFC 6570 reserved characters in multi + // segment matches. + bool fully_decode_reserved_expansion = 2; +} + +// # gRPC Transcoding +// +// gRPC Transcoding is a feature for mapping between a gRPC method and one or +// more HTTP REST endpoints. It allows developers to build a single API service +// that supports both gRPC APIs and REST APIs. Many systems, including [Google +// APIs](https://github.com/googleapis/googleapis), +// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC +// Gateway](https://github.com/grpc-ecosystem/grpc-gateway), +// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature +// and use it for large scale production services. +// +// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies +// how different portions of the gRPC request message are mapped to the URL +// path, URL query parameters, and HTTP request body. It also controls how the +// gRPC response message is mapped to the HTTP response body. `HttpRule` is +// typically specified as an `google.api.http` annotation on the gRPC method. +// +// Each mapping specifies a URL path template and an HTTP method. The path +// template may refer to one or more fields in the gRPC request message, as long +// as each field is a non-repeated field with a primitive (non-message) type. +// The path template controls how fields of the request message are mapped to +// the URL path. +// +// Example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get: "/v1/{name=messages/*}" +// }; +// } +// } +// message GetMessageRequest { +// string name = 1; // Mapped to URL path. +// } +// message Message { +// string text = 1; // The resource content. +// } +// +// This enables an HTTP REST to gRPC mapping as below: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456` | `GetMessage(name: "messages/123456")` +// +// Any fields in the request message which are not bound by the path template +// automatically become HTTP query parameters if there is no HTTP request body. +// For example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get:"/v1/messages/{message_id}" +// }; +// } +// } +// message GetMessageRequest { +// message SubMessage { +// string subfield = 1; +// } +// string message_id = 1; // Mapped to URL path. +// int64 revision = 2; // Mapped to URL query parameter `revision`. +// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`. +// } +// +// This enables a HTTP JSON to RPC mapping as below: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456?revision=2&sub.subfield=foo` | +// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield: +// "foo"))` +// +// Note that fields which are mapped to URL query parameters must have a +// primitive type or a repeated primitive type or a non-repeated message type. +// In the case of a repeated type, the parameter can be repeated in the URL +// as `...?param=A¶m=B`. In the case of a message type, each field of the +// message is mapped to a separate parameter, such as +// `...?foo.a=A&foo.b=B&foo.c=C`. +// +// For HTTP methods that allow a request body, the `body` field +// specifies the mapping. Consider a REST update method on the +// message resource collection: +// +// service Messaging { +// rpc UpdateMessage(UpdateMessageRequest) returns (Message) { +// option (google.api.http) = { +// patch: "/v1/messages/{message_id}" +// body: "message" +// }; +// } +// } +// message UpdateMessageRequest { +// string message_id = 1; // mapped to the URL +// Message message = 2; // mapped to the body +// } +// +// The following HTTP JSON to RPC mapping is enabled, where the +// representation of the JSON in the request body is determined by +// protos JSON encoding: +// +// HTTP | gRPC +// -----|----- +// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id: +// "123456" message { text: "Hi!" })` +// +// The special name `*` can be used in the body mapping to define that +// every field not bound by the path template should be mapped to the +// request body. This enables the following alternative definition of +// the update method: +// +// service Messaging { +// rpc UpdateMessage(Message) returns (Message) { +// option (google.api.http) = { +// patch: "/v1/messages/{message_id}" +// body: "*" +// }; +// } +// } +// message Message { +// string message_id = 1; +// string text = 2; +// } +// +// +// The following HTTP JSON to RPC mapping is enabled: +// +// HTTP | gRPC +// -----|----- +// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id: +// "123456" text: "Hi!")` +// +// Note that when using `*` in the body mapping, it is not possible to +// have HTTP parameters, as all fields not bound by the path end in +// the body. This makes this option more rarely used in practice when +// defining REST APIs. The common usage of `*` is in custom methods +// which don't use the URL at all for transferring data. +// +// It is possible to define multiple HTTP methods for one RPC by using +// the `additional_bindings` option. Example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get: "/v1/messages/{message_id}" +// additional_bindings { +// get: "/v1/users/{user_id}/messages/{message_id}" +// } +// }; +// } +// } +// message GetMessageRequest { +// string message_id = 1; +// string user_id = 2; +// } +// +// This enables the following two alternative HTTP JSON to RPC mappings: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")` +// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id: +// "123456")` +// +// ## Rules for HTTP mapping +// +// 1. Leaf request fields (recursive expansion nested messages in the request +// message) are classified into three categories: +// - Fields referred by the path template. They are passed via the URL path. +// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They are passed via the HTTP +// request body. +// - All other fields are passed via the URL query parameters, and the +// parameter name is the field path in the request message. A repeated +// field can be represented as multiple query parameters under the same +// name. +// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL query parameter, all fields +// are passed via URL path and HTTP request body. +// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP request body, all +// fields are passed via URL path and URL query parameters. +// +// ### Path template syntax +// +// Template = "/" Segments [ Verb ] ; +// Segments = Segment { "/" Segment } ; +// Segment = "*" | "**" | LITERAL | Variable ; +// Variable = "{" FieldPath [ "=" Segments ] "}" ; +// FieldPath = IDENT { "." IDENT } ; +// Verb = ":" LITERAL ; +// +// The syntax `*` matches a single URL path segment. The syntax `**` matches +// zero or more URL path segments, which must be the last part of the URL path +// except the `Verb`. +// +// The syntax `Variable` matches part of the URL path as specified by its +// template. A variable template must not contain other variables. If a variable +// matches a single path segment, its template may be omitted, e.g. `{var}` +// is equivalent to `{var=*}`. +// +// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL` +// contains any reserved character, such characters should be percent-encoded +// before the matching. +// +// If a variable contains exactly one path segment, such as `"{var}"` or +// `"{var=*}"`, when such a variable is expanded into a URL path on the client +// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The +// server side does the reverse decoding. Such variables show up in the +// [Discovery +// Document](https://developers.google.com/discovery/v1/reference/apis) as +// `{var}`. +// +// If a variable contains multiple path segments, such as `"{var=foo/*}"` +// or `"{var=**}"`, when such a variable is expanded into a URL path on the +// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded. +// The server side does the reverse decoding, except "%2F" and "%2f" are left +// unchanged. Such variables show up in the +// [Discovery +// Document](https://developers.google.com/discovery/v1/reference/apis) as +// `{+var}`. +// +// ## Using gRPC API Service Configuration +// +// gRPC API Service Configuration (service config) is a configuration language +// for configuring a gRPC service to become a user-facing product. The +// service config is simply the YAML representation of the `google.api.Service` +// proto message. +// +// As an alternative to annotating your proto file, you can configure gRPC +// transcoding in your service config YAML files. You do this by specifying a +// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same +// effect as the proto annotation. This can be particularly useful if you +// have a proto that is reused in multiple services. Note that any transcoding +// specified in the service config will override any matching transcoding +// configuration in the proto. +// +// Example: +// +// http: +// rules: +// # Selects a gRPC method and applies HttpRule to it. +// - selector: example.v1.Messaging.GetMessage +// get: /v1/messages/{message_id}/{sub.subfield} +// +// ## Special notes +// +// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the +// proto to JSON conversion must follow the [proto3 +// specification](https://developers.google.com/protocol-buffers/docs/proto3#json). +// +// While the single segment variable follows the semantics of +// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String +// Expansion, the multi segment variable **does not** follow RFC 6570 Section +// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion +// does not expand special characters like `?` and `#`, which would lead +// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding +// for multi segment variables. +// +// The path variables **must not** refer to any repeated or mapped field, +// because client libraries are not capable of handling such variable expansion. +// +// The path variables **must not** capture the leading "/" character. The reason +// is that the most common use case "{var}" does not capture the leading "/" +// character. For consistency, all path variables must share the same behavior. +// +// Repeated message fields must not be mapped to URL query parameters, because +// no client library can support such complicated mapping. +// +// If an API needs to use a JSON array for request or response body, it can map +// the request or response body to a repeated field. However, some gRPC +// Transcoding implementations may not support this feature. +message HttpRule { + // Selects a method to which this rule applies. + // + // Refer to [selector][google.api.DocumentationRule.selector] for syntax details. + string selector = 1; + + // Determines the URL pattern is matched by this rules. This pattern can be + // used with any of the {get|put|post|delete|patch} methods. A custom method + // can be defined using the 'custom' field. + oneof pattern { + // Maps to HTTP GET. Used for listing and getting information about + // resources. + string get = 2; + + // Maps to HTTP PUT. Used for replacing a resource. + string put = 3; + + // Maps to HTTP POST. Used for creating a resource or performing an action. + string post = 4; + + // Maps to HTTP DELETE. Used for deleting a resource. + string delete = 5; + + // Maps to HTTP PATCH. Used for updating a resource. + string patch = 6; + + // The custom pattern is used for specifying an HTTP method that is not + // included in the `pattern` field, such as HEAD, or "*" to leave the + // HTTP method unspecified for this rule. The wild-card rule is useful + // for services that provide content to Web (HTML) clients. + CustomHttpPattern custom = 8; + } + + // The name of the request field whose value is mapped to the HTTP request + // body, or `*` for mapping all request fields not captured by the path + // pattern to the HTTP body, or omitted for not having any HTTP request body. + // + // NOTE: the referred field must be present at the top-level of the request + // message type. + string body = 7; + + // Optional. The name of the response field whose value is mapped to the HTTP + // response body. When omitted, the entire response message will be used + // as the HTTP response body. + // + // NOTE: The referred field must be present at the top-level of the response + // message type. + string response_body = 12; + + // Additional HTTP bindings for the selector. Nested bindings must + // not contain an `additional_bindings` field themselves (that is, + // the nesting may only be one level deep). + repeated HttpRule additional_bindings = 11; +} + +// A custom pattern is used for defining custom HTTP verb. +message CustomHttpPattern { + // The name of this custom HTTP verb. + string kind = 1; + + // The path matched by this custom verb. + string path = 2; +} diff --git a/proto/kv.proto b/proto/kv.proto index 4e64b50..23c911b 100644 --- a/proto/kv.proto +++ b/proto/kv.proto @@ -1,6 +1,14 @@ syntax = "proto3"; package mvccpb; +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; + message KeyValue { // key is the key in bytes. An empty key is not allowed. bytes key = 1; diff --git a/proto/rpc.proto b/proto/rpc.proto index d4cfcbd..9405c90 100644 --- a/proto/rpc.proto +++ b/proto/rpc.proto @@ -1,34 +1,65 @@ syntax = "proto3"; package etcdserverpb; +import "gogoproto/gogo.proto"; import "kv.proto"; import "auth.proto"; +// for grpc-gateway +import "google/api/annotations.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; service KV { // Range gets the keys in the range from the key-value store. - rpc Range(RangeRequest) returns (RangeResponse) {} + rpc Range(RangeRequest) returns (RangeResponse) { + option (google.api.http) = { + post: "/v3/kv/range" + body: "*" + }; + } // Put puts the given key into the key-value store. // A put request increments the revision of the key-value store // and generates one event in the event history. - rpc Put(PutRequest) returns (PutResponse) {} + rpc Put(PutRequest) returns (PutResponse) { + option (google.api.http) = { + post: "/v3/kv/put" + body: "*" + }; + } // DeleteRange deletes the given range from the key-value store. // A delete request increments the revision of the key-value store // and generates a delete event in the event history for every deleted key. - rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {} + rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) { + option (google.api.http) = { + post: "/v3/kv/deleterange" + body: "*" + }; + } // Txn processes multiple requests in a single transaction. // A txn request increments the revision of the key-value store // and generates events with the same revision for every completed request. // It is not allowed to modify the same key several times within one txn. - rpc Txn(TxnRequest) returns (TxnResponse) {} + rpc Txn(TxnRequest) returns (TxnResponse) { + option (google.api.http) = { + post: "/v3/kv/txn" + body: "*" + }; + } // Compact compacts the event history in the etcd key-value store. The key-value // store should be periodically compacted or the event history will continue to grow // indefinitely. - rpc Compact(CompactionRequest) returns (CompactionResponse) {} + rpc Compact(CompactionRequest) returns (CompactionResponse) { + option (google.api.http) = { + post: "/v3/kv/compaction" + body: "*" + }; + } } service Watch { @@ -37,107 +68,305 @@ service Watch { // stream sends events. One watch RPC can watch on multiple key ranges, streaming events // for several watches at once. The entire event history can be watched starting from the // last compaction revision. - rpc Watch(stream WatchRequest) returns (stream WatchResponse) {} + rpc Watch(stream WatchRequest) returns (stream WatchResponse) { + option (google.api.http) = { + post: "/v3/watch" + body: "*" + }; + } } service Lease { // LeaseGrant creates a lease which expires if the server does not receive a keepAlive // within a given time to live period. All keys attached to the lease will be expired and // deleted if the lease expires. Each expired key generates a delete event in the event history. - rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {} + rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) { + option (google.api.http) = { + post: "/v3/lease/grant" + body: "*" + }; + } // LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted. - rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {} + rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) { + option (google.api.http) = { + post: "/v3/lease/revoke" + body: "*" + additional_bindings { + post: "/v3/kv/lease/revoke" + body: "*" + } + }; + } // LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client // to the server and streaming keep alive responses from the server to the client. - rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {} + rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) { + option (google.api.http) = { + post: "/v3/lease/keepalive" + body: "*" + }; + } - // TODO(xiangli) List all existing Leases? - // TODO(xiangli) Get details information (expirations, leased keys, etc.) of a lease? + // LeaseTimeToLive retrieves lease information. + rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) { + option (google.api.http) = { + post: "/v3/lease/timetolive" + body: "*" + additional_bindings { + post: "/v3/kv/lease/timetolive" + body: "*" + } + }; + } + + // LeaseLeases lists all existing leases. + rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse) { + option (google.api.http) = { + post: "/v3/lease/leases" + body: "*" + additional_bindings { + post: "/v3/kv/lease/leases" + body: "*" + } + }; + } } service Cluster { // MemberAdd adds a member into the cluster. - rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse) {} + rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse) { + option (google.api.http) = { + post: "/v3/cluster/member/add" + body: "*" + }; + } // MemberRemove removes an existing member from the cluster. - rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse) {} + rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse) { + option (google.api.http) = { + post: "/v3/cluster/member/remove" + body: "*" + }; + } // MemberUpdate updates the member configuration. - rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse) {} + rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse) { + option (google.api.http) = { + post: "/v3/cluster/member/update" + body: "*" + }; + } // MemberList lists all the members in the cluster. - rpc MemberList(MemberListRequest) returns (MemberListResponse) {} + rpc MemberList(MemberListRequest) returns (MemberListResponse) { + option (google.api.http) = { + post: "/v3/cluster/member/list" + body: "*" + }; + } + + // MemberPromote promotes a member from raft learner (non-voting) to raft voting member. + rpc MemberPromote(MemberPromoteRequest) returns (MemberPromoteResponse) { + option (google.api.http) = { + post: "/v3/cluster/member/promote" + body: "*" + }; + } } service Maintenance { // Alarm activates, deactivates, and queries alarms regarding cluster health. - rpc Alarm(AlarmRequest) returns (AlarmResponse) {} + rpc Alarm(AlarmRequest) returns (AlarmResponse) { + option (google.api.http) = { + post: "/v3/maintenance/alarm" + body: "*" + }; + } // Status gets the status of the member. - rpc Status(StatusRequest) returns (StatusResponse) {} + rpc Status(StatusRequest) returns (StatusResponse) { + option (google.api.http) = { + post: "/v3/maintenance/status" + body: "*" + }; + } // Defragment defragments a member's backend database to recover storage space. - rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {} + rpc Defragment(DefragmentRequest) returns (DefragmentResponse) { + option (google.api.http) = { + post: "/v3/maintenance/defragment" + body: "*" + }; + } - // Hash returns the hash of the local KV state for consistency checking purpose. - // This is designed for testing; do not use this in production when there - // are ongoing transactions. - rpc Hash(HashRequest) returns (HashResponse) {} + // Hash computes the hash of whole backend keyspace, + // including key, lease, and other buckets in storage. + // This is designed for testing ONLY! + // Do not rely on this in production with ongoing transactions, + // since Hash operation does not hold MVCC locks. + // Use "HashKV" API instead for "key" bucket consistency checks. + rpc Hash(HashRequest) returns (HashResponse) { + option (google.api.http) = { + post: "/v3/maintenance/hash" + body: "*" + }; + } + + // HashKV computes the hash of all MVCC keys up to a given revision. + // It only iterates "key" bucket in backend storage. + rpc HashKV(HashKVRequest) returns (HashKVResponse) { + option (google.api.http) = { + post: "/v3/maintenance/hash" + body: "*" + }; + } // Snapshot sends a snapshot of the entire backend from a member over a stream to a client. - rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {} + rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) { + option (google.api.http) = { + post: "/v3/maintenance/snapshot" + body: "*" + }; + } + + // MoveLeader requests current leader node to transfer its leadership to transferee. + rpc MoveLeader(MoveLeaderRequest) returns (MoveLeaderResponse) { + option (google.api.http) = { + post: "/v3/maintenance/transfer-leadership" + body: "*" + }; + } } service Auth { // AuthEnable enables authentication. - rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) {} + rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) { + option (google.api.http) = { + post: "/v3/auth/enable" + body: "*" + }; + } // AuthDisable disables authentication. - rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse) {} + rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse) { + option (google.api.http) = { + post: "/v3/auth/disable" + body: "*" + }; + } // Authenticate processes an authenticate request. - rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse) {} + rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse) { + option (google.api.http) = { + post: "/v3/auth/authenticate" + body: "*" + }; + } - // UserAdd adds a new user. - rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {} + // UserAdd adds a new user. User name cannot be empty. + rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) { + option (google.api.http) = { + post: "/v3/auth/user/add" + body: "*" + }; + } // UserGet gets detailed user information. - rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) {} + rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) { + option (google.api.http) = { + post: "/v3/auth/user/get" + body: "*" + }; + } // UserList gets a list of all users. - rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) {} + rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) { + option (google.api.http) = { + post: "/v3/auth/user/list" + body: "*" + }; + } // UserDelete deletes a specified user. - rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) {} + rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) { + option (google.api.http) = { + post: "/v3/auth/user/delete" + body: "*" + }; + } // UserChangePassword changes the password of a specified user. - rpc UserChangePassword(AuthUserChangePasswordRequest) returns (AuthUserChangePasswordResponse) {} + rpc UserChangePassword(AuthUserChangePasswordRequest) returns (AuthUserChangePasswordResponse) { + option (google.api.http) = { + post: "/v3/auth/user/changepw" + body: "*" + }; + } // UserGrant grants a role to a specified user. - rpc UserGrantRole(AuthUserGrantRoleRequest) returns (AuthUserGrantRoleResponse) {} + rpc UserGrantRole(AuthUserGrantRoleRequest) returns (AuthUserGrantRoleResponse) { + option (google.api.http) = { + post: "/v3/auth/user/grant" + body: "*" + }; + } // UserRevokeRole revokes a role of specified user. - rpc UserRevokeRole(AuthUserRevokeRoleRequest) returns (AuthUserRevokeRoleResponse) {} + rpc UserRevokeRole(AuthUserRevokeRoleRequest) returns (AuthUserRevokeRoleResponse) { + option (google.api.http) = { + post: "/v3/auth/user/revoke" + body: "*" + }; + } - // RoleAdd adds a new role. - rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {} + // RoleAdd adds a new role. Role name cannot be empty. + rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) { + option (google.api.http) = { + post: "/v3/auth/role/add" + body: "*" + }; + } // RoleGet gets detailed role information. - rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) {} + rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) { + option (google.api.http) = { + post: "/v3/auth/role/get" + body: "*" + }; + } // RoleList gets lists of all roles. - rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) {} + rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) { + option (google.api.http) = { + post: "/v3/auth/role/list" + body: "*" + }; + } // RoleDelete deletes a specified role. - rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) {} + rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) { + option (google.api.http) = { + post: "/v3/auth/role/delete" + body: "*" + }; + } // RoleGrantPermission grants a permission of a specified key or range to a specified role. - rpc RoleGrantPermission(AuthRoleGrantPermissionRequest) returns (AuthRoleGrantPermissionResponse) {} + rpc RoleGrantPermission(AuthRoleGrantPermissionRequest) returns (AuthRoleGrantPermissionResponse) { + option (google.api.http) = { + post: "/v3/auth/role/grant" + body: "*" + }; + } // RoleRevokePermission revokes a key or range permission of a specified role. - rpc RoleRevokePermission(AuthRoleRevokePermissionRequest) returns (AuthRoleRevokePermissionResponse) {} + rpc RoleRevokePermission(AuthRoleRevokePermissionRequest) returns (AuthRoleRevokePermissionResponse) { + option (google.api.http) = { + post: "/v3/auth/role/revoke" + body: "*" + }; + } } message ResponseHeader { @@ -146,6 +375,9 @@ message ResponseHeader { // member_id is the ID of the member which sent the response. uint64 member_id = 2; // revision is the key-value store revision when the request was applied. + // For watch progress responses, the header.revision indicates progress. All future events + // recieved in this stream are guaranteed to have a higher revision number than the + // header.revision number. int64 revision = 3; // raft_term is the raft term when the request was applied. uint64 raft_term = 4; @@ -169,11 +401,12 @@ message RangeRequest { bytes key = 1; // range_end is the upper bound on the requested range [key, range_end). // If range_end is '\0', the range is all keys >= key. - // If the range_end is one bit larger than the given key, - // then the range requests get the all keys with the prefix (the given key). - // If both key and range_end are '\0', then range requests returns all keys. + // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + // then the range request gets all keys prefixed with key. + // If both key and range_end are '\0', then the range request returns all keys. bytes range_end = 2; - // limit is a limit on the number of keys returned for the request. + // limit is a limit on the number of keys returned for the request. When limit is set to 0, + // it is treated as no limit. int64 limit = 3; // revision is the point-in-time of the key-value store to use for the range. // If revision is less or equal to zero, the range is over the newest key-value store. @@ -196,9 +429,25 @@ message RangeRequest { // keys_only when set returns only the keys and not the values. bool keys_only = 8; - + // count_only when set returns only the count of the keys in the range. bool count_only = 9; + + // min_mod_revision is the lower bound for returned key mod revisions; all keys with + // lesser mod revisions will be filtered away. + int64 min_mod_revision = 10; + + // max_mod_revision is the upper bound for returned key mod revisions; all keys with + // greater mod revisions will be filtered away. + int64 max_mod_revision = 11; + + // min_create_revision is the lower bound for returned key create revisions; all keys with + // lesser create revisions will be filtered away. + int64 min_create_revision = 12; + + // max_create_revision is the upper bound for returned key create revisions; all keys with + // greater create revisions will be filtered away. + int64 max_create_revision = 13; } message RangeResponse { @@ -224,6 +473,14 @@ message PutRequest { // If prev_kv is set, etcd gets the previous key-value pair before changing it. // The previous key-value pair will be returned in the put response. bool prev_kv = 4; + + // If ignore_value is set, etcd updates the key using its current value. + // Returns an error if the key does not exist. + bool ignore_value = 5; + + // If ignore_lease is set, etcd updates the key using its current lease. + // Returns an error if the key does not exist. + bool ignore_lease = 6; } message PutResponse { @@ -237,11 +494,13 @@ message DeleteRangeRequest { bytes key = 1; // range_end is the key following the last key to delete for the range [key, range_end). // If range_end is not given, the range is defined to contain only the key argument. + // If range_end is one bit larger than the given key, then the range is all the keys + // with the prefix (the given key). // If range_end is '\0', the range is all keys greater than or equal to the key argument. bytes range_end = 2; // If prev_kv is set, etcd gets the previous key-value pairs before deleting it. - // The previous key-value pairs will be returned in the delte response. + // The previous key-value pairs will be returned in the delete response. bool prev_kv = 3; } @@ -259,6 +518,7 @@ message RequestOp { RangeRequest request_range = 1; PutRequest request_put = 2; DeleteRangeRequest request_delete_range = 3; + TxnRequest request_txn = 4; } } @@ -268,6 +528,7 @@ message ResponseOp { RangeResponse response_range = 1; PutResponse response_put = 2; DeleteRangeResponse response_delete_range = 3; + TxnResponse response_txn = 4; } } @@ -276,12 +537,14 @@ message Compare { EQUAL = 0; GREATER = 1; LESS = 2; + NOT_EQUAL = 3; } enum CompareTarget { VERSION = 0; CREATE = 1; MOD = 2; - VALUE= 3; + VALUE = 3; + LEASE = 4; } // result is logical comparison operation for this comparison. CompareResult result = 1; @@ -298,7 +561,15 @@ message Compare { int64 mod_revision = 6; // value is the value of the given key, in bytes. bytes value = 7; + // lease is the lease id of the given key. + int64 lease = 8; + // leave room for more target_union field tags, jump to 64 } + + // range_end compares the given target to all keys in the range [key, range_end). + // See RangeRequest for more details on key ranges. + bytes range_end = 64; + // TODO: fill out with most of the rest of RangeRequest fields when needed. } // From google paxosdb paper: @@ -341,7 +612,7 @@ message TxnResponse { // CompactionRequest compacts the key-value store up to a given revision. All superseded keys // with a revision less than the compaction revision will be removed. message CompactionRequest { - // revision is the key-value store revision for the compaction operation. + // revision is the key-value store revision for the compaction operation. int64 revision = 1; // physical is set so the RPC will wait until the compaction is physically // applied to the local database such that compacted entries are totally @@ -356,9 +627,22 @@ message CompactionResponse { message HashRequest { } +message HashKVRequest { + // revision is the key-value store revision for the hash operation. + int64 revision = 1; +} + +message HashKVResponse { + ResponseHeader header = 1; + // hash is the hash value computed from the responding member's MVCC keys up to a given revision. + uint32 hash = 2; + // compact_revision is the compacted revision of key-value store when hash begins. + int64 compact_revision = 3; +} + message HashResponse { ResponseHeader header = 1; - // hash is the hash value computed from the responding member's key-value store. + // hash is the hash value computed from the responding member's KV's backend. uint32 hash = 2; } @@ -382,18 +666,24 @@ message WatchRequest { oneof request_union { WatchCreateRequest create_request = 1; WatchCancelRequest cancel_request = 2; + WatchProgressRequest progress_request = 3; } } message WatchCreateRequest { // key is the key to register for watching. bytes key = 1; + // range_end is the end of the range [key, range_end) to watch. If range_end is not given, // only the key argument is watched. If range_end is equal to '\0', all keys greater than // or equal to the key argument are watched. + // If the range_end is one bit larger than the given key, + // then all keys with the prefix (the given key) will be watched. bytes range_end = 2; + // start_revision is an optional revision to watch from (inclusive). No start_revision is "now". int64 start_revision = 3; + // progress_notify is set so that the etcd server will periodically send a WatchResponse with // no events to the new watcher if there are no recent events. It is useful when clients // wish to recover a disconnected watcher starting from a recent known revision. @@ -401,17 +691,28 @@ message WatchCreateRequest { bool progress_notify = 4; enum FilterType { - // filter out put event. - NOPUT = 0; - // filter out delete event. - NODELETE = 1; + // filter out put event. + NOPUT = 0; + // filter out delete event. + NODELETE = 1; } + // filters filter the events at server side before it sends back to the watcher. repeated FilterType filters = 5; // If prev_kv is set, created watcher gets the previous KV before the event happens. // If the previous KV is already compacted, nothing will be returned. bool prev_kv = 6; + + // If watch_id is provided and non-zero, it will be assigned to this watcher. + // Since creating a watcher in etcd is not a synchronous operation, + // this can be used ensure that ordering is correct when creating multiple + // watchers on the same stream. Creating a watcher with an ID already in + // use on the stream will cause an error to be returned. + int64 watch_id = 7; + + // fragment enables splitting large revisions into multiple watch responses. + bool fragment = 8; } message WatchCancelRequest { @@ -419,33 +720,47 @@ message WatchCancelRequest { int64 watch_id = 1; } +// Requests the a watch stream progress status be sent in the watch response stream as soon as +// possible. +message WatchProgressRequest { +} + message WatchResponse { ResponseHeader header = 1; // watch_id is the ID of the watcher that corresponds to the response. int64 watch_id = 2; + // created is set to true if the response is for a create watch request. // The client should record the watch_id and expect to receive events for // the created watcher from the same stream. // All events sent to the created watcher will attach with the same watch_id. bool created = 3; + // canceled is set to true if the response is for a cancel watch request. // No further events will be sent to the canceled watcher. bool canceled = 4; + // compact_revision is set to the minimum index if a watcher tries to watch // at a compacted index. // // This happens when creating a watcher at a compacted revision or the watcher cannot - // catch up with the progress of the key-value store. + // catch up with the progress of the key-value store. // // The client should treat the watcher as canceled and should not try to create any // watcher with the same start_revision again. - int64 compact_revision = 5; + int64 compact_revision = 5; + + // cancel_reason indicates the reason for canceling the watcher. + string cancel_reason = 6; + + // framgment is true if large watch response was split over multiple responses. + bool fragment = 7; repeated mvccpb.Event events = 11; } message LeaseGrantRequest { - // TTL is the advisory time-to-live in seconds. + // TTL is the advisory time-to-live in seconds. Expired lease will return -1. int64 TTL = 1; // ID is the requested ID for the lease. If ID is set to 0, the lessor chooses an ID. int64 ID = 2; @@ -469,6 +784,22 @@ message LeaseRevokeResponse { ResponseHeader header = 1; } +message LeaseCheckpoint { + // ID is the lease ID to checkpoint. + int64 ID = 1; + + // Remaining_TTL is the remaining time until expiry of the lease. + int64 remaining_TTL = 2; +} + +message LeaseCheckpointRequest { + repeated LeaseCheckpoint checkpoints = 1; +} + +message LeaseCheckpointResponse { + ResponseHeader header = 1; +} + message LeaseKeepAliveRequest { // ID is the lease ID for the lease to keep alive. int64 ID = 1; @@ -482,6 +813,38 @@ message LeaseKeepAliveResponse { int64 TTL = 3; } +message LeaseTimeToLiveRequest { + // ID is the lease ID for the lease. + int64 ID = 1; + // keys is true to query all the keys attached to this lease. + bool keys = 2; +} + +message LeaseTimeToLiveResponse { + ResponseHeader header = 1; + // ID is the lease ID from the keep alive request. + int64 ID = 2; + // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. + int64 TTL = 3; + // GrantedTTL is the initial granted time in seconds upon lease creation/renewal. + int64 grantedTTL = 4; + // Keys is the list of keys attached to this lease. + repeated bytes keys = 5; +} + +message LeaseLeasesRequest { +} + +message LeaseStatus { + int64 ID = 1; + // TODO: int64 TTL = 2; +} + +message LeaseLeasesResponse { + ResponseHeader header = 1; + repeated LeaseStatus leases = 2; +} + message Member { // ID is the member ID for this member. uint64 ID = 1; @@ -491,17 +854,23 @@ message Member { repeated string peerURLs = 3; // clientURLs is the list of URLs the member exposes to clients for communication. If the member is not started, clientURLs will be empty. repeated string clientURLs = 4; + // isLearner indicates if the member is raft learner. + bool isLearner = 5; } message MemberAddRequest { // peerURLs is the list of URLs the added member will use to communicate with the cluster. repeated string peerURLs = 1; + // isLearner indicates if the added member is raft learner. + bool isLearner = 2; } message MemberAddResponse { ResponseHeader header = 1; // member is the member information for the added member. Member member = 2; + // members is a list of all members after adding the new member. + repeated Member members = 3; } message MemberRemoveRequest { @@ -511,6 +880,8 @@ message MemberRemoveRequest { message MemberRemoveResponse { ResponseHeader header = 1; + // members is a list of all members after removing the member. + repeated Member members = 2; } message MemberUpdateRequest { @@ -522,6 +893,8 @@ message MemberUpdateRequest { message MemberUpdateResponse{ ResponseHeader header = 1; + // members is a list of all members after updating the member. + repeated Member members = 2; } message MemberListRequest { @@ -533,6 +906,17 @@ message MemberListResponse { repeated Member members = 2; } +message MemberPromoteRequest { + // ID is the member ID of the member to promote. + uint64 ID = 1; +} + +message MemberPromoteResponse { + ResponseHeader header = 1; + // members is a list of all members after promoting the member. + repeated Member members = 2; +} + message DefragmentRequest { } @@ -540,9 +924,19 @@ message DefragmentResponse { ResponseHeader header = 1; } +message MoveLeaderRequest { + // targetID is the node ID for the new leader. + uint64 targetID = 1; +} + +message MoveLeaderResponse { + ResponseHeader header = 1; +} + enum AlarmType { NONE = 0; // default, used to query if any alarm is active NOSPACE = 1; // space quota is exhausted + CORRUPT = 2; // kv store corruption detected } message AlarmRequest { @@ -582,14 +976,22 @@ message StatusResponse { ResponseHeader header = 1; // version is the cluster protocol version used by the responding member. string version = 2; - // dbSize is the size of the backend database, in bytes, of the responding member. + // dbSize is the size of the backend database physically allocated, in bytes, of the responding member. int64 dbSize = 3; // leader is the member ID which the responding member believes is the current leader. uint64 leader = 4; - // raftIndex is the current raft index of the responding member. + // raftIndex is the current raft committed index of the responding member. uint64 raftIndex = 5; // raftTerm is the current raft term of the responding member. uint64 raftTerm = 6; + // raftAppliedIndex is the current raft applied index of the responding member. + uint64 raftAppliedIndex = 7; + // errors contains alarm/health information and status. + repeated string errors = 8; + // dbSizeInUse is the size of the backend database logically in use, in bytes, of the responding member. + int64 dbSizeInUse = 9; + // isLearner indicates if the member is raft learner. + bool isLearner = 10; } message AuthEnableRequest { @@ -606,6 +1008,7 @@ message AuthenticateRequest { message AuthUserAddRequest { string name = 1; string password = 2; + authpb.UserAddOptions options = 3; } message AuthUserGetRequest { @@ -664,8 +1067,8 @@ message AuthRoleGrantPermissionRequest { message AuthRoleRevokePermissionRequest { string role = 1; - string key = 2; - string range_end = 3; + bytes key = 2; + bytes range_end = 3; } message AuthEnableResponse { diff --git a/proto/v3lock.proto b/proto/v3lock.proto new file mode 100644 index 0000000..fb069e1 --- /dev/null +++ b/proto/v3lock.proto @@ -0,0 +1,65 @@ +syntax = "proto3"; +package v3lockpb; + +import "gogoproto/gogo.proto"; +import "rpc.proto"; + +// for grpc-gateway +import "google/api/annotations.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +// The lock service exposes client-side locking facilities as a gRPC interface. +service Lock { + // Lock acquires a distributed shared lock on a given named lock. + // On success, it will return a unique key that exists so long as the + // lock is held by the caller. This key can be used in conjunction with + // transactions to safely ensure updates to etcd only occur while holding + // lock ownership. The lock is held until Unlock is called on the key or the + // lease associate with the owner expires. + rpc Lock(LockRequest) returns (LockResponse) { + option (google.api.http) = { + post: "/v3/lock/lock" + body: "*" + }; + } + + // Unlock takes a key returned by Lock and releases the hold on lock. The + // next Lock caller waiting for the lock will then be woken up and given + // ownership of the lock. + rpc Unlock(UnlockRequest) returns (UnlockResponse) { + option (google.api.http) = { + post: "/v3/lock/unlock" + body: "*" + }; + } +} + +message LockRequest { + // name is the identifier for the distributed shared lock to be acquired. + bytes name = 1; + // lease is the ID of the lease that will be attached to ownership of the + // lock. If the lease expires or is revoked and currently holds the lock, + // the lock is automatically released. Calls to Lock with the same lease will + // be treated as a single acquisition; locking twice with the same lease is a + // no-op. + int64 lease = 2; +} + +message LockResponse { + etcdserverpb.ResponseHeader header = 1; + // key is a key that will exist on etcd for the duration that the Lock caller + // owns the lock. Users should not modify this key or the lock may exhibit + // undefined behavior. + bytes key = 2; +} + +message UnlockRequest { + // key is the lock ownership key granted by Lock. + bytes key = 1; +} + +message UnlockResponse { + etcdserverpb.ResponseHeader header = 1; +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 183a0fb..d27cd6c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,18 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/AsyncLeaseGrantResponse.cpp ../v3/src/AsyncLeaseGrantAction.cpp ../v3/src/KeyValue.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp) -set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) +file(GLOB_RECURSE CPP_CLIENT_SRC RELATIVE "${CMAKE_SOURCE_DIR}/src" + "${CMAKE_SOURCE_DIR}/proto/*.cc" + "${CMAKE_SOURCE_DIR}/src/*.cpp" + "${CMAKE_SOURCE_DIR}/v3/src/*.cpp") -target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) +add_library(etcd-cpp-api SHARED ${CPP_CLIENT_SRC}) +set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) +target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_SOURCE_DIR}/proto) + +target_link_libraries(etcd-cpp-api PUBLIC + ${CPPREST_LIB} + ${Boost_LIBRARIES} + ${PROTOBUF_LIBRARIES} + ${OPENSSL_LIBRARIES} + ${GRPC_LIBRARIES}) install (TARGETS etcd-cpp-api DESTINATION lib) install (FILES ../etcd/Client.hpp @@ -10,4 +21,18 @@ install (FILES ../etcd/Client.hpp ../etcd/Value.hpp ../etcd/Watcher.hpp DESTINATION include/etcd) +install (FILES ../proto/auth.pb.h + ../proto/kv.pb.h + ../proto/rpc.pb.h + ../proto/rpc.grpc.pb.h + ../proto/v3lock.pb.h + ../proto/v3lock.grpc.pb.h + DESTINATION include/etcd/proto) +install (FILES ../v3/include/Transaction.hpp + DESTINATION include/etcd/v3/include) +install (FILES ../proto/gogoproto/gogo.pb.h + DESTINATION include/etcd/proto/gogoproto) +install (FILES ../proto/google/api/annotations.pb.h + ../proto/google/api/http.pb.h + DESTINATION include/etcd/proto/google/api) diff --git a/src/Client.cpp b/src/Client.cpp index 4bf24b6..1e0ef70 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,10 +1,12 @@ #include #include "etcd/Client.hpp" #include "v3/include/action_constants.hpp" +#include "v3/include/Action.hpp" #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/AsyncDeleteRangeResponse.hpp" +#include "v3/include/AsyncLockResponse.hpp" #include "v3/include/Transaction.hpp" #include @@ -16,6 +18,8 @@ #include "v3/include/AsyncDeleteAction.hpp" #include "v3/include/AsyncWatchAction.hpp" #include "v3/include/AsyncLeaseGrantAction.hpp" +#include "v3/include/AsyncLockAction.hpp" +#include "v3/include/AsyncTxnAction.hpp" using grpc::Channel; @@ -32,9 +36,11 @@ etcd::Client::Client(std::string const & address) stripped_address = address.substr(i+substr.length()); } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + std::cout << "channel is: " << channel << std::endl; stub_= KV::NewStub(channel); watchServiceStub= Watch::NewStub(channel); leaseServiceStub= Lease::NewStub(channel); + lockServiceStub = Lock::NewStub(channel); } @@ -325,5 +331,25 @@ pplx::task etcd::Client::leasegrant(int ttl) return Response::create(call); } +pplx::task etcd::Client::lock(std::string const &key) { + etcdv3::ActionParameters params; + params.key = key; + params.lock_stub = lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + return Response::create(call); +} +pplx::task etcd::Client::unlock(std::string const &key) { + etcdv3::ActionParameters params; + params.key = key; + params.lock_stub = lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); + return Response::create(call); +} +pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { + etcdv3::ActionParameters params; + params.kv_stub = stub_.get(); + std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); + return Response::create(call); +} diff --git a/src/Response.cpp b/src/Response.cpp index cc20599..06774ac 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -1,4 +1,5 @@ #include "etcd/Response.hpp" +#include "v3/include/V3Response.hpp" #include @@ -25,6 +26,8 @@ etcd::Response::Response(const etcdv3::V3Response& reply) _prev_value = Value(reply.get_prev_value()); + _lock_key = reply.get_lock_key(); + _events = reply.get_events(); } @@ -95,3 +98,11 @@ std::string const & etcd::Response::key(int index) const { return _keys[index]; } + +std::string const & etcd::Response::lock_key() const { + return _lock_key; +} + +std::vector const & etcd::Response::events() const { + return this->_events; +}; diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 600083d..ec7fa50 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -1,4 +1,5 @@ #include "etcd/Watcher.hpp" +#include "v3/include/AsyncWatchAction.hpp" etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function callback) { diff --git a/tst/CMakeLists.txt b/tst/CMakeLists.txt index e0ce264..13ed48d 100644 --- a/tst/CMakeLists.txt +++ b/tst/CMakeLists.txt @@ -1,8 +1,12 @@ -find_path(CATCH_INCLUDE_DIR NAMES catch.hpp) +find_path(CATCH_INCLUDE_DIR NAMES catch.hpp PATHS ${PROJECT_SOURCE_DIR}) include_directories(${CATCH_INCLUDE_DIR}) -add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp WatcherTest.cpp) +add_executable(etcd_test EtcdTest.cpp + EtcdSyncTest.cpp + WatcherTest.cpp + LockTest.cpp) set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11) +target_include_directories(etcd_test PRIVATE ${CMAKE_SOURCE_DIR}/proto) target_link_libraries(etcd_test etcd-cpp-api) diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index fb7c4fd..a0b6649 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -343,6 +343,21 @@ TEST_CASE("watch changes in the past") CHECK("45" == res.value().as_string()); } +TEST_CASE("watch multiple keys and use promise") { + etcd::Client etcd("http://127.0.0.1:2379"); + + int start_index = etcd.add("/test/key1", "value1").get().index(); + etcd.add("/test/key2", "value2").get(); + + pplx::task res = etcd.watch("/test", start_index, true) + .then([](pplx::task const &resp_task) -> size_t { + auto const &resp = resp_task.get(); + return resp.events().size(); + }); + size_t event_size = res.get(); + CHECK(2 == event_size); +} + TEST_CASE("lease grant") { etcd::Client etcd("http://127.0.0.1:2379"); diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp new file mode 100644 index 0000000..8cb5950 --- /dev/null +++ b/tst/LockTest.cpp @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +#include "etcd/Client.hpp" + + +TEST_CASE("lock and unlock") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + // lock + etcd::Response resp1 = etcd.lock("/test/abcd").get(); + CHECK("lock" == resp1.action()); + REQUIRE(resp1.is_ok()); + REQUIRE(0 == resp1.error_code()); + + // unlock + etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get(); + CHECK("unlock" == resp2.action()); + REQUIRE(resp2.is_ok()); + REQUIRE(0 == resp2.error_code()); +} + + +TEST_CASE("double lock will fail") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + // lock + etcd::Response resp1 = etcd.lock("/test/abcd").get(); + CHECK("lock" == resp1.action()); + REQUIRE(resp1.is_ok()); + REQUIRE(0 == resp1.error_code()); + + bool first_lock_release = false; + std::string lock_key = resp1.lock_key(); + + auto second_lock_thr = std::thread([&](){ + // lock again + etcd::Response resp2 = etcd.lock("/test/abcd").get(); + CHECK("lock" == resp2.action()); + REQUIRE(resp2.is_ok()); + REQUIRE(0 == resp2.error_code()); + lock_key = resp2.lock_key(); + + // will success after first lock released. + REQUIRE(first_lock_release); + }); + + auto first_lock_thr = std::thread([&]() { + // check the lock key exists. + etcd::Response resp3 = etcd.get(resp1.lock_key()).get(); + CHECK("get" == resp3.action()); + REQUIRE(resp3.is_ok()); + REQUIRE(0 == resp3.error_code()); + + // create a duration + first_lock_release = true; + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // unlock the first lock + etcd::Response resp4 = etcd.unlock(lock_key).get(); + CHECK("unlock" == resp4.action()); + REQUIRE(resp4.is_ok()); + REQUIRE(0 == resp4.error_code()); + }); + + first_lock_thr.join(); + second_lock_thr.join(); + + // cleanup: unlock the second lock + etcd::Response resp5 = etcd.unlock(lock_key).get(); + CHECK("unlock" == resp5.action()); + REQUIRE(resp5.is_ok()); + REQUIRE(0 == resp5.error_code()); +} diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index f0d8043..7cbcca2 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -59,64 +59,8 @@ TEST_CASE("create watcher") } CHECK(2 == watcher_called); -// TEST_CASE("wait for a value change") -// { -// etcd::Client etcd(etcd_uri); -// etcd.set("/test/key1", "42").wait(); - -// pplx::task res = etcd.watch("/test/key1"); -// CHECK(!res.is_done()); - -// etcd.set("/test/key1", "43").get(); -// sleep(1); - -// REQUIRE(res.is_done()); -// REQUIRE("set" == res.get().action()); -// CHECK("43" == res.get().value().as_string()); -// } - -// TEST_CASE("wait for a directory change") -// { -// etcd::Client etcd(etcd_uri); - -// pplx::task res = etcd.watch("/test", true); - -// etcd.add("/test/key4", "44").wait(); -// REQUIRE(res.is_done()); -// CHECK("create" == res.get().action()); -// CHECK("44" == res.get().value().as_string()); - -// pplx::task res2 = etcd.watch("/test", true); - -// etcd.set("/test/key4", "45").wait(); -// sleep(1); -// REQUIRE(res2.is_done()); -// CHECK("set" == res2.get().action()); -// CHECK("45" == res2.get().value().as_string()); -// } - -// TEST_CASE("watch changes in the past") -// { -// etcd::Client etcd(etcd_uri); - -// int index = etcd.set("/test/key1", "42").get().index(); - -// etcd.set("/test/key1", "43").wait(); -// etcd.set("/test/key1", "44").wait(); -// etcd.set("/test/key1", "45").wait(); - -// etcd::Response res = etcd.watch("/test/key1", ++index).get(); -// CHECK("set" == res.action()); -// CHECK("43" == res.value().as_string()); - -// res = etcd.watch("/test/key1", ++index).get(); -// CHECK("set" == res.action()); -// CHECK("44" == res.value().as_string()); - -// res = etcd.watch("/test", ++index, true).get(); -// CHECK("set" == res.action()); -// CHECK("45" == res.value().as_string()); -// } + etcd.rmdir("/test", true).error_code(); +} // TEST_CASE("request cancellation") // { @@ -143,5 +87,4 @@ TEST_CASE("create watcher") // std::cout << "std::exception: " << ex.what() << "\n"; // } // } - etcd.rmdir("/test", true).error_code(); -} + diff --git a/v3/include/Action.hpp b/v3/include/Action.hpp index 7edec8f..949c4f0 100644 --- a/v3/include/Action.hpp +++ b/v3/include/Action.hpp @@ -3,6 +3,7 @@ #include #include "proto/rpc.grpc.pb.h" +#include "proto/v3lock.grpc.pb.h" using grpc::ClientContext; using grpc::CompletionQueue; @@ -11,6 +12,7 @@ using grpc::Status; using etcdserverpb::KV; using etcdserverpb::Watch; using etcdserverpb::Lease; +using v3lockpb::Lock; namespace etcdv3 { @@ -34,6 +36,7 @@ namespace etcdv3 KV::Stub* kv_stub; Watch::Stub* watch_stub; Lease::Stub* lease_stub; + Lock::Stub* lock_stub; }; class Action diff --git a/v3/include/AsyncLockAction.hpp b/v3/include/AsyncLockAction.hpp new file mode 100644 index 0000000..1508c15 --- /dev/null +++ b/v3/include/AsyncLockAction.hpp @@ -0,0 +1,41 @@ +#ifndef __ASYNC_LOCKACTION_HPP__ +#define __ASYNC_LOCKACTION_HPP__ + +#include +#include "proto/v3lock.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncLockResponse.hpp" +#include "etcd/Response.hpp" + + +using grpc::ClientAsyncResponseReader; +using v3lockpb::LockRequest; +using v3lockpb::LockResponse; +using v3lockpb::UnlockRequest; +using v3lockpb::UnlockResponse; + + +namespace etcdv3 +{ + class AsyncLockAction : public etcdv3::Action + { + public: + AsyncLockAction(etcdv3::ActionParameters param); + AsyncLockResponse ParseResponse(); + private: + LockResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncUnlockAction : public etcdv3::Action + { + public: + AsyncUnlockAction(etcdv3::ActionParameters param); + AsyncUnlockResponse ParseResponse(); + private: + UnlockResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncLockResponse.hpp b/v3/include/AsyncLockResponse.hpp new file mode 100644 index 0000000..15186a9 --- /dev/null +++ b/v3/include/AsyncLockResponse.hpp @@ -0,0 +1,33 @@ +#ifndef __ASYNC_LOCK_HPP__ +#define __ASYNC_LOCK_HPP__ + +#include +#include "proto/v3lock.grpc.pb.h" +#include "v3/include/V3Response.hpp" + + +using grpc::ClientAsyncResponseReader; +using v3lockpb::LockRequest; +using v3lockpb::LockResponse; +using v3lockpb::UnlockRequest; +using v3lockpb::UnlockResponse; + +namespace etcdv3 +{ + class AsyncLockResponse : public etcdv3::V3Response + { + public: + AsyncLockResponse(){}; + void ParseResponse(LockResponse& resp); + }; + + class AsyncUnlockResponse : public etcdv3::V3Response + { + public: + AsyncUnlockResponse(){}; + void ParseResponse(UnlockResponse& resp); + }; +} + +#endif + diff --git a/v3/include/AsyncTxnAction.hpp b/v3/include/AsyncTxnAction.hpp new file mode 100644 index 0000000..8d15c95 --- /dev/null +++ b/v3/include/AsyncTxnAction.hpp @@ -0,0 +1,28 @@ +#ifndef __ASYNC_TXNACTION_HPP__ +#define __ASYNC_TXNACTION_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/Action.hpp" +#include "v3/include/AsyncTxnResponse.hpp" +#include "v3/include/Transaction.hpp" + + +using grpc::ClientAsyncResponseReader; +using etcdserverpb::TxnResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncTxnAction : public etcdv3::Action + { + public: + AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx); + AsyncTxnResponse ParseResponse(); + private: + TxnResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index 4a2764f..d1e0d98 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -12,7 +12,8 @@ namespace etcdv3 { public: AsyncTxnResponse(){}; - void ParseResponse(std::string const& key, bool prefix,TxnResponse& resp); + void ParseResponse(TxnResponse& resp); + void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp); }; } diff --git a/v3/include/Transaction.hpp b/v3/include/Transaction.hpp index f15c2be..b9de935 100644 --- a/v3/include/Transaction.hpp +++ b/v3/include/Transaction.hpp @@ -24,10 +24,14 @@ public: void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive); void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive); void setup_compare_and_delete_operation(std::string const& key); - void setup_lease_grant_operation(int ttl); + void setup_lease_grant_operation(int ttl); + + // update without `get` and no `prev_kv` returned + void setup_put(std::string const &key, std::string const &value); + void setup_delete(std::string const &key); etcdserverpb::TxnRequest txn_request; - etcdserverpb::LeaseGrantRequest leasegrant_request; + etcdserverpb::LeaseGrantRequest leasegrant_request; private: std::string key; diff --git a/v3/include/V3Response.hpp b/v3/include/V3Response.hpp index a293adb..a82351f 100644 --- a/v3/include/V3Response.hpp +++ b/v3/include/V3Response.hpp @@ -2,6 +2,8 @@ #define __V3_RESPONSE_HPP__ #include +#include "proto/kv.pb.h" + #include "v3/include/KeyValue.hpp" namespace etcdv3 @@ -22,6 +24,9 @@ namespace etcdv3 etcdv3::KeyValue const & get_value() const; etcdv3::KeyValue const & get_prev_value() const; bool has_values() const; + void set_lock_key(std::string const &key); + std::string const &get_lock_key() const; + std::vector const & get_events() const; protected: int error_code; int index; @@ -31,6 +36,8 @@ namespace etcdv3 etcdv3::KeyValue prev_value; std::vector values; std::vector prev_values; + std::string lock_key; // for lock + std::vector events; // for watch }; } #endif diff --git a/v3/include/action_constants.hpp b/v3/include/action_constants.hpp index ea7c347..5c3fdc5 100644 --- a/v3/include/action_constants.hpp +++ b/v3/include/action_constants.hpp @@ -10,7 +10,9 @@ namespace etcdv3 extern char const * DELETE_ACTION; extern char const * COMPARESWAP_ACTION; extern char const * COMPAREDELETE_ACTION; - + extern char const * LOCK_ACTION; + extern char const * UNLOCK_ACTION; + extern char const * TXN_ACTION; } #endif diff --git a/v3/src/AsyncLockAction.cpp b/v3/src/AsyncLockAction.cpp new file mode 100644 index 0000000..448fa97 --- /dev/null +++ b/v3/src/AsyncLockAction.cpp @@ -0,0 +1,63 @@ +#include "v3/include/AsyncLockAction.hpp" +#include "v3/include/action_constants.hpp" + +using v3lockpb::LockRequest; +using v3lockpb::UnlockRequest; + +etcdv3::AsyncLockAction::AsyncLockAction(ActionParameters param) + : etcdv3::Action(param) +{ + LockRequest lock_request; + lock_request.set_name(parameters.key); + + response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse() +{ + AsyncLockResponse lock_resp; + + if(!status.ok()) + { + std::cout << "lock error message is: " << status.error_message() << std::endl; + lock_resp.set_error_code(status.error_code()); + lock_resp.set_error_message(status.error_message()); + } + else + { + lock_resp.ParseResponse(reply); + lock_resp.set_action(etcdv3::LOCK_ACTION); + } + + return lock_resp; +} + +etcdv3::AsyncUnlockAction::AsyncUnlockAction(ActionParameters param) + : etcdv3::Action(param) +{ + UnlockRequest unlock_request; + unlock_request.set_key(parameters.key); + + response_reader = parameters.lock_stub->AsyncUnlock(&context, unlock_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() +{ + AsyncUnlockResponse unlock_resp; + + if(!status.ok()) + { + std::cout << "unlock error message is: " << status.error_message() << std::endl; + unlock_resp.set_error_code(status.error_code()); + unlock_resp.set_error_message(status.error_message()); + } + else + { + unlock_resp.ParseResponse(reply); + unlock_resp.set_action(etcdv3::UNLOCK_ACTION); + } + + return unlock_resp; +} diff --git a/v3/src/AsyncLockResponse.cpp b/v3/src/AsyncLockResponse.cpp new file mode 100644 index 0000000..75a6503 --- /dev/null +++ b/v3/src/AsyncLockResponse.cpp @@ -0,0 +1,14 @@ +#include "v3/include/AsyncLockResponse.hpp" +#include "v3/include/action_constants.hpp" + + +void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp) +{ + index = resp.header().revision(); + lock_key = resp.key(); +} + +void etcdv3::AsyncUnlockResponse::ParseResponse(UnlockResponse& resp) +{ + index = resp.header().revision(); +} diff --git a/v3/src/AsyncSetAction.cpp b/v3/src/AsyncSetAction.cpp index ef374bf..3425054 100644 --- a/v3/src/AsyncSetAction.cpp +++ b/v3/src/AsyncSetAction.cpp @@ -10,7 +10,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea etcdv3::Transaction transaction(parameters.key); isCreate = create; transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL, - Compare::CompareTarget::Compare_CompareTarget_VERSION); + Compare::CompareTarget::Compare_CompareTarget_VERSION); transaction.setup_basic_create_sequence(parameters.key, parameters.value, parameters.lease_id); diff --git a/v3/src/AsyncTxnAction.cpp b/v3/src/AsyncTxnAction.cpp new file mode 100644 index 0000000..08dd5c6 --- /dev/null +++ b/v3/src/AsyncTxnAction.cpp @@ -0,0 +1,37 @@ +#include "v3/include/action_constants.hpp" +#include "v3/include/AsyncTxnAction.hpp" +#include "v3/include/Transaction.hpp" + + +etcdv3::AsyncTxnAction::AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx) + : etcdv3::Action(param) +{ + response_reader = parameters.kv_stub->AsyncTxn(&context, tx.txn_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse() +{ + AsyncTxnResponse txn_resp; + + if(!status.ok()) + { + txn_resp.set_error_code(status.error_code()); + txn_resp.set_error_message(status.error_message()); + } + else + { + txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.set_action(etcdv3::TXN_ACTION); + + //if there is an error code returned by parseResponse, we must + //not overwrite it. + if(!reply.succeeded() && !txn_resp.get_error_code()) + { + txn_resp.set_error_code(101); + txn_resp.set_error_message("compare failed"); + } + } + + return txn_resp; +} diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 51da367..b3b08a0 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -5,6 +5,10 @@ using etcdserverpb::ResponseOp; +void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { + index = reply.header().revision(); +} + void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply) { index = reply.header().revision(); diff --git a/v3/src/AsyncUpdateAction.cpp b/v3/src/AsyncUpdateAction.cpp index 8b76688..310f3c4 100644 --- a/v3/src/AsyncUpdateAction.cpp +++ b/v3/src/AsyncUpdateAction.cpp @@ -15,7 +15,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param) { etcdv3::Transaction transaction(parameters.key); transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER, - Compare::CompareTarget::Compare_CompareTarget_VERSION); + Compare::CompareTarget::Compare_CompareTarget_VERSION); transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id); diff --git a/v3/src/AsyncWatchAction.cpp b/v3/src/AsyncWatchAction.cpp index e9fbab3..5f7dcbd 100644 --- a/v3/src/AsyncWatchAction.cpp +++ b/v3/src/AsyncWatchAction.cpp @@ -27,8 +27,22 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) } watch_req.mutable_create_request()->CopyFrom(watch_create_req); - stream->Write(watch_req, (void*)"write"); - stream->Read(&reply, (void*)this); + + // wait "create" success (the stream becomes ready) + void *got_tag; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"create") { + stream->Write(watch_req, (void *)"write"); + } else { + throw std::runtime_error("failed to create a watch connection"); + } + + // wait "write" success + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"write") { + stream->Read(&reply, (void*)this); + } else { + throw std::runtime_error("failed to read proper reply from server"); + } } @@ -39,10 +53,15 @@ void etcdv3::AsyncWatchAction::waitForResponse() while(cq_.Next(&got_tag, &ok)) { - if(ok == false || (got_tag == (void*)"writes done")) + if(ok == false) { break; } + if(got_tag == (void*)"writes done") { + isCancelled = true; + cq_.Shutdown(); + break; + } if(got_tag == (void*)this) // read tag { if(reply.events_size()) @@ -79,6 +98,8 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function