Skip to content

Commit

Permalink
Merge pull request #45683 from ozangunalp/vertx_event_bus_consumer_co…
Browse files Browse the repository at this point in the history
…ntext

Dispatch Vert.x event bus consumer events in correct context
  • Loading branch information
cescoffier authored Jan 19, 2025
2 parents cd5792e + 1ea7599 commit 9104473
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.quarkus.vertx.runtime;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe;
import static io.smallrye.common.expression.Expression.Flag.LENIENT_SYNTAX;
import static io.smallrye.common.expression.Expression.Flag.NO_TRIM;
Expand Down Expand Up @@ -32,9 +31,7 @@
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.expression.Expression;
import io.smallrye.common.expression.ResolveContext;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -123,35 +120,30 @@ public void handle(Void x) {
consumer.handler(new Handler<Message<Object>>() {
@Override
public void handle(Message<Object> m) {
// Will run on the context used for the consumer registration.
// It's a duplicated context, but we need to mark it as safe.
// The safety comes from the fact that it's instantiated by Vert.x for every
// message.
setCurrentContextSafe(true);
if (blocking) {
// We need to create a duplicated context from the "context"
Context dup = VertxContext.getOrCreateDuplicatedContext(context);
setContextSafe(dup, true);

if (runOnVirtualThread) {
// Switch to a Vert.x context to capture it and use it during the invocation.
dup.runOnContext(new Handler<Void>() {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
public void run() {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
});
}
}
});
} else {
Future<Void> future = dup.executeBlocking(new Callable<Void>() {
Future<Void> future = Vertx.currentContext().executeBlocking(new Callable<Void>() {
@Override
public Void call() {
try {
Expand All @@ -170,11 +162,6 @@ public Void call() {
future.onFailure(context::reportException);
}
} else {
// Will run on the context used for the consumer registration.
// It's a duplicated context, but we need to mark it as safe.
// The safety comes from the fact that it's instantiated by Vert.x for every
// message.
setCurrentContextSafe(true);
try {
invoker.invoke(m);
} catch (Exception e) {
Expand Down
17 changes: 17 additions & 0 deletions integration-tests/opentelemetry-vertx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer</artifactId>
Expand Down Expand Up @@ -72,6 +76,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.it.opentelemetry.vertx;

import jakarta.enterprise.context.ApplicationScoped;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.quarkus.logging.Log;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.vertx.core.MultiMap;

@ApplicationScoped
public class EventBusConsumer {

@ConsumeEvent("pets")
// non-blocking
public String sayHi(Pet pet) {
Log.infov("Received a pet: {0} {1}", pet, Span.current());
process();
return "Hello " + pet.getName() + " (" + pet.getKind() + ")";
}

@ConsumeEvent("persons")
@Blocking
public String name(String name) {
Log.infov("Received a pet: {0} {1}", name, Span.current());
process();
return "Hello " + name;
}

@ConsumeEvent("person-headers")
@RunOnVirtualThread
public String personWithHeader(MultiMap headers, Person person) {
Log.infov("Received a person: {0} {1}", person, Span.current());
process();
String s = "Hello " + person.getFirstName() + " " + person.getLastName() + ", " + headers;
return s;
}

@WithSpan
public void process() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.it.opentelemetry.vertx;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;

import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

@Path("/event-bus")
public class EventBusSender {

@Inject
EventBus bus;

@POST
@Path("/person")
public Uni<String> helloToPerson(JsonObject json) {
return bus.<String> request("persons", json.getString("name"))
.onItem().transform(Message::body);
}

@POST
@Path("/person2")
@Produces("text/plain")
public Uni<String> helloToPersonWithHeaders(JsonObject json) {
return bus.<String> request(
"person-headers",
new Person(json.getString("firstName"), json.getString("lastName")),
new DeliveryOptions().addHeader("header", "headerValue"))
.onItem().transform(Message::body);
}

@POST
@Path("/pet")
public Uni<String> helloToPet(JsonObject json) {
return bus.<String> request("pets", new Pet(json.getString("name"), json.getString("kind")))
.onItem().transform(Message::body);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.it.opentelemetry.vertx;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Person {

private String firstName;
private String lastName;

public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}

public Person() {
// Used by reflection.
}

public String getFirstName() {
return firstName;
}

public Person setFirstName(String firstName) {
this.firstName = firstName;
return this;
}

public String getLastName() {
return lastName;
}

public Person setLastName(String lastName) {
this.lastName = lastName;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.it.opentelemetry.vertx;

/**
* Simple pojo.
* The test using this pojo will use the generic codec facility.
*/
public class Pet {

private final String name;

private final String kind;

public Pet(String name, String kind) {
this.name = name;
this.kind = kind;
}

public String getName() {
return name;
}

public String getKind() {
return kind;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.it.opentelemetry.vertx;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class EventBusIT extends EventBusTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.quarkus.it.opentelemetry.vertx;

import static io.restassured.RestAssured.given;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.SpanKind;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import io.vertx.core.json.JsonObject;

@QuarkusTest
public class EventBusTest extends SpanExporterBaseTest {

@BeforeEach
void reset() {
given().get("/reset").then().statusCode(HTTP_OK);
await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().isEmpty());
}

@Test
public void testEventBusWithString() {
String body = new JsonObject().put("name", "Bob Morane").toString();
given().contentType(ContentType.JSON).body(body)
.post("/event-bus/person")
.then().statusCode(200).body(equalTo("Hello Bob Morane"));

await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
List<Map<String, Object>> spans = getSpans();

Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
String spanId = getSpanId(serverCall);
assertNotEquals(SpanId.getInvalid(), spanId);

Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
String producerSpanId = getSpanId(producerSpan);
assertNotEquals(SpanId.getInvalid(), producerSpanId);

Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
String consumerSpanId = getSpanId(consumerSpan);
assertNotEquals(SpanId.getInvalid(), consumerSpanId);

Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
String methodCallSpanId = getSpanId(methodCallSpan);
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
}

@Test
public void testEventBusWithObjectAndHeader() {
String body = new JsonObject()
.put("firstName", "Bob")
.put("lastName", "Morane")
.toString();
given().contentType(ContentType.JSON).body(body)
.post("/event-bus/person2")
.then().statusCode(200)
// For some reason Multimap.toString() has \n at the end.
.body(CoreMatchers.startsWith("Hello Bob Morane, header=headerValue\n"));

await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
List<Map<String, Object>> spans = getSpans();

Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
String spanId = getSpanId(serverCall);
assertNotEquals(SpanId.getInvalid(), spanId);

Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
String producerSpanId = getSpanId(producerSpan);
assertNotEquals(SpanId.getInvalid(), producerSpanId);

Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
String consumerSpanId = getSpanId(consumerSpan);
assertNotEquals(SpanId.getInvalid(), consumerSpanId);

Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
String methodCallSpanId = getSpanId(methodCallSpan);
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
}

@Test
public void testEventBusWithPet() {
String body = new JsonObject().put("name", "Neo").put("kind", "rabbit").toString();
given().contentType(ContentType.JSON).body(body)
.post("/event-bus/pet")
.then().statusCode(200).body(equalTo("Hello Neo (rabbit)"));

await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
List<Map<String, Object>> spans = getSpans();

Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
String spanId = getSpanId(serverCall);
assertNotEquals(SpanId.getInvalid(), spanId);

Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
String producerSpanId = getSpanId(producerSpan);
assertNotEquals(SpanId.getInvalid(), producerSpanId);

Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
String consumerSpanId = getSpanId(consumerSpan);
assertNotEquals(SpanId.getInvalid(), consumerSpanId);

Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
String methodCallSpanId = getSpanId(methodCallSpan);
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
}
}
Loading

0 comments on commit 9104473

Please sign in to comment.