This package is a thin wrapper that makes FastStream objects compatible with the Taskiq library.
Its main goal is to give FastStream access to Taskiq's task scheduling feature.
If you already have a FastStream project that interacts with your message broker, you can add scheduling to it by installing just taskiq-faststream:

    pip install taskiq-faststream

If you are starting a new project, you can select the broker for taskiq-faststream with one of the following distributions:

    pip install taskiq-faststream[rabbit]
    # or
    pip install taskiq-faststream[kafka]
    # or
    pip install taskiq-faststream[confluent]
    # or
    pip install taskiq-faststream[nats]
    # or
    pip install taskiq-faststream[redis]

For OpenTelemetry distributed tracing support:

    pip install taskiq-faststream[otel]

The package gives you two classes: AppWrapper and BrokerWrapper.
These are just containers for the related FastStream objects to make them taskiq-compatible.
To create scheduled tasks for your broker, wrap it in BrokerWrapper and use it like a regular taskiq Broker:

    # regular FastStream code
    from faststream.nats import NatsBroker

    broker = NatsBroker()

    @broker.subscriber("test-subject")
    async def handler(msg: str):
        print(msg)

    # taskiq-faststream scheduling
    from taskiq.schedule_sources import LabelScheduleSource
    from taskiq_faststream import BrokerWrapper, StreamScheduler

    # wrap FastStream object
    taskiq_broker = BrokerWrapper(broker)

    # create periodic task
    taskiq_broker.task(
        message="Hi!",
        # If you are using RabbitBroker, then you need to replace subject with queue.
        # If you are using KafkaBroker, then you need to replace subject with topic.
        subject="test-subject",
        schedule=[{
            "cron": "* * * * *",
        }],
    )

    # create scheduler object
    scheduler = StreamScheduler(
        broker=taskiq_broker,
        sources=[LabelScheduleSource(taskiq_broker)],
    )

To run the scheduler, use the following command:

    taskiq scheduler module:scheduler

You can also wrap your FastStream application the same way (this lets you use lifespan events and AsyncAPI documentation):

    # regular FastStream code
    from faststream import FastStream
    from faststream.nats import NatsBroker

    broker = NatsBroker()
    app = FastStream(broker)

    @broker.subscriber("test-subject")
    async def handler(msg: str):
        print(msg)

    # wrap FastStream object
    from taskiq_faststream import AppWrapper

    taskiq_broker = AppWrapper(app)

    # Code below omitted 👇

A small extra feature: instead of passing a final message argument, you can set a message callback to collect information right before sending:

    async def collect_information_to_send():
        return "Message to send"

    taskiq_broker.task(
        message=collect_information_to_send,
        ...,
    )

You can also send multiple messages with one task call by using a generator message callback with yield:

    async def collect_information_to_send():
        """Sends 10 messages per task call."""
        for i in range(10):
            yield i

    taskiq_broker.task(
        message=collect_information_to_send,
        ...,
    )

taskiq-faststream supports taskiq's OpenTelemetry middleware. To enable it, pass OpenTelemetryMiddleware when creating the broker wrapper:

    from faststream.nats import NatsBroker
    from taskiq_faststream import BrokerWrapper
    from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware

    broker = NatsBroker()

    # Enable OpenTelemetry middleware
    taskiq_broker = BrokerWrapper(broker, middlewares=[OpenTelemetryMiddleware()])

This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
- Task execution spans
- Task dependencies and call chains
- Performance metrics
- Error tracking
Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup.
The same applies to AppWrapper:
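
    from faststream import FastStream
    from faststream.nats import NatsBroker
    from taskiq_faststream import AppWrapper
    from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware

    # The broker must exist before the application is created
    broker = NatsBroker()
    app = FastStream(broker)

    # Enable OpenTelemetry middleware
    taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()])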
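
To make the message callbacks described earlier more concrete, here is a minimal, standard-library-only sketch of how the three forms of `message` (a final value, an async callback, and an async generator callback) could be turned into the payloads that get published. This is an illustration only and not taskiq-faststream's actual implementation; `resolve_messages` and `collect` are hypothetical helper names introduced for this example.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, List


async def resolve_messages(message: Any) -> AsyncIterator[Any]:
    """Hypothetical helper: yield every payload that ``message`` produces."""
    if inspect.isasyncgenfunction(message):
        # Async generator callback: one payload per yielded item.
        async for item in message():
            yield item
    elif inspect.iscoroutinefunction(message):
        # Async callback: awaited right before sending, result is the payload.
        yield await message()
    elif callable(message):
        # Plain synchronous callback.
        yield message()
    else:
        # Final value passed directly.
        yield message


async def single() -> str:
    return "Message to send"


async def many():
    """Produces 10 payloads per call."""
    for i in range(10):
        yield i


async def collect(message: Any) -> List[Any]:
    """Gather all payloads into a list so they are easy to inspect."""
    return [item async for item in resolve_messages(message)]


plain_result = asyncio.run(collect("Hi!"))
single_result = asyncio.run(collect(single))
many_result = asyncio.run(collect(many))
```

Under these assumptions, a final value and a regular callback each produce one payload per task call, while a generator callback produces one payload per `yield`.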