diff --git a/cmd/query/app/querysvc/v2/querysvc/service.go b/cmd/query/app/querysvc/v2/querysvc/service.go index bfcc98053c3..bfc1560dced 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service.go +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -181,23 +181,34 @@ func (qs QueryService) receiveTraces( yield func([]ptrace.Traces, error) bool, rawTraces bool, ) (map[pcommon.TraceID]struct{}, bool) { - aggregatedTraces := jptrace.AggregateTraces(seq) foundTraceIDs := make(map[pcommon.TraceID]struct{}) proceed := true - aggregatedTraces(func(trace ptrace.Traces, err error) bool { + + processTraces := func(traces []ptrace.Traces, err error) bool { if err != nil { proceed = yield(nil, err) return proceed } - if !rawTraces { - qs.options.Adjuster.Adjust(trace) + for _, trace := range traces { + if !rawTraces { + qs.options.Adjuster.Adjust(trace) + } + jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { + foundTraceIDs[span.TraceID()] = struct{}{} + return true + }) } - jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { - foundTraceIDs[span.TraceID()] = struct{}{} - return true - }) - proceed = yield([]ptrace.Traces{trace}, nil) + proceed = yield(traces, nil) return proceed - }) + } + + if rawTraces { + seq(processTraces) + } else { + jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool { + return processTraces([]ptrace.Traces{trace}, err) + }) + } + return foundTraceIDs, proceed } diff --git a/cmd/query/app/querysvc/v2/querysvc/service_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go index 4794c2d4c99..a8de595afa1 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service_test.go +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage_v2/depstore" @@ -58,6 +59,12 @@ func withArchiveTraceWriter() testOption { } } +func withAdjuster(adj adjuster.Adjuster) testOption { + return func(_ *testQueryService, options *QueryServiceOptions) { + options.Adjuster = adj + } +} + func initializeTestService(opts ...testOption) *testQueryService { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} @@ -89,7 +96,6 @@ func makeTestTrace() ptrace.Traces { spanB := scopes.Spans().AppendEmpty() spanB.SetTraceID(testTraceID) spanB.SetSpanID(pcommon.SpanID([8]byte{2})) - spanB.Attributes() return trace } @@ -295,7 +301,7 @@ func TestFindTraces_Success(t *testing.T) { require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) } -func TestFindTraces_WithRawTraces(t *testing.T) { +func TestFindTraces_WithRawTraces_PerformsAdjustment(t *testing.T) { tests := []struct { rawTraces bool attributes pcommon.Map @@ -376,6 +382,139 @@ func TestFindTraces_WithRawTraces(t *testing.T) { } } +func TestFindTraces_WithRawTraces_PerformsAggregation(t *testing.T) { + tests := []struct { + rawTraces bool + traces []ptrace.Traces + expected []ptrace.Traces + expectedAdjustCalls int + }{ + { + rawTraces: true, + traces: func() []ptrace.Traces { + traceA := ptrace.NewTraces() + resourcesA := traceA.ResourceSpans().AppendEmpty() + scopesA := resourcesA.ScopeSpans().AppendEmpty() + spanA := scopesA.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetName("spanA") + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + traceB := ptrace.NewTraces() + resourcesB := traceB.ResourceSpans().AppendEmpty() + scopesB := resourcesB.ScopeSpans().AppendEmpty() + spanB := scopesB.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetName("spanB") + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + + return []ptrace.Traces{traceA, traceB} + }(), + expected: func() []ptrace.Traces { + traceA := ptrace.NewTraces() + resourcesA := traceA.ResourceSpans().AppendEmpty() + scopesA := resourcesA.ScopeSpans().AppendEmpty() + spanA := scopesA.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetName("spanA") + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + traceB := ptrace.NewTraces() + resourcesB := traceB.ResourceSpans().AppendEmpty() + scopesB := resourcesB.ScopeSpans().AppendEmpty() + spanB := scopesB.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetName("spanB") + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + + return []ptrace.Traces{traceA, traceB} + }(), + expectedAdjustCalls: 0, + }, + { + rawTraces: false, + traces: func() []ptrace.Traces { + traceA := ptrace.NewTraces() + resourcesA := traceA.ResourceSpans().AppendEmpty() + scopesA := resourcesA.ScopeSpans().AppendEmpty() + spanA := scopesA.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + traceB := ptrace.NewTraces() + resourcesB := traceB.ResourceSpans().AppendEmpty() + scopesB := resourcesB.ScopeSpans().AppendEmpty() + spanB := scopesB.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + + return []ptrace.Traces{traceA, traceB} + }(), + expected: func() []ptrace.Traces { + traceA := ptrace.NewTraces() + resourcesA := traceA.ResourceSpans().AppendEmpty() + scopesA := resourcesA.ScopeSpans().AppendEmpty() + spanA := scopesA.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + resourcesB := ptrace.NewResourceSpans() + scopesB := resourcesB.ScopeSpans().AppendEmpty() + spanB := scopesB.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + + resourcesB.CopyTo(traceA.ResourceSpans().AppendEmpty()) + + return []ptrace.Traces{traceA} + }(), + // even though there are 2 input chunks, they are for the same trace, + // so we expect only 1 call to Adjuster. + expectedAdjustCalls: 1, + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(test.traces, nil) + }) + adjustCalls := 0 + adj := adjuster.Func(func(_ ptrace.Traces) { + adjustCalls++ + }) + + tqs := initializeTestService(withAdjuster(adj)) + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + now := time.Now() + tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }). + Return(responseIter).Once() + + query := TraceQueryParams{ + TraceQueryParams: tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }, + RawTraces: test.rawTraces, + } + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + assert.Equal(t, test.expected, gotTraces) + assert.Equal(t, test.expectedAdjustCalls, adjustCalls) + }) + } +} + func TestArchiveTrace(t *testing.T) { tests := []struct { name string