gRPC callbackService support added (#8666)

* grpc callbackService support added

Signed-off-by: shankeleven <shashanksati11@gmail.com>

* tests: regenerate C++ gRPC golden with --grpc-callback-api (CallbackService & async_ reactor APIs); update formatting and method placement

---------

Signed-off-by: shankeleven <shashanksati11@gmail.com>
Co-authored-by: Wouter van Oortmerssen <aardappel@gmail.com>
This commit is contained in:
Shashank
2025-08-29 04:19:27 +05:30
committed by GitHub
parent b87d04af8c
commit deb3d93454
21 changed files with 617 additions and 51 deletions

View File

@@ -8,8 +8,7 @@
namespace grpc_cpp_generator {
namespace {
template<class T>
static grpc::string as_string(T x) {
template<class T> static grpc::string as_string(T x) {
std::ostringstream out;
out << x;
return out.str();
@@ -39,12 +38,13 @@ static grpc::string FilenameIdentifier(const grpc::string &filename) {
return result;
}
template<class T, size_t N>
static T *array_end(T (&array)[N]) { return array + N; }
template<class T, size_t N> static T *array_end(T (&array)[N]) {
return array + N;
}
static void PrintIncludes(grpc_generator::Printer *printer,
const std::vector<grpc::string> &headers,
const Parameters &params) {
const std::vector<grpc::string> &headers,
const Parameters &params) {
std::map<grpc::string, grpc::string> vars;
vars["l"] = params.use_system_headers ? '<' : '"';
@@ -60,6 +60,18 @@ static void PrintIncludes(grpc_generator::Printer *printer,
vars["h"] = *i;
printer->Print(vars, "#include $l$$h$$r$\n");
}
if (params.generate_callback_api) {
// Callback API headers (guarded later by feature macro in emitted code).
static const char *cb_headers[] = {
"grpcpp/impl/codegen/callback_common.h",
"grpcpp/impl/codegen/server_callback_handlers.h",
"grpcpp/support/client_callback.h"
};
for (auto &h : cb_headers) {
vars["h"] = h;
printer->Print(vars, "#include $l$$h$$r$\n");
}
}
}
} // namespace
@@ -138,7 +150,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file,
return output;
}
namespace {
static void PrintHeaderClientMethodInterfaces(
@@ -355,12 +366,10 @@ static void PrintHeaderClientMethodInterfaces(
}
}
static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars,
bool is_public) {
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars,
bool is_public) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -377,6 +386,22 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
*vars,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) override;\n");
if ((*vars)["generate_callback_api"] == "1") {
// Native gRPC callback unary wrappers (function callback & reactor
// variants).
printer->Print(*vars,
"// Callback unary (function form). Request/response "
"must outlive callback.\n");
printer->Print(*vars,
"void async_$Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"std::function<void(::grpc::Status)> on_done);\n");
printer->Print(*vars, "// Callback unary (reactor form).\n");
printer->Print(*vars,
"void async_$Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"::grpc::ClientUnaryReactor* reactor);\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -407,6 +432,12 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(*vars, "// Client streaming callback reactor entry.\n");
printer->Print(
*vars,
"void async_$Method$(::grpc::ClientContext* context, $Response$* response, ::grpc::ClientWriteReactor< $Request$ >* reactor);\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -440,6 +471,11 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"($Method$Raw(context, request));\n");
printer->Outdent();
printer->Print("}\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(*vars, "// Server streaming callback reactor entry.\n");
printer->Print(*vars,
"void async_$Method$(::grpc::ClientContext* context, const $Request$& request, ::grpc::ClientReadReactor< $Response$ >* reactor);\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -472,6 +508,12 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(*vars, "// Bidirectional streaming callback reactor entry.\n");
printer->Print(
*vars,
"void async_$Method$(::grpc::ClientContext* context, ::grpc::ClientBidiReactor< $Request$, $Response$ >* reactor);\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -506,6 +548,10 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"const $Request$& request, "
"::grpc::CompletionQueue* cq) override;\n");
}
if ((*vars)["generate_callback_api"] == "1") {
// Native callback unary forms declared earlier (no private sync helper
// needed).
}
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* $Method$Raw("
@@ -560,17 +606,17 @@ static void PrintHeaderClientMethod(grpc_generator::Printer *printer,
}
}
static void PrintHeaderClientMethodData(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
static void PrintHeaderClientMethodData(
grpc_generator::Printer *printer, const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
printer->Print(*vars,
"const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n");
}
static void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
static void PrintHeaderServerMethodSync(
grpc_generator::Printer *printer, const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -602,9 +648,9 @@ static void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
printer->Print(method->GetTrailingComments("//").c_str());
}
static void PrintHeaderServerMethodAsync(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
static void PrintHeaderServerMethodAsync(
grpc_generator::Printer *printer, const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -894,8 +940,8 @@ static void PrintHeaderServerMethodGeneric(
}
static void PrintHeaderService(grpc_generator::Printer *printer,
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Service"] = service->name();
printer->Print(service->GetLeadingComments("//").c_str());
@@ -930,6 +976,10 @@ static void PrintHeaderService(grpc_generator::Printer *printer,
false);
}
printer->Outdent();
// Forward declaration of nested CallbackService if callback API enabled.
if ((*vars)["generate_callback_api"] == "1") {
printer->Print("class CallbackService;\n");
}
printer->Print("};\n");
printer->Print(
"class Stub final : public StubInterface"
@@ -1062,9 +1112,51 @@ static void PrintHeaderService(grpc_generator::Printer *printer,
printer->Outdent();
printer->Print("};\n");
printer->Print(service->GetTrailingComments("//").c_str());
// Optional CallbackService (modern async API)
if ((*vars)["generate_callback_api"] == "1") {
(*vars)["Service"] = service->name();
printer->Print("\n#if defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)\n");
printer->Print(*vars,
"class $Service$::CallbackService : public ::grpc::Service "
"{\n public:\n CallbackService();\n virtual "
"~CallbackService();\n");
printer->Indent();
for (int i = 0; i < service->method_count(); ++i) {
auto m = service->method(i);
(*vars)["Method"] = m->name();
(*vars)["Request"] = m->input_type_name();
(*vars)["Response"] = m->output_type_name();
if (m->NoStreaming()) {
printer->Print(*vars,
"virtual ::grpc::ServerUnaryReactor* "
"$Method$(::grpc::CallbackServerContext* context, const "
"$Request$* request, $Response$* response);\n");
} else if (ClientOnlyStreaming(m.get())) {
printer->Print(*vars,
"virtual ::grpc::ServerReadReactor<$Request$>* "
"$Method$(::grpc::CallbackServerContext* context, "
"$Response$* response);\n");
} else if (ServerOnlyStreaming(m.get())) {
printer->Print(*vars,
"virtual ::grpc::ServerWriteReactor<$Response$>* "
"$Method$(::grpc::CallbackServerContext* context, const "
"$Request$* request);\n");
} else if (m->BidiStreaming()) {
printer->Print(
*vars,
"virtual ::grpc::ServerBidiReactor<$Request$, $Response$>* "
"$Method$(::grpc::CallbackServerContext* context);\n");
}
}
printer->Outdent();
printer->Print(
"};\n#else\n// Callback API requested but not available in this gRPC "
"version.\n#endif // GRPC_CALLBACK_API_NONEXPERIMENTAL\n");
}
}
} // namespace
} // namespace
grpc::string GetHeaderServices(grpc_generator::File *file,
const Parameters &params) {
@@ -1084,9 +1176,14 @@ grpc::string GetHeaderServices(grpc_generator::File *file,
}
for (int i = 0; i < file->service_count(); ++i) {
vars["generate_callback_api"] = params.generate_callback_api ? "1" : "0";
PrintHeaderService(printer.get(), file->service(i).get(), &vars);
printer->Print("\n");
}
if (params.generate_callback_api) {
printer->Print("// FlatBuffers: Callback API code generated.\n");
printer->Print("#define FLATBUFFERS_GENERATED_GRPC_CALLBACK_API 1\n\n");
}
if (!params.services_namespace.empty()) {
printer->Print(vars, "} // namespace $services_namespace$\n\n");
@@ -1139,7 +1236,8 @@ grpc::string GetSourcePrologue(grpc_generator::File *file,
printer->Print(vars, "// Generated by the gRPC C++ plugin.\n");
printer->Print(vars,
"// If you make any local change, they will be lost.\n");
"// FlatBuffers modified generator: native gRPC callback "
"client API enabled when --grpc-callback-api.\n");
printer->Print(vars, "// source: $filename$\n\n");
printer->Print(vars, "#include \"$filename_base$$message_header_ext$\"\n");
@@ -1184,12 +1282,11 @@ grpc::string GetSourceIncludes(grpc_generator::File *file,
return output;
}
namespace {
static void PrintSourceClientMethod(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
static void PrintSourceClientMethod(
grpc_generator::Printer *printer, const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -1209,6 +1306,27 @@ static void PrintSourceClientMethod(grpc_generator::Printer *printer,
" return ::grpc::internal::BlockingUnaryCall"
"(channel_.get(), rpcmethod_$Method$_, "
"context, request, response);\n}\n\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(
*vars,
"void $ns$$Service$::Stub::async_$Method$(::grpc::ClientContext* "
"context, const $Request$& request, $Response$* response, "
"std::function<void(::grpc::Status)> on_done) {\n");
printer->Print(*vars,
" ::grpc::internal::CallbackUnaryCall(channel_.get(), "
"rpcmethod_$Method$_, context, &request, response, "
"std::move(on_done));\n}\n\n");
printer->Print(
*vars,
"void $ns$$Service$::Stub::async_$Method$(::grpc::ClientContext* "
"context, const $Request$& request, $Response$* response, "
"::grpc::ClientUnaryReactor* reactor) {\n");
printer->Print(
*vars,
" "
"::grpc::internal::ClientCallbackUnaryFactory::Create(channel_.get(),"
" rpcmethod_$Method$_, context, &request, response, reactor);\n}\n\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -1241,6 +1359,17 @@ static void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, "
"context, response);\n"
"}\n\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(
*vars,
"void $ns$$Service$::Stub::async_$Method$(::grpc::ClientContext* "
"context, $Response$* response, ::grpc::ClientWriteReactor< "
"$Request$ >* reactor) {\n");
printer->Print(*vars,
" ::grpc::internal::ClientCallbackWriterFactory< "
"$Request$ >::Create(channel_.get(), rpcmethod_$Method$_, "
"context, response, reactor);\n}\n\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -1274,6 +1403,17 @@ static void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, "
"context, request);\n"
"}\n\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(
*vars,
"void $ns$$Service$::Stub::async_$Method$(::grpc::ClientContext* "
"context, const $Request$& request, ::grpc::ClientReadReactor< "
"$Response$ >* reactor) {\n");
printer->Print(*vars,
" ::grpc::internal::ClientCallbackReaderFactory< "
"$Response$ >::Create(channel_.get(), "
"rpcmethod_$Method$_, context, &request, reactor);\n}\n\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -1307,6 +1447,17 @@ static void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, "
"context);\n"
"}\n\n");
if ((*vars)["generate_callback_api"] == "1") {
printer->Print(
*vars,
"void $ns$$Service$::Stub::async_$Method$(::grpc::ClientContext* "
"context, ::grpc::ClientBidiReactor< $Request$, $Response$ >* "
"reactor) {\n");
printer->Print(*vars,
" ::grpc::internal::ClientCallbackReaderWriterFactory< "
"$Request$, $Response$ >::Create(channel_.get(), "
"rpcmethod_$Method$_, context, reactor);\n}\n\n");
}
for (size_t i = 0; i < sizeof(async_prefixes) / sizeof(async_prefixes[0]);
i++) {
auto &async_prefix = async_prefixes[i];
@@ -1331,9 +1482,9 @@ static void PrintSourceClientMethod(grpc_generator::Printer *printer,
}
}
static void PrintSourceServerMethod(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
static void PrintSourceServerMethod(
grpc_generator::Printer *printer, const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -1381,8 +1532,8 @@ static void PrintSourceServerMethod(grpc_generator::Printer *printer,
}
static void PrintSourceService(grpc_generator::Printer *printer,
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Service"] = service->name();
if (service->method_count() > 0) {
@@ -1495,9 +1646,110 @@ static void PrintSourceService(grpc_generator::Printer *printer,
(*vars)["Idx"] = as_string(i);
PrintSourceServerMethod(printer, service->method(i).get(), vars);
}
// CallbackService implementation (if enabled)
if ((*vars)["generate_callback_api"] == "1") {
(*vars)["Service"] = service->name();
printer->Print("#if defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)\n");
printer->Print(*vars,
"$ns$$Service$::CallbackService::CallbackService() {\n");
printer->Indent();
for (int i = 0; i < service->method_count(); ++i) {
auto method = service->method(i);
(*vars)["Idx"] = as_string(i);
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
if (method->NoStreaming()) {
printer->Print(
*vars,
"AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::internal::RpcMethod::NORMAL_RPC,\n"
" new ::grpc::internal::CallbackUnaryHandler<$Request$, $Response$>(\n"
" [this](::grpc::CallbackServerContext* ctx, const $Request$* req, $Response$* resp) {\n"
" return this->$Method$(ctx, req, resp);\n"
" })));\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
"AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::internal::CallbackClientStreamingHandler<$Request$, $Response$>(\n"
" [this](::grpc::CallbackServerContext* ctx, $Response$* resp) {\n"
" return this->$Method$(ctx, resp);\n"
" })));\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(
*vars,
"AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::internal::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::internal::CallbackServerStreamingHandler<$Request$, $Response$>(\n"
" [this](::grpc::CallbackServerContext* ctx, const $Request$* req) {\n"
" return this->$Method$(ctx, req);\n"
" })));\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
"AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::internal::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::internal::CallbackBidiHandler<$Request$, $Response$>(\n"
" [this](::grpc::CallbackServerContext* ctx) {\n"
" return this->$Method$(ctx);\n"
" })));\n");
}
}
printer->Outdent();
printer->Print("}\n\n");
printer->Print(*vars,
"$ns$$Service$::CallbackService::~CallbackService() {}\n\n");
// Default method bodies returning UNIMPLEMENTED reactors.
for (int i = 0; i < service->method_count(); ++i) {
auto method = service->method(i);
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
if (method->NoStreaming()) {
printer->Print(*vars,
"::grpc::ServerUnaryReactor* "
"$ns$$Service$::CallbackService::$Method$(::grpc::"
"CallbackServerContext* /*context*/, const $Request$* "
"/*request*/, $Response$* /*response*/) {\n"
" return nullptr; // user must override\n"
"}\n\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
"::grpc::ServerReadReactor<$Request$>* "
"$ns$$Service$::CallbackService::$Method$(::grpc::"
"CallbackServerContext* /*context*/, $Response$* /*response*/) {\n"
" return nullptr; // user must override\n"
"}\n\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(*vars,
"::grpc::ServerWriteReactor<$Response$>* "
"$ns$$Service$::CallbackService::$Method$(::grpc::"
"CallbackServerContext* /*context*/, const $Request$* "
"/*request*/) {\n"
" return nullptr; // user must override\n"
"}\n\n");
} else if (method->BidiStreaming()) {
printer->Print(*vars,
"::grpc::ServerBidiReactor<$Request$, $Response$>* "
"$ns$$Service$::CallbackService::$Method$(::grpc::"
"CallbackServerContext* /*context*/) {\n"
" return nullptr; // user must override\n"
"}\n\n");
}
}
printer->Print("#endif // GRPC_CALLBACK_API_NONEXPERIMENTAL\n");
}
}
} // namespace
} // namespace
grpc::string GetSourceServices(grpc_generator::File *file,
const Parameters &params) {
@@ -1519,6 +1771,7 @@ grpc::string GetSourceServices(grpc_generator::File *file,
}
for (int i = 0; i < file->service_count(); ++i) {
vars["generate_callback_api"] = params.generate_callback_api ? "1" : "0";
PrintSourceService(printer.get(), file->service(i).get(), &vars);
printer->Print("\n");
}
@@ -1601,12 +1854,11 @@ grpc::string GetMockIncludes(grpc_generator::File *file,
return output;
}
namespace {
static void PrintMockClientMethods(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@@ -1697,8 +1949,8 @@ static void PrintMockClientMethods(grpc_generator::Printer *printer,
}
static void PrintMockService(grpc_generator::Printer *printer,
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
const grpc_generator::Service *service,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Service"] = service->name();
printer->Print(*vars,
@@ -1712,7 +1964,7 @@ static void PrintMockService(grpc_generator::Printer *printer,
printer->Print("};\n");
}
} // namespace
} // namespace
grpc::string GetMockServices(grpc_generator::File *file,
const Parameters &params) {