Instrument Queues
Learn how to manually instrument your code to use Sentry's Queues module.
Sentry comes with automatic instrumentation for the most common messaging queue systems. In case yours isn't supported, you can still instrument custom spans and transactions around your queue producers and consumers to ensure that you have performance data about your messaging queues.
To start capturing performance metrics, use the sentry_sdk.start_span() function to wrap your queue producer events. Your span op must be set to queue.publish. Include the following span data to enrich your producer spans with queue metrics:
Data Attribute | Type | Description |
---|---|---|
messaging.message.id | string | The message identifier |
messaging.destination.name | string | The queue or topic name |
messaging.message.body.size | number | Size of the message body in bytes |
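As a rough illustration of the attributes in this table, the values could be collected in one place before being copied onto the span. The helper name producer_span_data is hypothetical and not part of the Sentry SDK, and the sketch assumes the message body is a str that is sent as UTF-8.

```python
def producer_span_data(message_id: str, queue: str, body: str) -> dict:
    """Build the span data for a queue.publish span (hypothetical helper)."""
    return {
        "messaging.message.id": message_id,
        "messaging.destination.name": queue,
        # Size in bytes, not characters: encode before measuring.
        "messaging.message.body.size": len(body.encode("utf-8")),
    }


data = producer_span_data("abc123", "messages", "Hello World!")
print(data["messaging.message.body.size"])  # 12
```

Each key and value would then be passed to span.set_data(). Measuring the encoded bytes matters for non-ASCII text, where the character count understates the payload size.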
Your queue.publish span must exist inside a transaction in order to be recognized as a producer span. If you are using a supported web framework, the transaction is created by the integration. If you use plain Python, you can start a new one using sentry_sdk.start_transaction().
You must also include trace headers (sentry-trace and baggage) in your message so that your consumers can continue your trace once your message is picked up.
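If your queue has no native support for message headers, one option is to wrap the body and the trace headers in a single envelope. The sketch below assumes a JSON envelope. The function names wrap_message and unwrap_message are hypothetical, and the header values are placeholders standing in for what sentry_sdk.get_traceparent() and sentry_sdk.get_baggage() would return.

```python
import json


def wrap_message(body: str, headers: dict) -> str:
    """Serialize the body together with its trace headers (hypothetical envelope format)."""
    return json.dumps({"headers": headers, "body": body})


def unwrap_message(raw: str) -> tuple:
    """Return (headers, body) from an envelope produced by wrap_message."""
    envelope = json.loads(raw)
    return envelope["headers"], envelope["body"]


# Placeholder values; in real code these come from the Sentry SDK.
headers = {
    "sentry-trace": "0123456789abcdef0123456789abcdef-0123456789abcdef-1",
    "baggage": "sentry-trace_id=0123456789abcdef0123456789abcdef",
}
raw = wrap_message("Hello World!", headers)
received_headers, received_body = unwrap_message(raw)
```

On the consumer side, the recovered headers dictionary is what would be handed to sentry_sdk.continue_trace().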
from datetime import datetime, timezone
import sentry_sdk
import my_custom_queue
# Initialize Sentry
sentry_sdk.init(...)
connection = my_custom_queue.connect()
# The message you want to send to the queue
queue = "messages"
message = "Hello World!"
message_id = "abc123"
# Create transaction
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
with sentry_sdk.start_transaction(
    op="function",
    name="queue_producer_transaction",
):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.publish",
        description="queue_producer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message_id)
        span.set_data("messaging.destination.name", queue)
        span.set_data("messaging.message.body.size", len(message.encode("utf-8")))
        # Publish the message to the queue (including trace information and current timestamp)
        now = int(datetime.now(timezone.utc).timestamp())
        connection.publish(
            queue=queue,
            body=message,
            timestamp=now,
            headers={
                "sentry-trace": sentry_sdk.get_traceparent(),
                "baggage": sentry_sdk.get_baggage(),
            },
        )
To start capturing performance metrics, use the sentry_sdk.start_span() function to wrap your queue consumers. Your span op must be set to queue.process. Include the following span data to enrich your consumer spans with queue metrics:
Data Attribute | Type | Description |
---|---|---|
messaging.message.id | string | The message identifier |
messaging.destination.name | string | The queue or topic name |
messaging.message.body.size | number | Size of the message body in bytes |
messaging.message.retry.count | number | The number of times a message was attempted to be processed |
messaging.message.receive.latency | number | The time in milliseconds that a message awaited processing in queue |
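The latency attribute is expressed in milliseconds, so a timestamp difference has to be converted before it is set on the span. A minimal sketch, assuming the producer stored a Unix timestamp in seconds with the message; receive_latency_ms is a hypothetical helper name.

```python
from datetime import datetime, timezone


def receive_latency_ms(published_ts: float, now: datetime) -> float:
    """Milliseconds between publish time (Unix seconds) and `now` (hypothetical helper)."""
    published = datetime.fromtimestamp(published_ts, timezone.utc)
    return (now - published).total_seconds() * 1000


published_ts = 1_700_000_000  # example publish time in Unix seconds
picked_up = datetime.fromtimestamp(published_ts + 2.5, timezone.utc)
latency = receive_latency_ms(published_ts, picked_up)
```

Subtracting two datetime objects yields a timedelta, which is why the sketch calls total_seconds() and scales the result instead of passing the difference to the span directly.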
Your queue.process span must exist inside a transaction in order to be recognized as a consumer span. If you are using a supported web framework, the transaction is created by the integration. If you use plain Python, you can start a new one using sentry_sdk.start_transaction().
Use sentry_sdk.continue_trace() to connect your consumer spans to their associated producer spans, and span.set_status() to mark the trace of your message as successful or failed.
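The success and failure handling can be sketched without a live Sentry client. RecordingSpan below is a hypothetical stand-in that only records the status it is given; in real code the object would be the span returned by sentry_sdk.start_span(). The status strings "ok" and "internal_error" are the values assumed here.

```python
class RecordingSpan:
    """Hypothetical stand-in for a Sentry span; only stores the last status."""

    def __init__(self):
        self.status = None

    def set_status(self, status: str) -> None:
        self.status = status


def process_with_status(span, handler, message) -> None:
    """Run handler(message) and record the outcome on the span."""
    try:
        handler(message)
    except Exception:
        span.set_status("internal_error")
        raise  # let the caller's retry or error reporting see the failure
    else:
        span.set_status("ok")


ok_span = RecordingSpan()
process_with_status(ok_span, lambda m: None, {"body": "Hello World!"})

failed_span = RecordingSpan()
try:
    process_with_status(failed_span, lambda m: 1 / 0, {"body": "Hello World!"})
except ZeroDivisionError:
    pass
```

Re-raising is a choice made in this sketch so that retry logic can still see the failure; drop the raise if the consumer should carry on after recording the status.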
from datetime import datetime, timezone
import sentry_sdk
import my_custom_queue
# Initialize Sentry
sentry_sdk.init(...)
connection = my_custom_queue.connect()
# Pick up a message from the queue
queue = "messages"
message = connection.consume(queue=queue)
# Calculate latency in milliseconds (optional, but valuable)
now = datetime.now(timezone.utc)
message_time = datetime.fromtimestamp(message["timestamp"], timezone.utc)
latency = (now - message_time).total_seconds() * 1000
# Continue the trace started in the producer
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
transaction = sentry_sdk.continue_trace(
    message["headers"],
    op="function",
    name="queue_consumer_transaction",
)
with sentry_sdk.start_transaction(transaction):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.process",
        description="queue_consumer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message["message_id"])
        span.set_data("messaging.destination.name", queue)
        # Body size in bytes (assumes the body is a str)
        span.set_data("messaging.message.body.size", len(message["body"].encode("utf-8")))
        span.set_data("messaging.message.receive.latency", latency)
        span.set_data("messaging.message.retry.count", 0)
        try:
            # Process the message (process_message is your own handler)
            process_message(message)
        except Exception:
            # In case of an error set the status to "internal_error"
            span.set_status("internal_error")
        else:
            # Otherwise mark the span as successful
            span.set_status("ok")
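The my_custom_queue module used in these examples is a placeholder. To try the producer and consumer flow locally, a small in-memory stand-in can play its role. Everything in this sketch is hypothetical: the InMemoryConnection class, the generated message_id, and the shape of the consumed message are assumptions modeled on the calls made in the examples above.

```python
import uuid
from collections import defaultdict, deque


class InMemoryConnection:
    """Hypothetical in-process queue mirroring the publish/consume calls used above."""

    def __init__(self):
        self._queues = defaultdict(deque)

    def publish(self, queue, body, timestamp, headers):
        # Store everything the consumer example reads back from the message.
        self._queues[queue].append({
            "message_id": uuid.uuid4().hex,
            "body": body,
            "timestamp": timestamp,
            "headers": headers,
        })

    def consume(self, queue):
        # First in, first out; raises IndexError if the queue is empty.
        return self._queues[queue].popleft()


def connect():
    return InMemoryConnection()


connection = connect()
connection.publish(
    queue="messages",
    body="Hello World!",
    timestamp=1_700_000_000,
    headers={"sentry-trace": "placeholder", "baggage": "placeholder"},
)
message = connection.consume(queue="messages")
```

A real broker client will differ in how it assigns message identifiers and delivers headers, so treat the dictionary keys here as illustrative only.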