This gem provides an adapter for Sidekiq for use in the queue-bus system. It uses Redis and the Sidekiq that you are already using to allow simple asynchronous communication between apps.
To install, include the 'sidekiq-bus' gem and add the following to your Rakefile:
require"sidekiq_bus/tasks"Starting from version 7, Sidekiq no longer provides a standard Redis connection pool, instead providing a limited wrapper around the low-level RedisClient gem. To maintain compatibility with both QueueBus and later versions of Sidekiq, SidekiqBus now requires an explicit Redis handler. To preserve your existing QueueBus data, this handler should connect to the same Redis instance and database as your Sidekiq configuration.
The handler can be any object that responds to call and yields a Redis connection. In an initializer, set it with:
# Naive exampleSidekiqBus.redis_handler=->(&block){block.call(Redis.new(my_config)}# More typical example using the Redis-recommended ConnectionPool gempool=ConnectionPool.new(size: pool_size,timeout: pool_timeout){Redis.new(my_config)}SidekiqBus.redis_handler=pool.method(:with).to_proc# Or, for clarity:SidekiqBus.redis_handler=->(&block)dopool.with{ |redis| block.call(redis)}endApplication A can publish an event
# pick an adapterrequire'sidekiq-bus'# (or other adapter)# business logicQueueBus.publish("user_created","id"=>42,"first_name"=>"John","last_name"=>"Smith")# or do it laterQueueBus.publish_at(1.hour.from_now,"user_created","id"=>42,"first_name"=>"John","last_name"=>"Smith")Application B is subscribed to events
# pick an adapterrequire'sidekiq-bus'# (or other adapter)# initializerQueueBus.dispatch("app_b")do# processes event on app_b_default queue# subscribe is short-hand to subscribe to your 'default' queue and this block will process events with the name "user_created"subscribe"user_created"do |attributes| NameCount.find_or_create_by_name(attributes["last_name"]).increment!end# processes event on app_b_critical queue# critical is short-hand to subscribe to your 'critical' queue and this block will process events with the name "user_paid"critical"user_paid"do |attributes| CreditCard.charge!(attributes)end# you can pass any queue name you would like to process from as well IE: `banana "peeled" do |attributes|`# and regexes work as well. note that with the above configuration along with this regex,# the following as well as the corresponding block above would both be executedsubscribe/^user_/do |attributes| Metrics.record_user_action(attributes["bus_event_type"],attributes["id"])end# the above all filter on just the event_type, but you can filter on anything# this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_typesubscribe"my_key",{"user_id"=>:present,"page"=>"homepage"}doMixpanel.homepage_action!(attributes["action"])endendApplications can also subscribe within classes using the provided Subscriber module.
classSimpleSubscriberincludeQueueBus::Subscribersubscribe:my_methoddefmy_method(attributes)# heavy liftingendendThe following is equivalent to the original initializer and shows more options:
classOtherSubscriberincludeQueueBus::Subscriberapplication:app_bsubscribe:user_createdsubscribe_queue:app_b_critical,:user_paidsubscribe_queue:app_b_default,:user_action,:bus_event_type=>/^user_/subscribe:homepage_method,:user_id=>:present,:page=>"homepage"defuser_created(attributes)NameCount.find_or_create_by_name(attributes["last_name"]).increment!enddefuser_paid(attributes)CreditCard.charge!(attributes)enddefuser_action(attributes)Metrics.record_user_action(attributes["bus_event_type"],attributes["id"])enddefhomepage_methodMixpanel.homepage_action!(attributes["action"])endendNote: This subscribes when this class is loaded, so it needs to be in your load or otherwise referenced/required during app initialization to work properly.
To make sure that your sidekiq server is consuming on the proper queues, add code like this to your config/sidekiq.rb file:
# config/sidekiq.rb# Load the environment hererequire'./boot.rb'ifSidekiq.server?# Load the queues into sidekiq:weights={'app_events'=>10,'app_heartbeat'=>3,'app_refresh'=>1}Sidekiq.options[:queues]=SidekiqBus.generate_weighted_queues(overrides: weights,default: 2)endEach app needs to tell Redis about its subscriptions:
$ rake queuebus:subscribe You'll then need to run Sidekiq. Make sure the bus_incoming queues and the ones you are using are included.
$ bundle exec sidekiq -q default -q app_b_default -q bus_incoming For development, a local mode is provided and is specified in the configuration.
# configQueueBus.local_mode=:standaloneorQueueBus.local_mode=:inlineStandalone mode does not require a separate queuebus:driver task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application specific queue. A separate queuebus:work task does still need to be run to process these events
Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.
You can also say QueueBus.local_mode = :suppress to turn off publishing altogether. This can be helpful inside some sort of migration, for example.
The test suite requires a Redis instance. For convenience, we provide Docker Compose configuration to run Redis in a container:
docker-compose up -d bundle exec rspecThe test suite uses database 15 by default to avoid conflicts with your development Redis data.
You can override the connection string (including the database number) by setting the REDIS_URL environment variable.
- Replace local modes with adapters
- We might not actually need to publish in tests
- Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines