From a985a2b7eb3b5fcc14e5b542d6da6fea3b880217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A1vio=20Lucena?= Date: Sun, 24 Aug 2025 21:49:54 -0700 Subject: [PATCH] feat: add support for OpenTelemetry messaging/queue system spans Adds parsing logic for OTel spans with messaging.system attributes to properly categorize queue operations. Producer spans get 'queue.publish' operation and consumer spans get 'queue.process' operation. The span description is extracted from the span name (e.g., "Sidekiq::Worker" from "Sidekiq::Worker publish"). --- .../sentry/opentelemetry/span_processor.rb | 3 + .../opentelemetry/span_processor_spec.rb | 78 ++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/sentry-opentelemetry/lib/sentry/opentelemetry/span_processor.rb b/sentry-opentelemetry/lib/sentry/opentelemetry/span_processor.rb index e898c6b33..34efaee27 100644 --- a/sentry-opentelemetry/lib/sentry/opentelemetry/span_processor.rb +++ b/sentry-opentelemetry/lib/sentry/opentelemetry/span_processor.rb @@ -142,6 +142,9 @@ def parse_span_description(otel_span) statement = otel_span.attributes[SEMANTIC_CONVENTIONS::DB_STATEMENT] description = statement if statement + elsif (messaging_system = otel_span.attributes[SEMANTIC_CONVENTIONS::MESSAGING_SYSTEM]) + op = "queue.#{otel_span.kind == :producer ? "publish" : "process"}" + description = description&.split(" ")&.first&.strip || messaging_system end [op, description] diff --git a/sentry-opentelemetry/spec/sentry/opentelemetry/span_processor_spec.rb b/sentry-opentelemetry/spec/sentry/opentelemetry/span_processor_spec.rb index 194af2c52..29d8dd9e9 100644 --- a/sentry-opentelemetry/spec/sentry/opentelemetry/span_processor_spec.rb +++ b/sentry-opentelemetry/spec/sentry/opentelemetry/span_processor_spec.rb @@ -97,6 +97,30 @@ tracer.start_span('connect', with_parent: root_parent_context, attributes: attributes, kind: :internal) end + let(:child_queue_span_producer) do + attributes = { + 'messaging.system' => 'sidekiq', + 'messaging.destination' => 'default', + 'messaging.destination_kind' => 'queue', + 'messaging.operation' => 'publish', + 'messaging.sidekiq_job_class' => 'Sidekiq::Worker' + } + + tracer.start_span('Sidekiq::Worker publish', with_parent: root_parent_context, attributes: attributes, kind: :producer) + end + + let(:child_queue_span_consumer) do + attributes = { + 'messaging.system' => 'sidekiq', + 'messaging.destination' => 'default', + 'messaging.destination_kind' => 'queue', + 'messaging.operation' => 'process', + 'messaging.sidekiq_job_class' => 'Sidekiq::Worker' + } + + tracer.start_span('Sidekiq::Worker process', with_parent: root_parent_context, attributes: attributes, kind: :consumer) + end + before do perform_basic_setup perform_otel_setup @@ -220,6 +244,8 @@ subject.on_start(child_http_span, root_parent_context) subject.on_start(error_span, empty_context) subject.on_start(http_error_span, empty_context) + subject.on_start(child_queue_span_producer, root_parent_context) + subject.on_start(child_queue_span_consumer, root_parent_context) end let(:finished_db_span) { child_db_span.finish } @@ -228,6 +254,8 @@ let(:finished_invalid_span) { invalid_span.finish } let(:finished_error_span) { error_span.finish } let(:finished_http_error_span) { http_error_span.finish } + let(:finished_queue_span) { child_queue_span_producer.finish } + let(:finished_queue_span_consumer) { child_queue_span_consumer.finish } it 'noops when not initialized' do allow(Sentry).to receive(:initialized?).and_return(false) @@ -290,7 +318,7 @@ expect(sentry_span.data).to include({ 'otel.kind' => finished_db_span.kind }) expect(sentry_span.timestamp).to eq(finished_db_span.end_timestamp / 1e9) - expect(subject.span_map.size).to eq(4) + expect(subject.span_map.size).to eq(6) expect(subject.span_map.keys).not_to include(span_id) end @@ -312,7 +340,51 @@ expect(sentry_span.timestamp).to eq(finished_http_span.end_timestamp / 1e9) expect(sentry_span.status).to eq('ok') - expect(subject.span_map.size).to eq(4) + expect(subject.span_map.size).to eq(6) + expect(subject.span_map.keys).not_to include(span_id) + end + + it 'finishes sentry child span on otel child queue producer span finish' do + expect(subject.span_map).to receive(:delete).and_call_original + + span_id = finished_queue_span.context.hex_span_id + sentry_span = subject.span_map[span_id] + expect(sentry_span).to be_a(Sentry::Span) + + expect(sentry_span).to receive(:finish).and_call_original + subject.on_finish(finished_queue_span) + + expect(sentry_span.op).to eq('queue.publish') + expect(sentry_span.origin).to eq('auto.otel') + expect(sentry_span.description).to eq('Sidekiq::Worker') + expect(sentry_span.data).to include(finished_queue_span.attributes) + expect(sentry_span.data).to include({ 'otel.kind' => finished_queue_span.kind }) + expect(sentry_span.timestamp).to eq(finished_queue_span.end_timestamp / 1e9) + expect(sentry_span.status).to eq('ok') + + expect(subject.span_map.size).to eq(6) + expect(subject.span_map.keys).not_to include(span_id) + end + + it 'finishes sentry child span on otel child queue consumer span finish' do + expect(subject.span_map).to receive(:delete).and_call_original + + span_id = finished_queue_span_consumer.context.hex_span_id + sentry_span = subject.span_map[span_id] + expect(sentry_span).to be_a(Sentry::Span) + + expect(sentry_span).to receive(:finish).and_call_original + subject.on_finish(finished_queue_span_consumer) + + expect(sentry_span.op).to eq('queue.process') + expect(sentry_span.origin).to eq('auto.otel') + expect(sentry_span.description).to eq('Sidekiq::Worker') + expect(sentry_span.data).to include(finished_queue_span_consumer.attributes) + expect(sentry_span.data).to include({ 'otel.kind' => finished_queue_span_consumer.kind }) + expect(sentry_span.timestamp).to eq(finished_queue_span_consumer.end_timestamp / 1e9) + expect(sentry_span.status).to eq('ok') + + expect(subject.span_map.size).to eq(6) expect(subject.span_map.keys).not_to include(span_id) end @@ -321,6 +393,8 @@ subject.on_finish(finished_http_span) subject.on_finish(finished_error_span) subject.on_finish(finished_http_error_span) + subject.on_finish(finished_queue_span) + subject.on_finish(finished_queue_span_consumer) expect(subject.span_map).to receive(:delete).and_call_original