As of version 0.7, Docker images have been migrated to Docker Hub.
RabbitMQ to Kafka bridge
The main idea is to read messages from the provided exchanges in RabbitMQ and send them to Kafka.
The application uses an intermediate permanent storage to keep the messages it has already read in case Kafka is unavailable.
The service is written in Go and can be built with a Go compiler of version 1.14 or above.
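To make the bridging idea concrete, below is a minimal, self-contained Go sketch of a worker loop under the same assumptions: messages read from RabbitMQ are buffered in memory, flushed to Kafka in batches, and handed to the intermediate storage when publishing fails. All names and types here are illustrative only, not kandalf's actual internals.

```go
// Illustrative sketch of the bridging loop (not kandalf's real code):
// messages read from RabbitMQ are buffered in memory, flushed to Kafka
// in batches, and persisted to intermediate storage when publishing fails.
package main

import (
	"fmt"
	"time"
)

// Message is a payload read from RabbitMQ, destined for a Kafka topic.
type Message struct {
	Topic string
	Body  []byte
}

// Producer publishes to Kafka; Storage keeps messages while Kafka is unavailable.
type Producer interface{ Publish(Message) error }
type Storage interface{ Put(Message) }

// bridge buffers incoming messages and flushes them when the cache is full or
// the flush timeout fires (cf. WORKER_CACHE_SIZE / WORKER_CACHE_FLUSH_TIMEOUT).
func bridge(in <-chan Message, kafka Producer, store Storage, cacheSize int, flushEvery time.Duration) {
	cache := make([]Message, 0, cacheSize)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	flush := func() {
		for _, m := range cache {
			if err := kafka.Publish(m); err != nil {
				store.Put(m) // Kafka unavailable: keep the message for a later retry
			}
		}
		cache = cache[:0]
	}

	for {
		select {
		case m, ok := <-in:
			if !ok {
				flush()
				return
			}
			cache = append(cache, m)
			if len(cache) >= cacheSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// Tiny stubs so the sketch runs on its own.
type stdoutProducer struct{}

func (stdoutProducer) Publish(m Message) error {
	fmt.Printf("-> %s: %s\n", m.Topic, m.Body)
	return nil
}

type noopStorage struct{}

func (noopStorage) Put(Message) {}

func main() {
	in := make(chan Message, 10)
	in <- Message{Topic: "loyalty", Body: []byte(`{"event":"badge.received"}`)}
	close(in)
	bridge(in, stdoutProducer{}, noopStorage{}, 10, 5*time.Second)
}
```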
The application is configured with environment variables or config files in different formats: JSON, TOML, YAML, HCL, and Java properties.
By default it tries to read the config file from `/etc/kandalf/conf/config.<ext>` and `./config.<ext>`. You can change the path using the `-c <file_path>` or `--config <file_path>` application parameters. If no file is found, the config loader falls back to reading config values from environment variables.
- `RABBIT_DSN` - RabbitMQ server DSN
- `STORAGE_DSN` - Permanent storage DSN, where the scheme is the storage type. The following storage types are currently supported:
  - Redis - requires `key` as a DSN query parameter used as the Redis storage key, e.g. `redis://localhost:6379/?key=kandalf` (see the snippet after this list)
- `LOG_*` - Logging settings, see hellofresh/logging-go for details
- `KAFKA_BROKERS` - Kafka brokers comma-separated list, e.g. `192.168.0.1:9092,192.168.0.2:9092`
- `KAFKA_MAX_RETRY` - Total number of times to retry sending a message to Kafka (default: `5`)
- `KAFKA_PIPES_CONFIG` - Path to the RabbitMQ-Kafka bridge mappings config, see details below (default: `/etc/kandalf/conf/pipes.yml`)
- `STATS_DSN` - Stats host, see hellofresh/stats-go for usage details
- `STATS_PREFIX` - Stats prefix, see hellofresh/stats-go for usage details
- `STATS_PORT` - Stats port, used only for `prometheus` metrics; metrics are exposed on `localhost:<port>/metrics` (default: `8080`)
- `WORKER_CYCLE_TIMEOUT` - Main application bridge worker cycle timeout to avoid CPU overload, must be a valid duration string (default: `2s`)
- `WORKER_CACHE_SIZE` - Max number of messages kept in memory before trying to publish to Kafka (default: `10`)
- `WORKER_CACHE_FLUSH_TIMEOUT` - Max amount of time messages are kept in memory before trying to publish to Kafka, must be a valid duration string (default: `5s`)
- `WORKER_STORAGE_READ_TIMEOUT` - Timeout between attempts to read persisted messages from storage in order to publish them to Kafka, must be at least 2x greater than `WORKER_CYCLE_TIMEOUT` and must be a valid duration string (default: `10s`)
- `WORKER_STORAGE_MAX_ERRORS` - Max number of storage read errors in a row before the worker stops reading in the current read cycle; the next read cycle starts after the `WORKER_STORAGE_READ_TIMEOUT` interval (default: `10`)
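For illustration only, the snippet below shows the shape of such a storage DSN using Go's standard `net/url` package; it is not kandalf's actual parsing code, just a way to see how the scheme and the `key` parameter are read.

```go
// Illustration of the STORAGE_DSN shape only (not kandalf's actual parsing code):
// the URL scheme selects the storage type and, for Redis, the "key" query
// parameter names the Redis key under which pending messages are stored.
package main

import (
	"fmt"
	"net/url"
)

func main() {
	dsn := "redis://localhost:6379/?key=kandalf"

	u, err := url.Parse(dsn)
	if err != nil {
		panic(err)
	}

	fmt.Println("storage type:", u.Scheme)            // "redis"
	fmt.Println("address:", u.Host)                   // "localhost:6379"
	fmt.Println("storage key:", u.Query().Get("key")) // "kandalf"
}
```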
Config should have the following structure:
```yaml
logLevel: "info"                                   # same as env LOG_LEVEL
rabbitDSN: "amqp://user:password@rmq"              # same as env RABBIT_DSN
storageDSN: "redis://redis.local/?key=storage:key" # same as env STORAGE_DSN
kafka:
  brokers:                                         # same as env KAFKA_BROKERS
    - "192.0.0.1:9092"
    - "192.0.0.2:9092"
  maxRetry: 5                                      # same as env KAFKA_MAX_RETRY
  pipesConfig: "/etc/kandalf/conf/pipes.yml"       # same as env KAFKA_PIPES_CONFIG
stats:
  dsn: "statsd.local:8125"                         # same as env STATS_DSN
  prefix: "kandalf"                                # same as env STATS_PREFIX
worker:
  cycleTimeout: "2s"                               # same as env WORKER_CYCLE_TIMEOUT
  cacheSize: 10                                    # same as env WORKER_CACHE_SIZE
  cacheFlushTimeout: "5s"                          # same as env WORKER_CACHE_FLUSH_TIMEOUT
  storageReadTimeout: "10s"                        # same as env WORKER_STORAGE_READ_TIMEOUT
  storageMaxErrors: 10                             # same as env WORKER_STORAGE_MAX_ERRORS
```

You can find a sample config file in `assets/config.yml`.
The rules defining which messages should be sent to which Kafka topics are defined in the Kafka Pipes Config file and are called "pipes". Each pipe has the following structure:
```yaml
- kafkaTopic: "loyalty"                               # name of the topic in Kafka where the message will be sent
  rabbitExchangeName: "customers"                     # name of the exchange in RabbitMQ
  rabbitTransientExchange: false                      # determines if the exchange should be declared as durable or transient
  rabbitRoutingKey: "badge.received"                  # routing key for exchange
  rabbitQueueName: "kandalf-customers-badge.received" # the name of the RabbitMQ queue to read messages from
  rabbitDurableQueue: true                            # determines if the queue should be declared as durable
  rabbitAutoDeleteQueue: false                        # determines if the queue should be declared as auto-delete
```

You can find a sample Kafka Pipes Config file in `assets/pipes.yml`.
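To make the field meanings concrete, a pipe entry could be represented by a Go struct like the hypothetical one below (field names follow the YAML keys above; this is not the actual kandalf type). Each pipe binds one RabbitMQ queue, attached to an exchange with a routing key, to one Kafka topic.

```go
// Hypothetical Go representation of a single pipe, for illustration only.
package main

import "fmt"

// Pipe binds one RabbitMQ queue (bound to an exchange with a routing key)
// to one Kafka topic.
type Pipe struct {
	KafkaTopic              string `yaml:"kafkaTopic"`
	RabbitExchangeName      string `yaml:"rabbitExchangeName"`
	RabbitTransientExchange bool   `yaml:"rabbitTransientExchange"`
	RabbitRoutingKey        string `yaml:"rabbitRoutingKey"`
	RabbitQueueName         string `yaml:"rabbitQueueName"`
	RabbitDurableQueue      bool   `yaml:"rabbitDurableQueue"`
	RabbitAutoDeleteQueue   bool   `yaml:"rabbitAutoDeleteQueue"`
}

func main() {
	p := Pipe{
		KafkaTopic:         "loyalty",
		RabbitExchangeName: "customers",
		RabbitRoutingKey:   "badge.received",
		RabbitQueueName:    "kandalf-customers-badge.received",
		RabbitDurableQueue: true,
	}
	fmt.Printf("queue %q (exchange %q, key %q) -> Kafka topic %q\n",
		p.RabbitQueueName, p.RabbitExchangeName, p.RabbitRoutingKey, p.KafkaTopic)
}
```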
- Make sure you have `go` and the `make` utility installed on your machine;
- Run `make` to install all required dependencies and build binaries;
- Binaries for Linux and MacOS X will be in `./dist/`.
For testing and development you can use the docker-compose file, which includes all the required services.
For production you can use the minimalistic prebuilt `hellofresh/kandalf` image as a base image or mount a pipes configuration volume to `/etc/kandalf/conf/`.
- Handle dependencies in a proper way (gvt, glide, or similar)
- Tests
To start contributing, please check CONTRIBUTING.
