[C++] Use proper gRPC C++ API when using MemoryBuffer Slice (#6756)

* Use ByteBuffer

* Use Slice

* Upgrade gRPC to 1.39.0

* Update gRPC build script

* Reformat

* Added CMAKE_CXX_STANDARD=11 when building Abseil

* Enabled ABSL_ENABLE_INSTALL

* Added absl to grpctest deps
This commit is contained in:
Esun Kim
2021-07-29 17:41:51 +00:00
committed by GitHub
parent 5235133f32
commit e012054667
6 changed files with 55 additions and 94 deletions

View File

@@ -22,6 +22,7 @@
#include "flatbuffers/flatbuffers.h"
#include "grpc/byte_buffer_reader.h"
#include "grpcpp/support/byte_buffer.h"
#include "grpcpp/support/slice.h"
namespace flatbuffers {
namespace grpc {
@@ -32,33 +33,23 @@ namespace grpc {
// is refcounted and ownership is be managed automatically.
template<class T> class Message {
public:
Message() : slice_(grpc_empty_slice()) {}
Message() {}
Message(grpc_slice slice, bool add_ref)
: slice_(add_ref ? grpc_slice_ref(slice) : slice) {}
Message(::grpc::Slice slice) : slice_(slice) {}
Message &operator=(const Message &other) = delete;
Message(Message &&other) : slice_(other.slice_) {
other.slice_ = grpc_empty_slice();
}
Message(Message &&other) = default;
Message(const Message &other) = delete;
Message &operator=(Message &&other) {
grpc_slice_unref(slice_);
slice_ = other.slice_;
other.slice_ = grpc_empty_slice();
return *this;
}
Message &operator=(Message &&other) = default;
~Message() { grpc_slice_unref(slice_); }
const uint8_t *mutable_data() const { return slice_.begin(); }
const uint8_t *mutable_data() const { return GRPC_SLICE_START_PTR(slice_); }
const uint8_t *data() const { return slice_.begin(); }
const uint8_t *data() const { return GRPC_SLICE_START_PTR(slice_); }
size_t size() const { return GRPC_SLICE_LENGTH(slice_); }
size_t size() const { return slice_.size(); }
bool Verify() const {
Verifier verifier(data(), size());
@@ -70,10 +61,10 @@ template<class T> class Message {
const T *GetRoot() const { return flatbuffers::GetRoot<T>(data()); }
// This is only intended for serializer use, or if you know what you're doing
const grpc_slice &BorrowSlice() const { return slice_; }
const ::grpc::Slice &BorrowSlice() const { return slice_; }
private:
grpc_slice slice_;
::grpc::Slice slice_;
};
class MessageBuilder;
@@ -83,12 +74,12 @@ class MessageBuilder;
// efficient to transfer buffers to gRPC.
class SliceAllocator : public Allocator {
public:
SliceAllocator() : slice_(grpc_empty_slice()) {}
SliceAllocator() {}
SliceAllocator(const SliceAllocator &other) = delete;
SliceAllocator &operator=(const SliceAllocator &other) = delete;
SliceAllocator(SliceAllocator &&other) : slice_(grpc_empty_slice()) {
SliceAllocator(SliceAllocator &&other) {
// default-construct and swap idiom
swap(other);
}
@@ -105,45 +96,43 @@ class SliceAllocator : public Allocator {
swap(slice_, other.slice_);
}
virtual ~SliceAllocator() { grpc_slice_unref(slice_); }
virtual ~SliceAllocator() {}
virtual uint8_t *allocate(size_t size) override {
FLATBUFFERS_ASSERT(GRPC_SLICE_IS_EMPTY(slice_));
slice_ = grpc_slice_malloc(size);
return GRPC_SLICE_START_PTR(slice_);
FLATBUFFERS_ASSERT(slice_.size() == 0);
slice_ = ::grpc::Slice(size);
return const_cast<uint8_t *>(slice_.begin());
}
virtual void deallocate(uint8_t *p, size_t size) override {
FLATBUFFERS_ASSERT(p == GRPC_SLICE_START_PTR(slice_));
FLATBUFFERS_ASSERT(size == GRPC_SLICE_LENGTH(slice_));
grpc_slice_unref(slice_);
slice_ = grpc_empty_slice();
FLATBUFFERS_ASSERT(p == slice_.begin());
FLATBUFFERS_ASSERT(size == slice_.size());
slice_ = ::grpc::Slice();
}
virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
size_t new_size, size_t in_use_back,
size_t in_use_front) override {
FLATBUFFERS_ASSERT(old_p == GRPC_SLICE_START_PTR(slice_));
FLATBUFFERS_ASSERT(old_size == GRPC_SLICE_LENGTH(slice_));
FLATBUFFERS_ASSERT(old_p == slice_.begin());
FLATBUFFERS_ASSERT(old_size == slice_.size());
FLATBUFFERS_ASSERT(new_size > old_size);
grpc_slice old_slice = slice_;
grpc_slice new_slice = grpc_slice_malloc(new_size);
uint8_t *new_p = GRPC_SLICE_START_PTR(new_slice);
::grpc::Slice old_slice = slice_;
::grpc::Slice new_slice = ::grpc::Slice(new_size);
uint8_t *new_p = const_cast<uint8_t *>(new_slice.begin());
memcpy_downward(old_p, old_size, new_p, new_size, in_use_back,
in_use_front);
slice_ = new_slice;
grpc_slice_unref(old_slice);
return new_p;
}
private:
grpc_slice &get_slice(uint8_t *p, size_t size) {
FLATBUFFERS_ASSERT(p == GRPC_SLICE_START_PTR(slice_));
FLATBUFFERS_ASSERT(size == GRPC_SLICE_LENGTH(slice_));
::grpc::Slice &get_slice(uint8_t *p, size_t size) {
FLATBUFFERS_ASSERT(p == slice_.begin());
FLATBUFFERS_ASSERT(size == slice_.size());
return slice_;
}
grpc_slice slice_;
::grpc::Slice slice_;
friend class MessageBuilder;
};
@@ -184,9 +173,9 @@ class MessageBuilder : private detail::SliceAllocatorMember,
if (buf_.capacity()) {
uint8_t *buf = buf_.scratch_data(); // pointer to memory
size_t capacity = buf_.capacity(); // size of memory
slice_allocator_.slice_ = grpc_slice_new_with_len(buf, capacity, dealloc);
slice_allocator_.slice_ = ::grpc::Slice(buf, capacity, dealloc);
} else {
slice_allocator_.slice_ = grpc_empty_slice();
slice_allocator_.slice_ = ::grpc::Slice();
}
}
@@ -221,10 +210,10 @@ class MessageBuilder : private detail::SliceAllocatorMember,
// Releases the ownership of the buffer pointer.
// Returns the size, offset, and the original grpc_slice that
// allocated the buffer. Also see grpc_slice_unref().
uint8_t *ReleaseRaw(size_t &size, size_t &offset, grpc_slice &slice) {
uint8_t *ReleaseRaw(size_t &size, size_t &offset, ::grpc::Slice &slice) {
uint8_t *buf = FlatBufferBuilder::ReleaseRaw(size, offset);
slice = slice_allocator_.slice_;
slice_allocator_.slice_ = grpc_empty_slice();
slice_allocator_.slice_ = ::grpc::Slice();
return buf;
}
@@ -247,11 +236,11 @@ class MessageBuilder : private detail::SliceAllocatorMember,
auto begin = msg_data - buf_data;
auto end = begin + msg_size;
// Get the slice we are working with (no refcount change)
grpc_slice slice = slice_allocator_.get_slice(buf_data, buf_size);
::grpc::Slice slice = slice_allocator_.get_slice(buf_data, buf_size);
// Extract a subslice of the existing slice (increment refcount)
grpc_slice subslice = grpc_slice_sub(slice, begin, end);
::grpc::Slice subslice = slice.sub(begin, end);
// Wrap the subslice in a `Message<T>`, but don't increment refcount
Message<T> msg(subslice, false);
Message<T> msg(subslice);
return msg;
}
@@ -273,15 +262,10 @@ namespace grpc {
template<class T> class SerializationTraits<flatbuffers::grpc::Message<T>> {
public:
static grpc::Status Serialize(const flatbuffers::grpc::Message<T> &msg,
grpc_byte_buffer **buffer, bool *own_buffer) {
// We are passed in a `Message<T>`, which is a wrapper around a
// `grpc_slice`. We extract it here using `BorrowSlice()`. The const cast
// is necessary because the `grpc_raw_byte_buffer_create` func expects
// non-const slices in order to increment their refcounts.
grpc_slice *slice = const_cast<grpc_slice *>(&msg.BorrowSlice());
// Now use `grpc_raw_byte_buffer_create` to package the single slice into a
// `grpc_byte_buffer`, incrementing the refcount in the process.
*buffer = grpc_raw_byte_buffer_create(slice, 1);
ByteBuffer *buffer, bool *own_buffer) {
// Package the single slice into a `ByteBuffer`,
// incrementing the refcount in the process.
*buffer = ByteBuffer(&msg.BorrowSlice(), 1);
*own_buffer = true;
return grpc::Status::OK;
}
@@ -289,30 +273,13 @@ template<class T> class SerializationTraits<flatbuffers::grpc::Message<T>> {
// Deserialize by pulling the
static grpc::Status Deserialize(ByteBuffer *buf,
flatbuffers::grpc::Message<T> *msg) {
grpc_byte_buffer *buffer = *reinterpret_cast<grpc_byte_buffer **>(buf);
if (!buffer) {
return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload");
Slice slice;
if (!buf->TrySingleSlice(&slice).ok()) {
if (!buf->DumpToSingleSlice(&slice).ok()) {
return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload");
}
}
// Check if this is a single uncompressed slice.
if ((buffer->type == GRPC_BB_RAW) &&
(buffer->data.raw.compression == GRPC_COMPRESS_NONE) &&
(buffer->data.raw.slice_buffer.count == 1)) {
// If it is, then we can reference the `grpc_slice` directly.
grpc_slice slice = buffer->data.raw.slice_buffer.slices[0];
// We wrap a `Message<T>` around the slice, incrementing the refcount.
*msg = flatbuffers::grpc::Message<T>(slice, true);
} else {
// Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read
// `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives
// us back a new slice with the refcount already incremented.
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
grpc_byte_buffer_reader_destroy(&reader);
// We wrap a `Message<T>` around the slice, but don't increment refcount
*msg = flatbuffers::grpc::Message<T>(slice, false);
}
grpc_byte_buffer_destroy(buffer);
*msg = flatbuffers::grpc::Message<T>(slice);
#if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
return ::grpc::Status::OK;
#else