Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqs distributed trace #2204

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 3 additions & 2 deletions instrumentation/aws-java-sdk-sqs-1.10.44/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ dependencies {


jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-sqs-1.10.44' }
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-sqs-spans-1.10.44', 'Enabled': 'false',
obenkenobi marked this conversation as resolved.
Show resolved Hide resolved
'Implementation-Title-Alias': 'aws-java-sdk-sqs' }
}

verifyInstrumentation {
Expand All @@ -25,4 +26,4 @@ verifyInstrumentation {
site {
title 'AWS SQS'
type 'Messaging'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,187 @@

package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;
import com.newrelic.api.agent.MessageConsumeParameters;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.utils.MetricUtil;
import com.newrelic.utils.SqsV1Util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@Weave(type = MatchType.Interface, originalName = "com.amazonaws.services.sqs.AmazonSQS")
public class AmazonSQS_Instrumentation {

@Trace
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics(sendMessageBatchRequest.getQueueUrl());
for (SendMessageBatchRequestEntry request : sendMessageBatchRequest.getEntries()) {
SQSBatchRequestHeaders headers = new SQSBatchRequestHeaders(request);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
}

MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics(sendMessageBatchRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
return Weaver.callOriginal();
}

@Trace
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics(sendMessageRequest.getQueueUrl());
SQSRequestHeaders headers = new SQSRequestHeaders(sendMessageRequest);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);

MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics(sendMessageRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
return Weaver.callOriginal();
}

@Trace
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics(receiveMessageRequest.getQueueUrl());
List<String> updatedMessageAttrNames = new ArrayList<>(receiveMessageRequest.getMessageAttributeNames());
Collections.addAll(updatedMessageAttrNames, SqsV1Util.DT_HEADERS);
receiveMessageRequest.setMessageAttributeNames(updatedMessageAttrNames);

MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics(receiveMessageRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageConsumeParameters);
return Weaver.callOriginal();
}

public static class SQSRequestHeaders implements Headers {
private SendMessageRequest request = null;
public SQSRequestHeaders(SendMessageRequest req) {
request = req;
}
@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}
@Override
public String getHeader(String name) {
Map<String, MessageAttributeValue> messageAttributes = request.getMessageAttributes();
if(messageAttributes != null) {
MessageAttributeValue value = messageAttributes.get(name);
if (value != null && value.getDataType().equalsIgnoreCase("string")) {
return value.getStringValue();
}
}
Map<String, String> customRequestHeaders = request.getCustomRequestHeaders();
if(customRequestHeaders != null) {
return customRequestHeaders.get(name);
}
return null;
}
@Override
public Collection<String> getHeaders(String name) {
String value = getHeader(name);
if(value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
@Override
public void setHeader(String name, String value) {
if(request != null) {
Map<String, MessageAttributeValue> existingAttributes = request.getMessageAttributes();
if(!existingAttributes.containsKey(name)) {
request.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
}
@Override
public void addHeader(String name, String value) {
if(request != null) {
Map<String, MessageAttributeValue> existingAttributes = request.getMessageAttributes();
if(!existingAttributes.containsKey(name)) {
request.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
}
@Override
public Collection<String> getHeaderNames() {
Map<String, MessageAttributeValue> messageAttributes = request.getMessageAttributes();
return messageAttributes.keySet();
}
@Override
public boolean containsHeader(String name) {
return getHeaderNames().contains(name);
}
}

public static class SQSBatchRequestHeaders implements Headers {
private SendMessageBatchRequestEntry requestEntry = null;
public SQSBatchRequestHeaders(SendMessageBatchRequestEntry re) {
requestEntry = re;
}
@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}
@Override
public String getHeader(String name) {
Map<String, MessageAttributeValue> attributes = requestEntry.getMessageAttributes();
if(attributes != null) {
MessageAttributeValue value = attributes.get(name);
if(value != null) {
String dataType = value.getDataType();
if(dataType.equalsIgnoreCase("String")) {
String stringValue = value.getStringValue();
if(stringValue != null) return stringValue;
}
}
}
return null;
}
@Override
public Collection<String> getHeaders(String name) {
obenkenobi marked this conversation as resolved.
Show resolved Hide resolved
List<String> list = new ArrayList<String>();
String value = getHeader(name);
if(value != null && !value.isEmpty()) {
list.add(value);
}
return list;
}
@Override
public void setHeader(String name, String value) {
if(requestEntry != null) {
requestEntry.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
@Override
public void addHeader(String name, String value) {
if(requestEntry != null) {
requestEntry.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
@Override
public Collection<String> getHeaderNames() {
if(requestEntry != null) {
Map<String, MessageAttributeValue> attributes = requestEntry.getMessageAttributes();
if(attributes != null) {
return attributes.keySet();
}
}
return Collections.emptyList();
}
@Override
public boolean containsHeader(String name) {
return getHeaderNames().contains(name);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.weaver.Weaver;

public class MetricUtil {
public class SqsV1Util {

public static final String LIBRARY = "SQS";
public static final String OTEL_LIBRARY = "aws_sqs";
public static final String[] DT_HEADERS = new String[] {"newrelic","NEWRELIC","NewRelic","tracestate","TraceState","TRACESTATE"};

public static MessageProduceParameters generateExternalProduceMetrics(String queueUrl) {
DestinationData destinationData = DestinationData.parse(queueUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.newrelic.agent.introspec.InstrumentationTestConfig;
import com.newrelic.agent.introspec.InstrumentationTestRunner;
import com.newrelic.agent.introspec.Introspector;
import com.newrelic.agent.introspec.TracedMetricData;
import com.newrelic.api.agent.Trace;
import com.newrelic.utils.SqsV1Util;
import org.elasticmq.NodeAddress;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
Expand All @@ -27,7 +29,13 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(InstrumentationTestRunner.class)
@InstrumentationTestConfig(includePrefixes = { "com.amazonaws.services.sqs" }, configName = "dt_enabled.yml")
Expand Down Expand Up @@ -67,7 +75,13 @@ public static void afterClass() {
@Test
public void testSendMessage() {
Introspector introspector = InstrumentationTestRunner.getIntrospector();
sendMessageRequest();
SendMessageRequest request = sendMessageRequest();

Set<String> dtHeaders = new HashSet<>(Arrays.asList(SqsV1Util.DT_HEADERS));
boolean containsDtHeaders = request.getMessageAttributes().entrySet().stream().anyMatch(e ->
dtHeaders.contains(e.getKey()) && e.getValue() != null && !e.getValue().getStringValue().isEmpty());
assertTrue("Message request must contain headers", containsDtHeaders);

assertEquals(1, introspector.getFinishedTransactionCount(10000));

String txName = introspector.getTransactionNames().iterator().next();
Expand All @@ -77,9 +91,17 @@ public void testSendMessage() {
@Test
public void testSendMessageBatch() {
Introspector introspector = InstrumentationTestRunner.getIntrospector();
sendMessageBatch();
SendMessageBatchRequest request = sendMessageBatch();
assertEquals(1, introspector.getFinishedTransactionCount(10000));

Set<String> dtHeaders = new HashSet<>(Arrays.asList(SqsV1Util.DT_HEADERS));
assertFalse("Batch request must contain at least one entry", request.getEntries().isEmpty());
for (SendMessageBatchRequestEntry entry: request.getEntries()) {
boolean containsDtHeaders = entry.getMessageAttributes().entrySet().stream().anyMatch(e ->
dtHeaders.contains(e.getKey()) && e.getValue() != null && !e.getValue().getStringValue().isEmpty());
assertTrue("Message entry must contain headers", containsDtHeaders);
}

String txName = introspector.getTransactionNames().iterator().next();
checkScopedMetricCount(txName, "MessageBroker/SQS/Queue/Produce/Named/" + QUEUE_NAME, 1);
}
Expand All @@ -95,15 +117,24 @@ public void testReceiveMessage() {
}

@Trace(dispatcher = true)
private void sendMessageRequest() {
private SendMessageRequest sendMessageRequest() {
SendMessageRequest request = (new SendMessageRequest()).withQueueUrl(queueUrl).withMessageBody("body");
sqsClient.sendMessage(request);
return request;
}

@Trace(dispatcher = true)
private void sendMessageBatch() {
SendMessageBatchRequest request = (new SendMessageBatchRequest()).withQueueUrl(queueUrl);
sqsClient.sendMessageBatch(request);
private SendMessageBatchRequest sendMessageBatch() {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
SendMessageBatchRequest request = (new SendMessageBatchRequest()).withQueueUrl(queueUrl)
.withEntries(entry);
try {
sqsClient.sendMessageBatch(request);
} catch (Exception e) {
// Do nothing
}

return request;
}

@Trace(dispatcher = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.junit.Assert;
import org.junit.Test;

public class MetricUtilTest {
public class SqsV1UtilTest {

@Test
public void testGenerateProduceMetricsGoodQueueName() {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics("path/myQueue");
MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics("path/myQueue");
Assert.assertEquals("SQS", messageProduceParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageProduceParameters.getOtelLibrary());
Assert.assertEquals("myQueue", messageProduceParameters.getDestinationName());
Expand All @@ -28,7 +28,7 @@ public void testGenerateProduceMetricsGoodQueueName() {

@Test
public void testGenerateProduceMetricsBadQueueName() {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics("path");
MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics("path");
Assert.assertEquals("SQS", messageProduceParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageProduceParameters.getOtelLibrary());
Assert.assertEquals("unknown", messageProduceParameters.getDestinationName());
Expand All @@ -39,7 +39,7 @@ public void testGenerateProduceMetricsBadQueueName() {

@Test
public void testGenerateConsumeMetricsGoodQueueName() {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics("path/myQueue");
MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics("path/myQueue");
Assert.assertEquals("SQS", messageConsumeParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageConsumeParameters.getOtelLibrary());
Assert.assertEquals("myQueue", messageConsumeParameters.getDestinationName());
Expand All @@ -50,7 +50,7 @@ public void testGenerateConsumeMetricsGoodQueueName() {

@Test
public void testGenerateConsumeMetricsBadQueueName() {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics("path");
MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics("path");
Assert.assertEquals("SQS", messageConsumeParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageConsumeParameters.getOtelLibrary());
Assert.assertEquals("unknown", messageConsumeParameters.getDestinationName());
Expand All @@ -61,7 +61,7 @@ public void testGenerateConsumeMetricsBadQueueName() {

@Test
public void testGenerateConsumeAwsUrl() {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");
MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");
Assert.assertEquals("SQS", messageConsumeParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageConsumeParameters.getOtelLibrary());
Assert.assertEquals("MyQueue", messageConsumeParameters.getDestinationName());
Expand All @@ -72,7 +72,7 @@ public void testGenerateConsumeAwsUrl() {

@Test
public void testGenerateConsumeOtherUrl() {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics("https://localhost/123456789012/MyQueue");
MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics("https://localhost/123456789012/MyQueue");
Assert.assertEquals("SQS", messageConsumeParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageConsumeParameters.getOtelLibrary());
Assert.assertEquals("MyQueue", messageConsumeParameters.getDestinationName());
Expand All @@ -83,7 +83,7 @@ public void testGenerateConsumeOtherUrl() {

@Test
public void testGenerateProduceAwsUrl() {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");
MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");
Assert.assertEquals("SQS", messageProduceParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageProduceParameters.getOtelLibrary());
Assert.assertEquals("MyQueue", messageProduceParameters.getDestinationName());
Expand All @@ -94,7 +94,7 @@ public void testGenerateProduceAwsUrl() {

@Test
public void testGenerateProduceOtherUrl() {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics("https://localhost/123456789012/MyQueue");
MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics("https://localhost/123456789012/MyQueue");
Assert.assertEquals("SQS", messageProduceParameters.getLibrary());
Assert.assertEquals("aws_sqs", messageProduceParameters.getOtelLibrary());
Assert.assertEquals("MyQueue", messageProduceParameters.getDestinationName());
Expand Down
Loading
Loading