mirror of
https://github.com/google/flatbuffers.git
synced 2026-06-12 07:50:59 +00:00
[C++] Improve flatbuffers + gRPC integration (#4310)
* Rework flatbuffers + gRPC integration - Introduce `flatbuffers::grpc::Message<T>`, a `grpc_slice`-backed message buffer that handles refcounting and allows flatbuffers to transfer ownership to gRPC efficiently. This replaces `flatbuffers::BufferRef<T>`, which required a copy call and was also unsafe w.r.t. buffer lifetime. - Introduce `flatbuffers::grpc::MessageBuilder`, a gRPC-specific builder that forces a `grpc_slice`-backed allocator and also adds some helpful `Message<T>`-related methods. - Update serializers accordingly (now zero-copy between flatbuffers and gRPC). * gRPC: verify messages by default, but allow user to override * gRPC: fix some formatting issues * Disable verification by default, but add helper method * Make FlatBufferBuilder fields protected + remove vec accessor * Use bool add_ref parameter to toggle refcount incr * Remove unnecessary inline specifiers * Fix formatting * Use auto * Remove empty lines * Use grpc_slice helper macros * Simplify reset code * Disable Message copy ctor and assignment by default * Remove unused member * Enable gRPC verification by default * Use auto * Bake in message verification (remove template specialization) * Add RoundUp func * Consolidate gRPC message copy flag * Make vector_downward allocations fully lazy * Test message verification failure code/message * Add grpctest verification test comments * Simplify reallocate implementation * Make initial_size a size_t * Use ternary op for growth_policy * Use truthiness rather than dont explicit nullptr check * Indent preprocessor directives * Remove grpc message copy/assignment * Fix a few bugs * Add gRPC example * Add basic gRPC docs * Use doxygen EXAMPLE_PATH + @include * Reference example fbs in grpc docs * Move gRPC examples into grpc/samples * Fix pointer/reference formatting * Use std::function rather than templated callback func * Create fresh message builder for each request * Use Clear() in Reset() impl * Use FLATBUFFERS_CONSTEXPR
This commit is contained in:
committed by
Wouter van Oortmerssen
parent
dadd1a926e
commit
da67c0a71f
@@ -173,4 +173,4 @@ inline size_t PaddingBytes(size_t buf_size, size_t scalar_size) {
|
||||
}
|
||||
|
||||
}
|
||||
#endif // FLATBUFFERS_BASE_H_
|
||||
#endif // FLATBUFFERS_BASE_H_
|
||||
|
||||
@@ -69,7 +69,7 @@ template<typename T> T EndianSwap(T t) {
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T> size_t AlignOf() {
|
||||
template<typename T> FLATBUFFERS_CONSTEXPR size_t AlignOf() {
|
||||
#ifdef _MSC_VER
|
||||
return __alignof(T);
|
||||
#else
|
||||
@@ -451,27 +451,27 @@ class DetachedBuffer {
|
||||
}
|
||||
|
||||
~DetachedBuffer() {
|
||||
if (buf_ != nullptr) {
|
||||
assert(allocator_ != nullptr);
|
||||
if (buf_) {
|
||||
assert(allocator_);
|
||||
allocator_->deallocate(buf_, reserved_);
|
||||
}
|
||||
if (own_allocator_ && allocator_ != nullptr) {
|
||||
if (own_allocator_ && allocator_) {
|
||||
delete allocator_;
|
||||
}
|
||||
}
|
||||
|
||||
const uint8_t *data() const {
|
||||
assert(cur_ != nullptr);
|
||||
assert(cur_);
|
||||
return cur_;
|
||||
}
|
||||
|
||||
uint8_t *data() {
|
||||
assert(cur_ != nullptr);
|
||||
assert(cur_);
|
||||
return cur_;
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
assert(cur_ != nullptr);
|
||||
assert(cur_);
|
||||
return size_;
|
||||
}
|
||||
|
||||
@@ -516,29 +516,39 @@ class vector_downward {
|
||||
Allocator *allocator = nullptr,
|
||||
bool own_allocator = false)
|
||||
: allocator_(allocator ? allocator : &DefaultAllocator::instance()),
|
||||
own_allocator_(own_allocator),
|
||||
reserved_((initial_size + sizeof(largest_scalar_t) - 1) &
|
||||
~(sizeof(largest_scalar_t) - 1)),
|
||||
buf_(allocator_->allocate(reserved_)), cur_(buf_ + reserved_) {
|
||||
own_allocator_(own_allocator), initial_size_(initial_size), reserved_(0),
|
||||
buf_(nullptr), cur_(nullptr) {
|
||||
assert(allocator_);
|
||||
}
|
||||
|
||||
~vector_downward() {
|
||||
if (buf_ != nullptr) {
|
||||
assert(allocator_ != nullptr);
|
||||
if (buf_) {
|
||||
assert(allocator_);
|
||||
allocator_->deallocate(buf_, reserved_);
|
||||
}
|
||||
if (own_allocator_ && allocator_ != nullptr) {
|
||||
if (own_allocator_ && allocator_) {
|
||||
delete allocator_;
|
||||
}
|
||||
}
|
||||
|
||||
void clear() {
|
||||
if (buf_ == nullptr) {
|
||||
assert(allocator_ != nullptr);
|
||||
buf_ = allocator_->allocate(reserved_);
|
||||
void reset() {
|
||||
if (buf_) {
|
||||
assert(allocator_);
|
||||
allocator_->deallocate(buf_, reserved_);
|
||||
}
|
||||
reserved_ = 0;
|
||||
buf_ = nullptr;
|
||||
cur_ = nullptr;
|
||||
}
|
||||
|
||||
void clear() {
|
||||
if (buf_) {
|
||||
cur_ = buf_ + reserved_;
|
||||
} else {
|
||||
reserved_ = 0;
|
||||
buf_ = nullptr;
|
||||
cur_ = nullptr;
|
||||
}
|
||||
cur_ = buf_ + reserved_;
|
||||
}
|
||||
|
||||
// Relinquish the pointer to the caller.
|
||||
@@ -554,10 +564,12 @@ class vector_downward {
|
||||
}
|
||||
|
||||
size_t growth_policy(size_t bytes) {
|
||||
return (bytes / 2) & ~(sizeof(largest_scalar_t) - 1);
|
||||
return (bytes == 0) ? initial_size_
|
||||
: ((bytes / 2) & ~(AlignOf<largest_scalar_t>() - 1));
|
||||
}
|
||||
|
||||
uint8_t *make_space(size_t len) {
|
||||
assert(cur_ >= buf_);
|
||||
if (len > static_cast<size_t>(cur_ - buf_)) {
|
||||
reallocate(len);
|
||||
}
|
||||
@@ -568,13 +580,23 @@ class vector_downward {
|
||||
return cur_;
|
||||
}
|
||||
|
||||
Allocator &get_allocator() { return *allocator_; }
|
||||
|
||||
uoffset_t size() const {
|
||||
assert(cur_ != nullptr && buf_ != nullptr);
|
||||
return static_cast<uoffset_t>(reserved_ - (cur_ - buf_));
|
||||
}
|
||||
|
||||
uoffset_t capacity() const {
|
||||
return reserved_;
|
||||
}
|
||||
|
||||
uint8_t *buf() const {
|
||||
assert(buf_);
|
||||
return buf_;
|
||||
}
|
||||
|
||||
uint8_t *data() const {
|
||||
assert(cur_ != nullptr);
|
||||
assert(cur_);
|
||||
return cur_;
|
||||
}
|
||||
|
||||
@@ -613,18 +635,23 @@ class vector_downward {
|
||||
|
||||
Allocator *allocator_;
|
||||
bool own_allocator_;
|
||||
size_t initial_size_;
|
||||
size_t reserved_;
|
||||
uint8_t *buf_;
|
||||
uint8_t *cur_; // Points at location between empty (below) and used (above).
|
||||
|
||||
void reallocate(size_t len) {
|
||||
size_t old_reserved = reserved_;
|
||||
assert(allocator_);
|
||||
auto old_reserved = reserved_;
|
||||
auto old_size = size();
|
||||
auto largest_align = AlignOf<largest_scalar_t>();
|
||||
reserved_ += (std::max)(len, growth_policy(reserved_));
|
||||
// Round up to avoid undefined behavior from unaligned loads and stores.
|
||||
reserved_ = (reserved_ + (largest_align - 1)) & ~(largest_align - 1);
|
||||
buf_ = allocator_->reallocate_downward(buf_, old_reserved, reserved_);
|
||||
reserved_ += (std::max)(len, growth_policy(old_reserved));
|
||||
FLATBUFFERS_CONSTEXPR size_t alignment = AlignOf<largest_scalar_t>();
|
||||
reserved_ = (reserved_ + alignment - 1) & ~(alignment - 1);
|
||||
if (buf_) {
|
||||
buf_ = allocator_->reallocate_downward(buf_, old_reserved, reserved_);
|
||||
} else {
|
||||
buf_ = allocator_->allocate(reserved_);
|
||||
}
|
||||
cur_ = buf_ + reserved_ - old_size;
|
||||
}
|
||||
};
|
||||
@@ -655,9 +682,6 @@ template <typename T> T* data(std::vector<T> &v) {
|
||||
/// `CreateVector` functions. Do this is depth-first order to build up a tree to
|
||||
/// the root. `Finish()` wraps up the buffer ready for transport.
|
||||
class FlatBufferBuilder
|
||||
/// @cond FLATBUFFERS_INTERNAL
|
||||
FLATBUFFERS_FINAL_CLASS
|
||||
/// @endcond
|
||||
{
|
||||
public:
|
||||
/// @brief Default constructor for FlatBufferBuilder.
|
||||
@@ -667,7 +691,7 @@ FLATBUFFERS_FINAL_CLASS
|
||||
/// a `DefaultAllocator`.
|
||||
/// @param[in] own_allocator Whether the builder/vector should own the
|
||||
/// allocator. Defaults to / `false`.
|
||||
explicit FlatBufferBuilder(uoffset_t initial_size = 1024,
|
||||
explicit FlatBufferBuilder(size_t initial_size = 1024,
|
||||
Allocator *allocator = nullptr,
|
||||
bool own_allocator = false)
|
||||
: buf_(initial_size, allocator, own_allocator), nested(false),
|
||||
@@ -682,6 +706,11 @@ FLATBUFFERS_FINAL_CLASS
|
||||
if (string_pool) delete string_pool;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
Clear(); // clear builder state
|
||||
buf_.reset(); // deallocate buffer
|
||||
}
|
||||
|
||||
/// @brief Reset all the state in this FlatBufferBuilder so it can be reused
|
||||
/// to construct another buffer.
|
||||
void Clear() {
|
||||
@@ -1392,7 +1421,7 @@ FLATBUFFERS_FINAL_CLASS
|
||||
Finish(root.o, file_identifier, true);
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
// You shouldn't really be copying instances of this class.
|
||||
FlatBufferBuilder(const FlatBufferBuilder &);
|
||||
FlatBufferBuilder &operator=(const FlatBufferBuilder &);
|
||||
|
||||
@@ -23,50 +23,225 @@
|
||||
#include "grpc++/support/byte_buffer.h"
|
||||
#include "grpc/byte_buffer_reader.h"
|
||||
|
||||
namespace flatbuffers {
|
||||
namespace grpc {
|
||||
|
||||
// Message is a typed wrapper around a buffer that manages the underlying
|
||||
// `grpc_slice` and also provides flatbuffers-specific helpers such as `Verify`
|
||||
// and `GetRoot`. Since it is backed by a `grpc_slice`, the underlying buffer
|
||||
// is refcounted and ownership is be managed automatically.
|
||||
template <class T>
|
||||
class Message {
|
||||
public:
|
||||
Message() : slice_(grpc_empty_slice()) {}
|
||||
|
||||
Message(grpc_slice slice, bool add_ref)
|
||||
: slice_(add_ref ? grpc_slice_ref(slice) : slice) {}
|
||||
|
||||
Message &operator=(const Message &other) = delete;
|
||||
|
||||
Message(Message &&other) : slice_(other.slice_) {
|
||||
other.slice_ = grpc_empty_slice();
|
||||
}
|
||||
|
||||
Message(const Message &other) = delete;
|
||||
|
||||
Message &operator=(Message &&other) {
|
||||
slice_ = other.slice_;
|
||||
other.slice_ = grpc_empty_slice();
|
||||
return *this;
|
||||
}
|
||||
|
||||
~Message() { grpc_slice_unref(slice_); }
|
||||
|
||||
const uint8_t *mutable_data() const { return GRPC_SLICE_START_PTR(slice_); }
|
||||
|
||||
const uint8_t *data() const { return GRPC_SLICE_START_PTR(slice_); }
|
||||
|
||||
size_t size() const { return GRPC_SLICE_LENGTH(slice_); }
|
||||
|
||||
bool Verify() const {
|
||||
Verifier verifier(data(), size());
|
||||
return verifier.VerifyBuffer<T>(nullptr);
|
||||
}
|
||||
|
||||
T *GetMutableRoot() { return flatbuffers::GetMutableRoot<T>(mutable_data()); }
|
||||
|
||||
const T *GetRoot() const { return flatbuffers::GetRoot<T>(data()); }
|
||||
|
||||
// This is only intended for serializer use, or if you know what you're doing
|
||||
const grpc_slice &BorrowSlice() const { return slice_; }
|
||||
|
||||
private:
|
||||
grpc_slice slice_;
|
||||
};
|
||||
|
||||
class MessageBuilder;
|
||||
|
||||
// SliceAllocator is a gRPC-specific allocator that uses the `grpc_slice`
|
||||
// refcounted slices to manage memory ownership. This makes it easy and
|
||||
// efficient to transfer buffers to gRPC.
|
||||
class SliceAllocator : public Allocator {
|
||||
public:
|
||||
SliceAllocator() : slice_(grpc_empty_slice()) {}
|
||||
|
||||
SliceAllocator(const SliceAllocator &other) = delete;
|
||||
SliceAllocator &operator=(const SliceAllocator &other) = delete;
|
||||
|
||||
virtual ~SliceAllocator() { grpc_slice_unref(slice_); }
|
||||
|
||||
virtual uint8_t *allocate(size_t size) override {
|
||||
assert(GRPC_SLICE_IS_EMPTY(slice_));
|
||||
slice_ = grpc_slice_malloc(size);
|
||||
return GRPC_SLICE_START_PTR(slice_);
|
||||
}
|
||||
|
||||
virtual void deallocate(uint8_t *p, size_t size) override {
|
||||
assert(p == GRPC_SLICE_START_PTR(slice_));
|
||||
assert(size == GRPC_SLICE_LENGTH(slice_));
|
||||
grpc_slice_unref(slice_);
|
||||
slice_ = grpc_empty_slice();
|
||||
}
|
||||
|
||||
virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
|
||||
size_t new_size) override {
|
||||
assert(old_p == GRPC_SLICE_START_PTR(slice_));
|
||||
assert(old_size == GRPC_SLICE_LENGTH(slice_));
|
||||
assert(new_size > old_size);
|
||||
grpc_slice old_slice = slice_;
|
||||
grpc_slice new_slice = grpc_slice_malloc(new_size);
|
||||
uint8_t *new_p = GRPC_SLICE_START_PTR(new_slice);
|
||||
memcpy(new_p + (new_size - old_size), old_p, old_size);
|
||||
slice_ = new_slice;
|
||||
grpc_slice_unref(old_slice);
|
||||
return new_p;
|
||||
}
|
||||
|
||||
private:
|
||||
grpc_slice &get_slice(uint8_t *p, size_t size) {
|
||||
assert(p == GRPC_SLICE_START_PTR(slice_));
|
||||
assert(size == GRPC_SLICE_LENGTH(slice_));
|
||||
return slice_;
|
||||
}
|
||||
|
||||
grpc_slice slice_;
|
||||
|
||||
friend class MessageBuilder;
|
||||
};
|
||||
|
||||
// SliceAllocatorMember is a hack to ensure that the MessageBuilder's
|
||||
// slice_allocator_ member is constructed before the FlatBufferBuilder, since
|
||||
// the allocator is used in the FlatBufferBuilder ctor.
|
||||
namespace detail {
|
||||
struct SliceAllocatorMember {
|
||||
SliceAllocator slice_allocator_;
|
||||
};
|
||||
}
|
||||
|
||||
// MessageBuilder is a gRPC-specific FlatBufferBuilder that uses SliceAllocator
|
||||
// to allocate gRPC buffers.
|
||||
class MessageBuilder : private detail::SliceAllocatorMember,
|
||||
public FlatBufferBuilder {
|
||||
public:
|
||||
explicit MessageBuilder(uoffset_t initial_size = 1024)
|
||||
: FlatBufferBuilder(initial_size, &slice_allocator_, false) {}
|
||||
|
||||
MessageBuilder(const MessageBuilder &other) = delete;
|
||||
MessageBuilder &operator=(const MessageBuilder &other) = delete;
|
||||
|
||||
~MessageBuilder() {}
|
||||
|
||||
// GetMessage extracts the subslice of the buffer corresponding to the
|
||||
// flatbuffers-encoded region and wraps it in a `Message<T>` to handle buffer
|
||||
// ownership.
|
||||
template <class T>
|
||||
Message<T> GetMessage() {
|
||||
auto buf_data = buf_.buf(); // pointer to memory
|
||||
auto buf_size = buf_.capacity(); // size of memory
|
||||
auto msg_data = buf_.data(); // pointer to msg
|
||||
auto msg_size = buf_.size(); // size of msg
|
||||
// Do some sanity checks on data/size
|
||||
assert(msg_data);
|
||||
assert(msg_size);
|
||||
assert(msg_data >= buf_data);
|
||||
assert(msg_data + msg_size <= buf_data + buf_size);
|
||||
// Calculate offsets from the buffer start
|
||||
auto begin = msg_data - buf_data;
|
||||
auto end = begin + msg_size;
|
||||
// Get the slice we are working with (no refcount change)
|
||||
grpc_slice slice = slice_allocator_.get_slice(buf_data, buf_size);
|
||||
// Extract a subslice of the existing slice (increment refcount)
|
||||
grpc_slice subslice = grpc_slice_sub(slice, begin, end);
|
||||
// Wrap the subslice in a `Message<T>`, but don't increment refcount
|
||||
Message<T> msg(subslice, false);
|
||||
return msg;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Message<T> ReleaseMessage() {
|
||||
Message<T> msg = GetMessage<T>();
|
||||
Reset();
|
||||
return msg;
|
||||
}
|
||||
|
||||
private:
|
||||
// SliceAllocator slice_allocator_; // part of SliceAllocatorMember
|
||||
};
|
||||
|
||||
} // namespace grpc
|
||||
} // namespace flatbuffers
|
||||
|
||||
namespace grpc {
|
||||
|
||||
template <class T>
|
||||
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
|
||||
flatbuffers::BufferRefBase, T>::value>::type> {
|
||||
class SerializationTraits<flatbuffers::grpc::Message<T>> {
|
||||
public:
|
||||
// The type we're passing here is a BufferRef, which is already serialized
|
||||
// FlatBuffer data, which then gets passed to GRPC.
|
||||
static grpc::Status Serialize(const T& msg,
|
||||
grpc_byte_buffer **buffer,
|
||||
bool *own_buffer) {
|
||||
// TODO(wvo): make this work without copying.
|
||||
auto slice = gpr_slice_from_copied_buffer(
|
||||
reinterpret_cast<const char *>(msg.buf), msg.len);
|
||||
*buffer = grpc_raw_byte_buffer_create(&slice, 1);
|
||||
grpc_slice_unref(slice);
|
||||
static grpc::Status Serialize(const flatbuffers::grpc::Message<T> &msg,
|
||||
grpc_byte_buffer **buffer, bool *own_buffer) {
|
||||
// We are passed in a `Message<T>`, which is a wrapper around a
|
||||
// `grpc_slice`. We extract it here using `BorrowSlice()`. The const cast
|
||||
// is necesary because the `grpc_raw_byte_buffer_create` func expects
|
||||
// non-const slices in order to increment their refcounts.
|
||||
grpc_slice *slice = const_cast<grpc_slice *>(&msg.BorrowSlice());
|
||||
// Now use `grpc_raw_byte_buffer_create` to package the single slice into a
|
||||
// `grpc_byte_buffer`, incrementing the refcount in the process.
|
||||
*buffer = grpc_raw_byte_buffer_create(slice, 1);
|
||||
*own_buffer = true;
|
||||
return grpc::Status();
|
||||
}
|
||||
|
||||
// There is no de-serialization step in FlatBuffers, so we just receive
|
||||
// the data from GRPC.
|
||||
static grpc::Status Deserialize(grpc_byte_buffer *buffer, T *msg) {
|
||||
// TODO(wvo): make this more efficient / zero copy when possible.
|
||||
auto len = grpc_byte_buffer_length(buffer);
|
||||
if(msg->buf != nullptr){
|
||||
free(msg->buf);
|
||||
}
|
||||
msg->buf = reinterpret_cast<uint8_t *>(malloc(len));
|
||||
msg->len = static_cast<flatbuffers::uoffset_t>(len);
|
||||
msg->must_free = true;
|
||||
uint8_t *current = msg->buf;
|
||||
grpc_byte_buffer_reader reader;
|
||||
grpc_byte_buffer_reader_init(&reader, buffer);
|
||||
gpr_slice slice;
|
||||
while (grpc_byte_buffer_reader_next(&reader, &slice)) {
|
||||
memcpy(current, GPR_SLICE_START_PTR(slice), GPR_SLICE_LENGTH(slice));
|
||||
current += GPR_SLICE_LENGTH(slice);
|
||||
gpr_slice_unref(slice);
|
||||
// Deserialize by pulling the
|
||||
static grpc::Status Deserialize(grpc_byte_buffer *buffer,
|
||||
flatbuffers::grpc::Message<T> *msg) {
|
||||
// Check if this is a single uncompressed slice.
|
||||
if ((buffer->type == GRPC_BB_RAW) &&
|
||||
(buffer->data.raw.compression == GRPC_COMPRESS_NONE) &&
|
||||
(buffer->data.raw.slice_buffer.count == 1)) {
|
||||
// If it is, then we can reference the `grpc_slice` directly.
|
||||
grpc_slice slice = buffer->data.raw.slice_buffer.slices[0];
|
||||
// We wrap a `Message<T>` around the slice, incrementing the refcount.
|
||||
*msg = flatbuffers::grpc::Message<T>(slice, true);
|
||||
} else {
|
||||
// Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read
|
||||
// `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives
|
||||
// us back a new slice with the refcount already incremented.
|
||||
grpc_byte_buffer_reader reader;
|
||||
grpc_byte_buffer_reader_init(&reader, buffer);
|
||||
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
|
||||
grpc_byte_buffer_reader_destroy(&reader);
|
||||
// We wrap a `Message<T>` around the slice, but dont increment refcount
|
||||
*msg = flatbuffers::grpc::Message<T>(slice, false);
|
||||
}
|
||||
GPR_ASSERT(current == msg->buf + msg->len);
|
||||
grpc_byte_buffer_reader_destroy(&reader);
|
||||
grpc_byte_buffer_destroy(buffer);
|
||||
return grpc::Status();
|
||||
#if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
|
||||
return ::grpc::Status::OK;
|
||||
#else
|
||||
if (msg->Verify()) {
|
||||
return ::grpc::Status::OK;
|
||||
} else {
|
||||
return ::grpc::Status(::grpc::StatusCode::INTERNAL,
|
||||
"Message verification failed");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user