diff --git a/CMakeLists.txt b/CMakeLists.txt index d31398077..d74123ade 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -226,7 +226,7 @@ endif() if(FLATBUFFERS_BUILD_GRPCTEST) if(CMAKE_COMPILER_IS_GNUCXX) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter -Wno-shadow") endif() add_executable(grpctest ${FlatBuffers_GRPCTest_SRCS}) target_link_libraries(grpctest grpc++_unsecure pthread dl) diff --git a/docs/source/doxyfile b/docs/source/doxyfile index 770da9f2b..64af6710a 100755 --- a/docs/source/doxyfile +++ b/docs/source/doxyfile @@ -765,6 +765,7 @@ INPUT = "FlatBuffers.md" \ "../../CONTRIBUTING.md" \ "Tutorial.md" \ "GoApi.md" \ + "gRPC/CppUsage.md" \ "groups" \ "../../java/com/google/flatbuffers" \ "../../python/flatbuffers/builder.py" \ @@ -883,21 +884,21 @@ EXCLUDE_SYMBOLS = # that contain example code fragments that are included (see the \include # command). -EXAMPLE_PATH = "GoApi_generated.txt" +EXAMPLE_PATH = "GoApi_generated.txt" "../../grpc/samples" # If the value of the EXAMPLE_PATH tag contains directories, you can use the # EXAMPLE_PATTERNS tag to specify one or more wildcard pattern (like *.cpp and # *.h) to filter out the source-files in the directories. If left blank all # files are included. -EXAMPLE_PATTERNS = * +EXAMPLE_PATTERNS = *.cpp *.h *.txt *.fbs # If the EXAMPLE_RECURSIVE tag is set to YES then subdirectories will be # searched for input files to be used with the \include or \dontinclude commands # irrespective of the value of the RECURSIVE tag. # The default value is: NO. -EXAMPLE_RECURSIVE = NO +EXAMPLE_RECURSIVE = YES # The IMAGE_PATH tag can be used to specify one or more files or directories # that contain images that are to be included in the documentation (see the diff --git a/docs/source/doxygen_layout.xml b/docs/source/doxygen_layout.xml index 77866df3a..504d15c1b 100644 --- a/docs/source/doxygen_layout.xml +++ b/docs/source/doxygen_layout.xml @@ -39,6 +39,10 @@ title="Use in Python"/> + + + diff --git a/docs/source/gRPC/CppUsage.md b/docs/source/gRPC/CppUsage.md new file mode 100644 index 000000000..93dbb299b --- /dev/null +++ b/docs/source/gRPC/CppUsage.md @@ -0,0 +1,29 @@ +Use in C++ {#flatbuffers_grpc_guide_use_cpp} +========== + +## Before you get started + +Before diving into the FlatBuffers gRPC usage in C++, you should already be +familiar with the following: + +- FlatBuffers as a serialization format +- [gRPC](http://www.grpc.io/docs/) usage + +## Using the FlatBuffers gRPC C++ library + +NOTE: The examples below are also in the `grpc/samples/greeter` directory. + +We will illustrate usage with the following schema: + +@include grpc/samples/greeter/greeter.fbs + +When we run `flatc`, we pass in the `--grpc` option and generage an additional +`greeter.grpc.fb.h` and `greeter.grpc.fb.cc`. + +Example server code looks like this: + +@include grpc/samples/greeter/server.cpp + +Example client code looks like this: + +@include grpc/samples/greeter/client.cpp diff --git a/grpc/samples/greeter/Makefile b/grpc/samples/greeter/Makefile new file mode 100644 index 000000000..374670551 --- /dev/null +++ b/grpc/samples/greeter/Makefile @@ -0,0 +1,14 @@ +CXXFLAGS ?= -I../../../include +LDFLAGS ?= + +.PHONY: all +all: server client + +greeter_generated.h: greeter.fbs + flatc --grpc --cpp $< + +server: server.cpp greeter.grpc.fb.cc greeter_generated.h greeter.grpc.fb.h + g++ -std=c++11 -O2 $(CXXFLAGS) $(LDFLAGS) -lgpr -lgrpc -lgrpc++ server.cpp greeter.grpc.fb.cc -o $@ + +client: client.cpp greeter.grpc.fb.cc greeter_generated.h greeter.grpc.fb.h + g++ -std=c++11 -O2 $(CXXFLAGS) $(LDFLAGS) -lgpr -lgrpc -lgrpc++ client.cpp greeter.grpc.fb.cc -o $@ diff --git a/grpc/samples/greeter/client.cpp b/grpc/samples/greeter/client.cpp new file mode 100644 index 000000000..2e42f8fda --- /dev/null +++ b/grpc/samples/greeter/client.cpp @@ -0,0 +1,85 @@ +#include "greeter.grpc.fb.h" +#include "greeter_generated.h" + +#include + +#include +#include +#include + +class GreeterClient { + public: + GreeterClient(std::shared_ptr channel) + : stub_(Greeter::NewStub(channel)) {} + + std::string SayHello(const std::string &name) { + flatbuffers::grpc::MessageBuilder mb; + auto name_offset = mb.CreateString(name); + auto request_offset = CreateHelloRequest(mb, name_offset); + mb.Finish(request_offset); + auto request_msg = mb.ReleaseMessage(); + + flatbuffers::grpc::Message response_msg; + + grpc::ClientContext context; + + auto status = stub_->SayHello(&context, request_msg, &response_msg); + if (status.ok()) { + const HelloReply *response = response_msg.GetRoot(); + return response->message()->str(); + } else { + std::cerr << status.error_code() << ": " << status.error_message() + << std::endl; + return "RPC failed"; + } + } + + void SayManyHellos(const std::string &name, int num_greetings, + std::function callback) { + flatbuffers::grpc::MessageBuilder mb; + auto name_offset = mb.CreateString(name); + auto request_offset = + CreateManyHellosRequest(mb, name_offset, num_greetings); + mb.Finish(request_offset); + auto request_msg = mb.ReleaseMessage(); + + flatbuffers::grpc::Message response_msg; + + grpc::ClientContext context; + + auto stream = stub_->SayManyHellos(&context, request_msg); + while (stream->Read(&response_msg)) { + const HelloReply *response = response_msg.GetRoot(); + callback(response->message()->str()); + } + auto status = stream->Finish(); + if (!status.ok()) { + std::cerr << status.error_code() << ": " << status.error_message() + << std::endl; + callback("RPC failed"); + } + } + + private: + std::unique_ptr stub_; +}; + +int main(int argc, char **argv) { + std::string server_address("localhost:50051"); + + auto channel = + grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); + GreeterClient greeter(channel); + + std::string name("world"); + + std::string message = greeter.SayHello(name); + std::cerr << "Greeter received: " << message << std::endl; + + int num_greetings = 10; + greeter.SayManyHellos(name, num_greetings, [](const std::string &message) { + std::cerr << "Greeter received: " << message << std::endl; + }); + + return 0; +} diff --git a/grpc/samples/greeter/greeter.fbs b/grpc/samples/greeter/greeter.fbs new file mode 100644 index 000000000..811303c93 --- /dev/null +++ b/grpc/samples/greeter/greeter.fbs @@ -0,0 +1,17 @@ +table HelloReply { + message:string; +} + +table HelloRequest { + name:string; +} + +table ManyHellosRequest { + name:string; + num_greetings:int; +} + +rpc_service Greeter { + SayHello(HelloRequest):HelloReply; + SayManyHellos(ManyHellosRequest):HelloReply (streaming: "server"); +} diff --git a/grpc/samples/greeter/server.cpp b/grpc/samples/greeter/server.cpp new file mode 100644 index 000000000..82c97dc51 --- /dev/null +++ b/grpc/samples/greeter/server.cpp @@ -0,0 +1,80 @@ +#include "greeter.grpc.fb.h" +#include "greeter_generated.h" + +#include + +#include +#include +#include + +class GreeterServiceImpl final : public Greeter::Service { + virtual grpc::Status SayHello( + grpc::ServerContext *context, + const flatbuffers::grpc::Message *request_msg, + flatbuffers::grpc::Message *response_msg) override { + // flatbuffers::grpc::MessageBuilder mb_; + // We call GetRoot to "parse" the message. Verification is already + // performed by default. See the notes below for more details. + const HelloRequest *request = request_msg->GetRoot(); + + // Fields are retrieved as usual with FlatBuffers + const std::string &name = request->name()->str(); + + // `flatbuffers::grpc::MessageBuilder` is a `FlatBufferBuilder` with a + // special allocator for efficient gRPC buffer transfer, but otherwise + // usage is the same as usual. + auto msg_offset = mb_.CreateString("Hello, " + name); + auto hello_offset = CreateHelloReply(mb_, msg_offset); + mb_.Finish(hello_offset); + + // The `ReleaseMessage()` function detaches the message from the + // builder, so we can transfer the resopnse to gRPC while simultaneously + // detaching that memory buffer from the builer. + *response_msg = mb_.ReleaseMessage(); + assert(response_msg->Verify()); + + // Return an OK status. + return grpc::Status::OK; + } + + virtual grpc::Status SayManyHellos( + grpc::ServerContext *context, + const flatbuffers::grpc::Message *request_msg, + grpc::ServerWriter> *writer) + override { + // The streaming usage below is simply a combination of standard gRPC + // streaming with the FlatBuffers usage shown above. + const ManyHellosRequest *request = request_msg->GetRoot(); + const std::string &name = request->name()->str(); + int num_greetings = request->num_greetings(); + + for (int i = 0; i < num_greetings; i++) { + auto msg_offset = mb_.CreateString("Many hellos, " + name); + auto hello_offset = CreateHelloReply(mb_, msg_offset); + mb_.Finish(hello_offset); + writer->Write(mb_.ReleaseMessage()); + } + + return grpc::Status::OK; + } + + flatbuffers::grpc::MessageBuilder mb_; +}; + +void RunServer() { + std::string server_address("0.0.0.0:50051"); + GreeterServiceImpl service; + + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + std::unique_ptr server(builder.BuildAndStart()); + std::cerr << "Server listening on " << server_address << std::endl; + + server->Wait(); +} + +int main(int argc, const char *argv[]) { + RunServer(); + return 0; +} diff --git a/grpc/tests/grpctest.cpp b/grpc/tests/grpctest.cpp index f0a1f5783..83b288a1c 100644 --- a/grpc/tests/grpctest.cpp +++ b/grpc/tests/grpctest.cpp @@ -27,24 +27,21 @@ using namespace MyGame::Example; // code. It implements all rpcs specified in the FlatBuffers schema. class ServiceImpl final : public MyGame::Example::MonsterStorage::Service { virtual ::grpc::Status Store(::grpc::ServerContext* context, - const flatbuffers::BufferRef *request, - flatbuffers::BufferRef *response) + const flatbuffers::grpc::Message *request, + flatbuffers::grpc::Message *response) override { // Create a response from the incoming request name. fbb_.Clear(); auto stat_offset = CreateStat(fbb_, fbb_.CreateString("Hello, " + request->GetRoot()->name()->str())); fbb_.Finish(stat_offset); - // Since we keep reusing the same FlatBufferBuilder, the memory it owns - // remains valid until the next call (this BufferRef doesn't own the - // memory it points to). - *response = flatbuffers::BufferRef(fbb_.GetBufferPointer(), - fbb_.GetSize()); + // Transfer ownership of the message to gRPC + *response = fbb_.ReleaseMessage(); return grpc::Status::OK; } virtual ::grpc::Status Retrieve(::grpc::ServerContext *context, - const flatbuffers::BufferRef *request, - ::grpc::ServerWriter< flatbuffers::BufferRef>* writer) + const flatbuffers::grpc::Message *request, + ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer) override { for (int i=0; i<10; i++) { @@ -55,17 +52,16 @@ class ServiceImpl final : public MyGame::Example::MonsterStorage::Service { request->GetRoot()->id()->str() + " No." + std::to_string(i))); fbb_.Finish(monster_offset); - flatbuffers::BufferRef monsterRef( - fbb_.GetBufferPointer(), fbb_.GetSize() - ); + flatbuffers::grpc::Message monster = fbb_.ReleaseMessage(); + // Send monster to client using streaming. - writer->Write(monsterRef); + writer->Write(monster); } return grpc::Status::OK; } private: - flatbuffers::FlatBufferBuilder fbb_; + flatbuffers::grpc::MessageBuilder fbb_; }; // Track the server instance, so we can terminate it later. @@ -108,15 +104,14 @@ int main(int /*argc*/, const char * /*argv*/[]) { auto stub = MyGame::Example::MonsterStorage::NewStub(channel); - flatbuffers::FlatBufferBuilder fbb; + flatbuffers::grpc::MessageBuilder fbb; { grpc::ClientContext context; // Build a request with the name set. auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred")); fbb.Finish(monster_offset); - auto request = flatbuffers::BufferRef(fbb.GetBufferPointer(), - fbb.GetSize()); - flatbuffers::BufferRef response; + auto request = fbb.ReleaseMessage(); + flatbuffers::grpc::Message response; // The actual RPC. auto status = stub->Store(&context, request, &response); @@ -133,11 +128,9 @@ int main(int /*argc*/, const char * /*argv*/[]) { fbb.Clear(); auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred")); fbb.Finish(stat_offset); - auto request = flatbuffers::BufferRef( - fbb.GetBufferPointer(),fbb.GetSize() - ); + auto request = fbb.ReleaseMessage(); - flatbuffers::BufferRef response; + flatbuffers::grpc::Message response; auto stream = stub->Retrieve(&context, request); while (stream->Read(&response)) { auto resp = response.GetRoot()->name(); @@ -145,6 +138,21 @@ int main(int /*argc*/, const char * /*argv*/[]) { } } + #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION + { + // Test that an invalid request errors out correctly + grpc::ClientContext context; + flatbuffers::grpc::Message request; // simulate invalid message + flatbuffers::grpc::Message response; + auto status = stub->Store(&context, request, &response); + // The rpc status should be INTERNAL to indicate a verification error. This + // matches the protobuf gRPC status code for an unparseable message. + assert(!status.ok()); + assert(status.error_code() == ::grpc::StatusCode::INTERNAL); + assert(strcmp(status.error_message().c_str(), "Message verification failed") == 0); + } + #endif + server_instance->Shutdown(); server_thread.join(); diff --git a/include/flatbuffers/base.h b/include/flatbuffers/base.h index 098182da9..04afd5a42 100644 --- a/include/flatbuffers/base.h +++ b/include/flatbuffers/base.h @@ -173,4 +173,4 @@ inline size_t PaddingBytes(size_t buf_size, size_t scalar_size) { } } -#endif // FLATBUFFERS_BASE_H_ \ No newline at end of file +#endif // FLATBUFFERS_BASE_H_ diff --git a/include/flatbuffers/flatbuffers.h b/include/flatbuffers/flatbuffers.h index 9311f8275..963f29637 100644 --- a/include/flatbuffers/flatbuffers.h +++ b/include/flatbuffers/flatbuffers.h @@ -69,7 +69,7 @@ template T EndianSwap(T t) { } } -template size_t AlignOf() { +template FLATBUFFERS_CONSTEXPR size_t AlignOf() { #ifdef _MSC_VER return __alignof(T); #else @@ -451,27 +451,27 @@ class DetachedBuffer { } ~DetachedBuffer() { - if (buf_ != nullptr) { - assert(allocator_ != nullptr); + if (buf_) { + assert(allocator_); allocator_->deallocate(buf_, reserved_); } - if (own_allocator_ && allocator_ != nullptr) { + if (own_allocator_ && allocator_) { delete allocator_; } } const uint8_t *data() const { - assert(cur_ != nullptr); + assert(cur_); return cur_; } uint8_t *data() { - assert(cur_ != nullptr); + assert(cur_); return cur_; } size_t size() const { - assert(cur_ != nullptr); + assert(cur_); return size_; } @@ -516,29 +516,39 @@ class vector_downward { Allocator *allocator = nullptr, bool own_allocator = false) : allocator_(allocator ? allocator : &DefaultAllocator::instance()), - own_allocator_(own_allocator), - reserved_((initial_size + sizeof(largest_scalar_t) - 1) & - ~(sizeof(largest_scalar_t) - 1)), - buf_(allocator_->allocate(reserved_)), cur_(buf_ + reserved_) { + own_allocator_(own_allocator), initial_size_(initial_size), reserved_(0), + buf_(nullptr), cur_(nullptr) { assert(allocator_); } ~vector_downward() { - if (buf_ != nullptr) { - assert(allocator_ != nullptr); + if (buf_) { + assert(allocator_); allocator_->deallocate(buf_, reserved_); } - if (own_allocator_ && allocator_ != nullptr) { + if (own_allocator_ && allocator_) { delete allocator_; } } - void clear() { - if (buf_ == nullptr) { - assert(allocator_ != nullptr); - buf_ = allocator_->allocate(reserved_); + void reset() { + if (buf_) { + assert(allocator_); + allocator_->deallocate(buf_, reserved_); + } + reserved_ = 0; + buf_ = nullptr; + cur_ = nullptr; + } + + void clear() { + if (buf_) { + cur_ = buf_ + reserved_; + } else { + reserved_ = 0; + buf_ = nullptr; + cur_ = nullptr; } - cur_ = buf_ + reserved_; } // Relinquish the pointer to the caller. @@ -554,10 +564,12 @@ class vector_downward { } size_t growth_policy(size_t bytes) { - return (bytes / 2) & ~(sizeof(largest_scalar_t) - 1); + return (bytes == 0) ? initial_size_ + : ((bytes / 2) & ~(AlignOf() - 1)); } uint8_t *make_space(size_t len) { + assert(cur_ >= buf_); if (len > static_cast(cur_ - buf_)) { reallocate(len); } @@ -568,13 +580,23 @@ class vector_downward { return cur_; } + Allocator &get_allocator() { return *allocator_; } + uoffset_t size() const { - assert(cur_ != nullptr && buf_ != nullptr); return static_cast(reserved_ - (cur_ - buf_)); } + uoffset_t capacity() const { + return reserved_; + } + + uint8_t *buf() const { + assert(buf_); + return buf_; + } + uint8_t *data() const { - assert(cur_ != nullptr); + assert(cur_); return cur_; } @@ -613,18 +635,23 @@ class vector_downward { Allocator *allocator_; bool own_allocator_; + size_t initial_size_; size_t reserved_; uint8_t *buf_; uint8_t *cur_; // Points at location between empty (below) and used (above). void reallocate(size_t len) { - size_t old_reserved = reserved_; + assert(allocator_); + auto old_reserved = reserved_; auto old_size = size(); - auto largest_align = AlignOf(); - reserved_ += (std::max)(len, growth_policy(reserved_)); - // Round up to avoid undefined behavior from unaligned loads and stores. - reserved_ = (reserved_ + (largest_align - 1)) & ~(largest_align - 1); - buf_ = allocator_->reallocate_downward(buf_, old_reserved, reserved_); + reserved_ += (std::max)(len, growth_policy(old_reserved)); + FLATBUFFERS_CONSTEXPR size_t alignment = AlignOf(); + reserved_ = (reserved_ + alignment - 1) & ~(alignment - 1); + if (buf_) { + buf_ = allocator_->reallocate_downward(buf_, old_reserved, reserved_); + } else { + buf_ = allocator_->allocate(reserved_); + } cur_ = buf_ + reserved_ - old_size; } }; @@ -655,9 +682,6 @@ template T* data(std::vector &v) { /// `CreateVector` functions. Do this is depth-first order to build up a tree to /// the root. `Finish()` wraps up the buffer ready for transport. class FlatBufferBuilder -/// @cond FLATBUFFERS_INTERNAL -FLATBUFFERS_FINAL_CLASS -/// @endcond { public: /// @brief Default constructor for FlatBufferBuilder. @@ -667,7 +691,7 @@ FLATBUFFERS_FINAL_CLASS /// a `DefaultAllocator`. /// @param[in] own_allocator Whether the builder/vector should own the /// allocator. Defaults to / `false`. - explicit FlatBufferBuilder(uoffset_t initial_size = 1024, + explicit FlatBufferBuilder(size_t initial_size = 1024, Allocator *allocator = nullptr, bool own_allocator = false) : buf_(initial_size, allocator, own_allocator), nested(false), @@ -682,6 +706,11 @@ FLATBUFFERS_FINAL_CLASS if (string_pool) delete string_pool; } + void Reset() { + Clear(); // clear builder state + buf_.reset(); // deallocate buffer + } + /// @brief Reset all the state in this FlatBufferBuilder so it can be reused /// to construct another buffer. void Clear() { @@ -1392,7 +1421,7 @@ FLATBUFFERS_FINAL_CLASS Finish(root.o, file_identifier, true); } - private: + protected: // You shouldn't really be copying instances of this class. FlatBufferBuilder(const FlatBufferBuilder &); FlatBufferBuilder &operator=(const FlatBufferBuilder &); diff --git a/include/flatbuffers/grpc.h b/include/flatbuffers/grpc.h index 0b6dc8364..3263f2e6c 100644 --- a/include/flatbuffers/grpc.h +++ b/include/flatbuffers/grpc.h @@ -23,50 +23,225 @@ #include "grpc++/support/byte_buffer.h" #include "grpc/byte_buffer_reader.h" +namespace flatbuffers { +namespace grpc { + +// Message is a typed wrapper around a buffer that manages the underlying +// `grpc_slice` and also provides flatbuffers-specific helpers such as `Verify` +// and `GetRoot`. Since it is backed by a `grpc_slice`, the underlying buffer +// is refcounted and ownership is be managed automatically. +template +class Message { + public: + Message() : slice_(grpc_empty_slice()) {} + + Message(grpc_slice slice, bool add_ref) + : slice_(add_ref ? grpc_slice_ref(slice) : slice) {} + + Message &operator=(const Message &other) = delete; + + Message(Message &&other) : slice_(other.slice_) { + other.slice_ = grpc_empty_slice(); + } + + Message(const Message &other) = delete; + + Message &operator=(Message &&other) { + slice_ = other.slice_; + other.slice_ = grpc_empty_slice(); + return *this; + } + + ~Message() { grpc_slice_unref(slice_); } + + const uint8_t *mutable_data() const { return GRPC_SLICE_START_PTR(slice_); } + + const uint8_t *data() const { return GRPC_SLICE_START_PTR(slice_); } + + size_t size() const { return GRPC_SLICE_LENGTH(slice_); } + + bool Verify() const { + Verifier verifier(data(), size()); + return verifier.VerifyBuffer(nullptr); + } + + T *GetMutableRoot() { return flatbuffers::GetMutableRoot(mutable_data()); } + + const T *GetRoot() const { return flatbuffers::GetRoot(data()); } + + // This is only intended for serializer use, or if you know what you're doing + const grpc_slice &BorrowSlice() const { return slice_; } + + private: + grpc_slice slice_; +}; + +class MessageBuilder; + +// SliceAllocator is a gRPC-specific allocator that uses the `grpc_slice` +// refcounted slices to manage memory ownership. This makes it easy and +// efficient to transfer buffers to gRPC. +class SliceAllocator : public Allocator { + public: + SliceAllocator() : slice_(grpc_empty_slice()) {} + + SliceAllocator(const SliceAllocator &other) = delete; + SliceAllocator &operator=(const SliceAllocator &other) = delete; + + virtual ~SliceAllocator() { grpc_slice_unref(slice_); } + + virtual uint8_t *allocate(size_t size) override { + assert(GRPC_SLICE_IS_EMPTY(slice_)); + slice_ = grpc_slice_malloc(size); + return GRPC_SLICE_START_PTR(slice_); + } + + virtual void deallocate(uint8_t *p, size_t size) override { + assert(p == GRPC_SLICE_START_PTR(slice_)); + assert(size == GRPC_SLICE_LENGTH(slice_)); + grpc_slice_unref(slice_); + slice_ = grpc_empty_slice(); + } + + virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size, + size_t new_size) override { + assert(old_p == GRPC_SLICE_START_PTR(slice_)); + assert(old_size == GRPC_SLICE_LENGTH(slice_)); + assert(new_size > old_size); + grpc_slice old_slice = slice_; + grpc_slice new_slice = grpc_slice_malloc(new_size); + uint8_t *new_p = GRPC_SLICE_START_PTR(new_slice); + memcpy(new_p + (new_size - old_size), old_p, old_size); + slice_ = new_slice; + grpc_slice_unref(old_slice); + return new_p; + } + + private: + grpc_slice &get_slice(uint8_t *p, size_t size) { + assert(p == GRPC_SLICE_START_PTR(slice_)); + assert(size == GRPC_SLICE_LENGTH(slice_)); + return slice_; + } + + grpc_slice slice_; + + friend class MessageBuilder; +}; + +// SliceAllocatorMember is a hack to ensure that the MessageBuilder's +// slice_allocator_ member is constructed before the FlatBufferBuilder, since +// the allocator is used in the FlatBufferBuilder ctor. +namespace detail { +struct SliceAllocatorMember { + SliceAllocator slice_allocator_; +}; +} + +// MessageBuilder is a gRPC-specific FlatBufferBuilder that uses SliceAllocator +// to allocate gRPC buffers. +class MessageBuilder : private detail::SliceAllocatorMember, + public FlatBufferBuilder { + public: + explicit MessageBuilder(uoffset_t initial_size = 1024) + : FlatBufferBuilder(initial_size, &slice_allocator_, false) {} + + MessageBuilder(const MessageBuilder &other) = delete; + MessageBuilder &operator=(const MessageBuilder &other) = delete; + + ~MessageBuilder() {} + + // GetMessage extracts the subslice of the buffer corresponding to the + // flatbuffers-encoded region and wraps it in a `Message` to handle buffer + // ownership. + template + Message GetMessage() { + auto buf_data = buf_.buf(); // pointer to memory + auto buf_size = buf_.capacity(); // size of memory + auto msg_data = buf_.data(); // pointer to msg + auto msg_size = buf_.size(); // size of msg + // Do some sanity checks on data/size + assert(msg_data); + assert(msg_size); + assert(msg_data >= buf_data); + assert(msg_data + msg_size <= buf_data + buf_size); + // Calculate offsets from the buffer start + auto begin = msg_data - buf_data; + auto end = begin + msg_size; + // Get the slice we are working with (no refcount change) + grpc_slice slice = slice_allocator_.get_slice(buf_data, buf_size); + // Extract a subslice of the existing slice (increment refcount) + grpc_slice subslice = grpc_slice_sub(slice, begin, end); + // Wrap the subslice in a `Message`, but don't increment refcount + Message msg(subslice, false); + return msg; + } + + template + Message ReleaseMessage() { + Message msg = GetMessage(); + Reset(); + return msg; + } + + private: + // SliceAllocator slice_allocator_; // part of SliceAllocatorMember +}; + +} // namespace grpc +} // namespace flatbuffers + namespace grpc { template -class SerializationTraits::value>::type> { +class SerializationTraits> { public: - // The type we're passing here is a BufferRef, which is already serialized - // FlatBuffer data, which then gets passed to GRPC. - static grpc::Status Serialize(const T& msg, - grpc_byte_buffer **buffer, - bool *own_buffer) { - // TODO(wvo): make this work without copying. - auto slice = gpr_slice_from_copied_buffer( - reinterpret_cast(msg.buf), msg.len); - *buffer = grpc_raw_byte_buffer_create(&slice, 1); - grpc_slice_unref(slice); + static grpc::Status Serialize(const flatbuffers::grpc::Message &msg, + grpc_byte_buffer **buffer, bool *own_buffer) { + // We are passed in a `Message`, which is a wrapper around a + // `grpc_slice`. We extract it here using `BorrowSlice()`. The const cast + // is necesary because the `grpc_raw_byte_buffer_create` func expects + // non-const slices in order to increment their refcounts. + grpc_slice *slice = const_cast(&msg.BorrowSlice()); + // Now use `grpc_raw_byte_buffer_create` to package the single slice into a + // `grpc_byte_buffer`, incrementing the refcount in the process. + *buffer = grpc_raw_byte_buffer_create(slice, 1); *own_buffer = true; return grpc::Status(); } - // There is no de-serialization step in FlatBuffers, so we just receive - // the data from GRPC. - static grpc::Status Deserialize(grpc_byte_buffer *buffer, T *msg) { - // TODO(wvo): make this more efficient / zero copy when possible. - auto len = grpc_byte_buffer_length(buffer); - if(msg->buf != nullptr){ - free(msg->buf); - } - msg->buf = reinterpret_cast(malloc(len)); - msg->len = static_cast(len); - msg->must_free = true; - uint8_t *current = msg->buf; - grpc_byte_buffer_reader reader; - grpc_byte_buffer_reader_init(&reader, buffer); - gpr_slice slice; - while (grpc_byte_buffer_reader_next(&reader, &slice)) { - memcpy(current, GPR_SLICE_START_PTR(slice), GPR_SLICE_LENGTH(slice)); - current += GPR_SLICE_LENGTH(slice); - gpr_slice_unref(slice); + // Deserialize by pulling the + static grpc::Status Deserialize(grpc_byte_buffer *buffer, + flatbuffers::grpc::Message *msg) { + // Check if this is a single uncompressed slice. + if ((buffer->type == GRPC_BB_RAW) && + (buffer->data.raw.compression == GRPC_COMPRESS_NONE) && + (buffer->data.raw.slice_buffer.count == 1)) { + // If it is, then we can reference the `grpc_slice` directly. + grpc_slice slice = buffer->data.raw.slice_buffer.slices[0]; + // We wrap a `Message` around the slice, incrementing the refcount. + *msg = flatbuffers::grpc::Message(slice, true); + } else { + // Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read + // `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives + // us back a new slice with the refcount already incremented. + grpc_byte_buffer_reader reader; + grpc_byte_buffer_reader_init(&reader, buffer); + grpc_slice slice = grpc_byte_buffer_reader_readall(&reader); + grpc_byte_buffer_reader_destroy(&reader); + // We wrap a `Message` around the slice, but dont increment refcount + *msg = flatbuffers::grpc::Message(slice, false); } - GPR_ASSERT(current == msg->buf + msg->len); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(buffer); - return grpc::Status(); + #if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION + return ::grpc::Status::OK; + #else + if (msg->Verify()) { + return ::grpc::Status::OK; + } else { + return ::grpc::Status(::grpc::StatusCode::INTERNAL, + "Message verification failed"); + } + #endif } }; diff --git a/src/idl_gen_grpc.cpp b/src/idl_gen_grpc.cpp index 52926821d..5d0cf4fc0 100644 --- a/src/idl_gen_grpc.cpp +++ b/src/idl_gen_grpc.cpp @@ -59,7 +59,7 @@ class FlatBufMethod : public grpc_generator::Method { std::string name() const { return method_->name; } std::string GRPCType(const StructDef &sd) const { - return "flatbuffers::BufferRef<" + sd.name + ">"; + return "flatbuffers::grpc::Message<" + sd.name + ">"; } std::string get_input_type_name() const { diff --git a/tests/monster_test.grpc.fb.cc b/tests/monster_test.grpc.fb.cc index 4ec4681e2..90b2764a4 100644 --- a/tests/monster_test.grpc.fb.cc +++ b/tests/monster_test.grpc.fb.cc @@ -31,46 +31,46 @@ MonsterStorage::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& cha , rpcmethod_Retrieve_(MonsterStorage_method_names[1], ::grpc::RpcMethod::SERVER_STREAMING, channel) {} -::grpc::Status MonsterStorage::Stub::Store(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, flatbuffers::BufferRef* response) { +::grpc::Status MonsterStorage::Stub::Store(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, flatbuffers::grpc::Message* response) { return ::grpc::BlockingUnaryCall(channel_.get(), rpcmethod_Store_, context, request, response); } -::grpc::ClientAsyncResponseReader< flatbuffers::BufferRef>* MonsterStorage::Stub::AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq) { - return ::grpc::ClientAsyncResponseReader< flatbuffers::BufferRef>::Create(channel_.get(), cq, rpcmethod_Store_, context, request); +::grpc::ClientAsyncResponseReader< flatbuffers::grpc::Message>* MonsterStorage::Stub::AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq) { + return ::grpc::ClientAsyncResponseReader< flatbuffers::grpc::Message>::Create(channel_.get(), cq, rpcmethod_Store_, context, request); } -::grpc::ClientReader< flatbuffers::BufferRef>* MonsterStorage::Stub::RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request) { - return new ::grpc::ClientReader< flatbuffers::BufferRef>(channel_.get(), rpcmethod_Retrieve_, context, request); +::grpc::ClientReader< flatbuffers::grpc::Message>* MonsterStorage::Stub::RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request) { + return new ::grpc::ClientReader< flatbuffers::grpc::Message>(channel_.get(), rpcmethod_Retrieve_, context, request); } -::grpc::ClientAsyncReader< flatbuffers::BufferRef>* MonsterStorage::Stub::AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq, void* tag) { - return ::grpc::ClientAsyncReader< flatbuffers::BufferRef>::Create(channel_.get(), cq, rpcmethod_Retrieve_, context, request, tag); +::grpc::ClientAsyncReader< flatbuffers::grpc::Message>* MonsterStorage::Stub::AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq, void* tag) { + return ::grpc::ClientAsyncReader< flatbuffers::grpc::Message>::Create(channel_.get(), cq, rpcmethod_Retrieve_, context, request, tag); } MonsterStorage::Service::Service() { AddMethod(new ::grpc::RpcServiceMethod( MonsterStorage_method_names[0], ::grpc::RpcMethod::NORMAL_RPC, - new ::grpc::RpcMethodHandler< MonsterStorage::Service, flatbuffers::BufferRef, flatbuffers::BufferRef>( + new ::grpc::RpcMethodHandler< MonsterStorage::Service, flatbuffers::grpc::Message, flatbuffers::grpc::Message>( std::mem_fn(&MonsterStorage::Service::Store), this))); AddMethod(new ::grpc::RpcServiceMethod( MonsterStorage_method_names[1], ::grpc::RpcMethod::SERVER_STREAMING, - new ::grpc::ServerStreamingHandler< MonsterStorage::Service, flatbuffers::BufferRef, flatbuffers::BufferRef>( + new ::grpc::ServerStreamingHandler< MonsterStorage::Service, flatbuffers::grpc::Message, flatbuffers::grpc::Message>( std::mem_fn(&MonsterStorage::Service::Retrieve), this))); } MonsterStorage::Service::~Service() { } -::grpc::Status MonsterStorage::Service::Store(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, flatbuffers::BufferRef* response) { +::grpc::Status MonsterStorage::Service::Store(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, flatbuffers::grpc::Message* response) { (void) context; (void) request; (void) response; return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } -::grpc::Status MonsterStorage::Service::Retrieve(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, ::grpc::ServerWriter< flatbuffers::BufferRef>* writer) { +::grpc::Status MonsterStorage::Service::Retrieve(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer) { (void) context; (void) request; (void) writer; diff --git a/tests/monster_test.grpc.fb.h b/tests/monster_test.grpc.fb.h index d55eb227d..1942317fb 100644 --- a/tests/monster_test.grpc.fb.h +++ b/tests/monster_test.grpc.fb.h @@ -36,40 +36,40 @@ class MonsterStorage final { class StubInterface { public: virtual ~StubInterface() {} - virtual ::grpc::Status Store(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, flatbuffers::BufferRef* response) = 0; - std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::BufferRef>> AsyncStore(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::BufferRef>>(AsyncStoreRaw(context, request, cq)); + virtual ::grpc::Status Store(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, flatbuffers::grpc::Message* response) = 0; + std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::grpc::Message>> AsyncStore(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::grpc::Message>>(AsyncStoreRaw(context, request, cq)); } - std::unique_ptr< ::grpc::ClientReaderInterface< flatbuffers::BufferRef>> Retrieve(::grpc::ClientContext* context, const flatbuffers::BufferRef& request) { - return std::unique_ptr< ::grpc::ClientReaderInterface< flatbuffers::BufferRef>>(RetrieveRaw(context, request)); + std::unique_ptr< ::grpc::ClientReaderInterface< flatbuffers::grpc::Message>> Retrieve(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request) { + return std::unique_ptr< ::grpc::ClientReaderInterface< flatbuffers::grpc::Message>>(RetrieveRaw(context, request)); } - std::unique_ptr< ::grpc::ClientAsyncReaderInterface< flatbuffers::BufferRef>> AsyncRetrieve(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq, void* tag) { - return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< flatbuffers::BufferRef>>(AsyncRetrieveRaw(context, request, cq, tag)); + std::unique_ptr< ::grpc::ClientAsyncReaderInterface< flatbuffers::grpc::Message>> AsyncRetrieve(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq, void* tag) { + return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< flatbuffers::grpc::Message>>(AsyncRetrieveRaw(context, request, cq, tag)); } private: - virtual ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::BufferRef>* AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq) = 0; - virtual ::grpc::ClientReaderInterface< flatbuffers::BufferRef>* RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request) = 0; - virtual ::grpc::ClientAsyncReaderInterface< flatbuffers::BufferRef>* AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncResponseReaderInterface< flatbuffers::grpc::Message>* AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientReaderInterface< flatbuffers::grpc::Message>* RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request) = 0; + virtual ::grpc::ClientAsyncReaderInterface< flatbuffers::grpc::Message>* AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq, void* tag) = 0; }; class Stub final : public StubInterface { public: Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel); - ::grpc::Status Store(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, flatbuffers::BufferRef* response) override; - std::unique_ptr< ::grpc::ClientAsyncResponseReader< flatbuffers::BufferRef>> AsyncStore(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReader< flatbuffers::BufferRef>>(AsyncStoreRaw(context, request, cq)); + ::grpc::Status Store(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, flatbuffers::grpc::Message* response) override; + std::unique_ptr< ::grpc::ClientAsyncResponseReader< flatbuffers::grpc::Message>> AsyncStore(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReader< flatbuffers::grpc::Message>>(AsyncStoreRaw(context, request, cq)); } - std::unique_ptr< ::grpc::ClientReader< flatbuffers::BufferRef>> Retrieve(::grpc::ClientContext* context, const flatbuffers::BufferRef& request) { - return std::unique_ptr< ::grpc::ClientReader< flatbuffers::BufferRef>>(RetrieveRaw(context, request)); + std::unique_ptr< ::grpc::ClientReader< flatbuffers::grpc::Message>> Retrieve(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request) { + return std::unique_ptr< ::grpc::ClientReader< flatbuffers::grpc::Message>>(RetrieveRaw(context, request)); } - std::unique_ptr< ::grpc::ClientAsyncReader< flatbuffers::BufferRef>> AsyncRetrieve(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq, void* tag) { - return std::unique_ptr< ::grpc::ClientAsyncReader< flatbuffers::BufferRef>>(AsyncRetrieveRaw(context, request, cq, tag)); + std::unique_ptr< ::grpc::ClientAsyncReader< flatbuffers::grpc::Message>> AsyncRetrieve(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq, void* tag) { + return std::unique_ptr< ::grpc::ClientAsyncReader< flatbuffers::grpc::Message>>(AsyncRetrieveRaw(context, request, cq, tag)); } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; - ::grpc::ClientAsyncResponseReader< flatbuffers::BufferRef>* AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq) override; - ::grpc::ClientReader< flatbuffers::BufferRef>* RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request) override; - ::grpc::ClientAsyncReader< flatbuffers::BufferRef>* AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::BufferRef& request, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncResponseReader< flatbuffers::grpc::Message>* AsyncStoreRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientReader< flatbuffers::grpc::Message>* RetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request) override; + ::grpc::ClientAsyncReader< flatbuffers::grpc::Message>* AsyncRetrieveRaw(::grpc::ClientContext* context, const flatbuffers::grpc::Message& request, ::grpc::CompletionQueue* cq, void* tag) override; const ::grpc::RpcMethod rpcmethod_Store_; const ::grpc::RpcMethod rpcmethod_Retrieve_; }; @@ -79,8 +79,8 @@ class MonsterStorage final { public: Service(); virtual ~Service(); - virtual ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, flatbuffers::BufferRef* response); - virtual ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, ::grpc::ServerWriter< flatbuffers::BufferRef>* writer); + virtual ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, flatbuffers::grpc::Message* response); + virtual ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer); }; template class WithAsyncMethod_Store : public BaseClass { @@ -94,11 +94,11 @@ class MonsterStorage final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, flatbuffers::BufferRef* response) final override { + ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, flatbuffers::grpc::Message* response) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - void RequestStore(::grpc::ServerContext* context, flatbuffers::BufferRef* request, ::grpc::ServerAsyncResponseWriter< flatbuffers::BufferRef>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { + void RequestStore(::grpc::ServerContext* context, flatbuffers::grpc::Message* request, ::grpc::ServerAsyncResponseWriter< flatbuffers::grpc::Message>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { ::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag); } }; @@ -114,11 +114,11 @@ class MonsterStorage final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, ::grpc::ServerWriter< flatbuffers::BufferRef>* writer) final override { + ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - void RequestRetrieve(::grpc::ServerContext* context, flatbuffers::BufferRef* request, ::grpc::ServerAsyncWriter< flatbuffers::BufferRef>* writer, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { + void RequestRetrieve(::grpc::ServerContext* context, flatbuffers::grpc::Message* request, ::grpc::ServerAsyncWriter< flatbuffers::grpc::Message>* writer, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { ::grpc::Service::RequestAsyncServerStreaming(1, context, request, writer, new_call_cq, notification_cq, tag); } }; @@ -135,7 +135,7 @@ class MonsterStorage final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, flatbuffers::BufferRef* response) final override { + ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, flatbuffers::grpc::Message* response) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } @@ -152,7 +152,7 @@ class MonsterStorage final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, ::grpc::ServerWriter< flatbuffers::BufferRef>* writer) final override { + ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } @@ -164,18 +164,18 @@ class MonsterStorage final { public: WithStreamedUnaryMethod_Store() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< flatbuffers::BufferRef, flatbuffers::BufferRef>(std::bind(&WithStreamedUnaryMethod_Store::StreamedStore, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::StreamedUnaryHandler< flatbuffers::grpc::Message, flatbuffers::grpc::Message>(std::bind(&WithStreamedUnaryMethod_Store::StreamedStore, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_Store() override { BaseClassMustBeDerivedFromService(this); } // disable regular version of this method - ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, flatbuffers::BufferRef* response) final override { + ::grpc::Status Store(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, flatbuffers::grpc::Message* response) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } // replace default version of method with streamed unary - virtual ::grpc::Status StreamedStore(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< flatbuffers::BufferRef,flatbuffers::BufferRef>* server_unary_streamer) = 0; + virtual ::grpc::Status StreamedStore(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< flatbuffers::grpc::Message,flatbuffers::grpc::Message>* server_unary_streamer) = 0; }; typedef WithStreamedUnaryMethod_Store< Service > StreamedUnaryService; template @@ -185,18 +185,18 @@ class MonsterStorage final { public: WithSplitStreamingMethod_Retrieve() { ::grpc::Service::MarkMethodStreamed(1, - new ::grpc::SplitServerStreamingHandler< flatbuffers::BufferRef, flatbuffers::BufferRef>(std::bind(&WithSplitStreamingMethod_Retrieve::StreamedRetrieve, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::SplitServerStreamingHandler< flatbuffers::grpc::Message, flatbuffers::grpc::Message>(std::bind(&WithSplitStreamingMethod_Retrieve::StreamedRetrieve, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_Retrieve() override { BaseClassMustBeDerivedFromService(this); } // disable regular version of this method - ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::BufferRef* request, ::grpc::ServerWriter< flatbuffers::BufferRef>* writer) final override { + ::grpc::Status Retrieve(::grpc::ServerContext* context, const flatbuffers::grpc::Message* request, ::grpc::ServerWriter< flatbuffers::grpc::Message>* writer) final override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } // replace default version of method with split streamed - virtual ::grpc::Status StreamedRetrieve(::grpc::ServerContext* context, ::grpc::ServerSplitStreamer< flatbuffers::BufferRef,flatbuffers::BufferRef>* server_split_streamer) = 0; + virtual ::grpc::Status StreamedRetrieve(::grpc::ServerContext* context, ::grpc::ServerSplitStreamer< flatbuffers::grpc::Message,flatbuffers::grpc::Message>* server_split_streamer) = 0; }; typedef WithSplitStreamingMethod_Retrieve< Service > SplitStreamedService; typedef WithStreamedUnaryMethod_Store< WithSplitStreamingMethod_Retrieve< Service > > StreamedService;