mirror of
https://github.com/google/flatbuffers.git
synced 2026-06-02 04:04:19 +00:00
Fix Java generator bug that ignores bidi streaming attribute (#5063)
* Fix Java generator bug that ignores streaming: bidi attribute Tests * Java gRPC client streaming test * Java gRPC Bidi Streaming Test
This commit is contained in:
committed by
Wouter van Oortmerssen
parent
0143f4e364
commit
9635d494b3
@@ -22,21 +22,32 @@ import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.InterruptedException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
||||
/**
|
||||
* Demonstrates basic client-server interaction using grpc-java over netty.
|
||||
*/
|
||||
public class JavaGrpcTest {
|
||||
static final String BIG_MONSTER_NAME = "big-monster";
|
||||
static final String BIG_MONSTER_NAME = "Cyberdemon";
|
||||
static final short nestedMonsterHp = 600;
|
||||
static final short nestedMonsterMana = 1024;
|
||||
static final int numStreamedMsgs = 10;
|
||||
static final int timeoutMs = 3000;
|
||||
static Server server;
|
||||
static ManagedChannel channel;
|
||||
static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub;
|
||||
static MonsterStorageGrpc.MonsterStorageStub asyncStub;
|
||||
|
||||
static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase {
|
||||
@Override
|
||||
@@ -46,10 +57,7 @@ public class JavaGrpcTest {
|
||||
Assert.assertEquals(request.mana(), nestedMonsterMana);
|
||||
System.out.println("Received store request from " + request.name());
|
||||
// Create a response from the incoming request name.
|
||||
FlatBufferBuilder builder = new FlatBufferBuilder();
|
||||
int statOffset = Stat.createStat(builder, builder.createString("Hello " + request.name()), 100, 10);
|
||||
builder.finish(statOffset);
|
||||
Stat stat = Stat.getRootAsStat(builder.dataBuffer());
|
||||
Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10);
|
||||
responseObserver.onNext(stat);
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
@@ -58,55 +66,101 @@ public class JavaGrpcTest {
|
||||
public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) {
|
||||
// Create 10 monsters for streaming response.
|
||||
for (int i=0; i<numStreamedMsgs; i++) {
|
||||
FlatBufferBuilder builder = new FlatBufferBuilder();
|
||||
int i1 = builder.createString(request.id() + " No." + i);
|
||||
Monster.startMonster(builder);
|
||||
Monster.addName(builder, i1);
|
||||
int i2 = Monster.endMonster(builder);
|
||||
Monster.finishMonsterBuffer(builder, i2);
|
||||
Monster monster = Monster.getRootAsMonster(builder.dataBuffer());
|
||||
Monster monster = GameFactory.createMonsterFromStat(request, i);
|
||||
responseObserver.onNext(monster);
|
||||
}
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) {
|
||||
return computeMinMax(responseObserver, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) {
|
||||
return computeMinMax(responseObserver, true);
|
||||
}
|
||||
|
||||
private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) {
|
||||
final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE);
|
||||
final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>();
|
||||
final AtomicInteger maxHpCount = new AtomicInteger();
|
||||
|
||||
final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE);
|
||||
final AtomicReference<String> minHpMonsterName = new AtomicReference<String>();
|
||||
final AtomicInteger minHpCount = new AtomicInteger();
|
||||
|
||||
return new StreamObserver<Monster>() {
|
||||
public void onNext(Monster monster) {
|
||||
if (monster.hp() > maxHp.get()) {
|
||||
// Found a monster of higher hit points.
|
||||
maxHp.set(monster.hp());
|
||||
maxHpMonsterName.set(monster.name());
|
||||
maxHpCount.set(1);
|
||||
}
|
||||
else if (monster.hp() == maxHp.get()) {
|
||||
// Count how many times we saw a monster of current max hit points.
|
||||
maxHpCount.getAndIncrement();
|
||||
}
|
||||
|
||||
if (monster.hp() < minHp.get()) {
|
||||
// Found a monster of a lower hit points.
|
||||
minHp.set(monster.hp());
|
||||
minHpMonsterName.set(monster.name());
|
||||
minHpCount.set(1);
|
||||
}
|
||||
else if (monster.hp() == minHp.get()) {
|
||||
// Count how many times we saw a monster of current min hit points.
|
||||
minHpCount.getAndIncrement();
|
||||
}
|
||||
}
|
||||
public void onCompleted() {
|
||||
Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get());
|
||||
// Send max hit points first.
|
||||
responseObserver.onNext(maxHpStat);
|
||||
if (includeMin) {
|
||||
// Send min hit points.
|
||||
Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get());
|
||||
responseObserver.onNext(minHpStat);
|
||||
}
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
public void onError(Throwable t) {
|
||||
// Not expected
|
||||
Assert.fail();
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static int startServer() throws IOException {
|
||||
Server server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
|
||||
return server.getPort();
|
||||
}
|
||||
|
||||
@org.junit.Test
|
||||
public void testMonster() throws IOException {
|
||||
int port = startServer();
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port)
|
||||
@org.junit.BeforeClass
|
||||
public static void startServer() throws IOException {
|
||||
server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
|
||||
int port = server.getPort();
|
||||
channel = ManagedChannelBuilder.forAddress("localhost", port)
|
||||
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
|
||||
// needing certificates.
|
||||
.usePlaintext(true)
|
||||
.directExecutor()
|
||||
.build();
|
||||
blockingStub = MonsterStorageGrpc.newBlockingStub(channel);
|
||||
asyncStub = MonsterStorageGrpc.newStub(channel);
|
||||
}
|
||||
|
||||
MonsterStorageGrpc.MonsterStorageBlockingStub stub = MonsterStorageGrpc.newBlockingStub(channel);
|
||||
|
||||
FlatBufferBuilder builder = new FlatBufferBuilder();
|
||||
|
||||
int o_string = builder.createString(BIG_MONSTER_NAME);
|
||||
Monster.startMonster(builder);
|
||||
Monster.addName(builder, o_string);
|
||||
Monster.addHp(builder, nestedMonsterHp);
|
||||
Monster.addMana(builder, nestedMonsterMana);
|
||||
int monster1 = Monster.endMonster(builder);
|
||||
Monster.finishMonsterBuffer(builder, monster1);
|
||||
|
||||
ByteBuffer buffer = builder.dataBuffer();
|
||||
Monster monsterRequest = Monster.getRootAsMonster(buffer);
|
||||
Stat stat = stub.store(monsterRequest);
|
||||
@org.junit.Test
|
||||
public void testUnary() throws IOException {
|
||||
Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
|
||||
Stat stat = blockingStub.store(monsterRequest);
|
||||
Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME);
|
||||
System.out.println("Received stat response from service: " + stat.id());
|
||||
}
|
||||
|
||||
|
||||
Iterator<Monster> iterator = stub.retrieve(stat);
|
||||
@org.junit.Test
|
||||
public void testServerStreaming() throws IOException {
|
||||
Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
|
||||
Stat stat = blockingStub.store(monsterRequest);
|
||||
Iterator<Monster> iterator = blockingStub.retrieve(stat);
|
||||
int counter = 0;
|
||||
while(iterator.hasNext()) {
|
||||
Monster m = iterator.next();
|
||||
@@ -116,4 +170,73 @@ public class JavaGrpcTest {
|
||||
Assert.assertEquals(counter, numStreamedMsgs);
|
||||
System.out.println("FlatBuffers GRPC client/server test: completed successfully");
|
||||
}
|
||||
|
||||
@org.junit.Test
|
||||
public void testClientStreaming() throws IOException, InterruptedException {
|
||||
final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
|
||||
final CountDownLatch streamAlive = new CountDownLatch(1);
|
||||
|
||||
StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
|
||||
public void onCompleted() {
|
||||
streamAlive.countDown();
|
||||
}
|
||||
public void onError(Throwable ex) { }
|
||||
public void onNext(Stat stat) {
|
||||
maxHitStat.set(stat);
|
||||
}
|
||||
};
|
||||
StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver);
|
||||
short count = 10;
|
||||
for (short i = 0;i < count; ++i) {
|
||||
Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
|
||||
monsterStream.onNext(monster);
|
||||
}
|
||||
monsterStream.onCompleted();
|
||||
// Wait a little bit for the server to send the stats of the monster with the max hit-points.
|
||||
streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
|
||||
Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
|
||||
Assert.assertEquals(maxHitStat.get().count(), 1);
|
||||
}
|
||||
|
||||
@org.junit.Test
|
||||
public void testBiDiStreaming() throws IOException, InterruptedException {
|
||||
final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
|
||||
final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>();
|
||||
final CountDownLatch streamAlive = new CountDownLatch(1);
|
||||
|
||||
StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
|
||||
public void onCompleted() {
|
||||
streamAlive.countDown();
|
||||
}
|
||||
public void onError(Throwable ex) { }
|
||||
public void onNext(Stat stat) {
|
||||
// We expect the server to send the max stat first and then the min stat.
|
||||
if (maxHitStat.get() == null) {
|
||||
maxHitStat.set(stat);
|
||||
}
|
||||
else {
|
||||
minHitStat.set(stat);
|
||||
}
|
||||
}
|
||||
};
|
||||
StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver);
|
||||
short count = 10;
|
||||
for (short i = 0;i < count; ++i) {
|
||||
Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
|
||||
monsterStream.onNext(monster);
|
||||
}
|
||||
monsterStream.onCompleted();
|
||||
|
||||
// Wait a little bit for the server to send the stats of the monster with the max and min hit-points.
|
||||
streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
|
||||
Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
|
||||
Assert.assertEquals(maxHitStat.get().count(), 1);
|
||||
|
||||
Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0);
|
||||
Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0);
|
||||
Assert.assertEquals(minHitStat.get().count(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user