
Tarantool kafka

Full featured high performance kafka library for Tarantool based on librdkafka.

Can produce more than 150k messages per second and consume more than 140k messages per second.

Features

  • Kafka producer and consumer implementations.
  • Fiber friendly.
  • Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is inconsistent: some libraries throw native Lua errors while others throw box.error. kafka returns non-critical errors as plain strings, which lets you decide how to handle them; see the sketch right after this list.
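A minimal sketch of that convention, reusing the Producer API shown in the Usage section below (broker address, topic, key, and value are illustrative):

local tnt_kafka = require('kafka')

-- create() returns an error string as a second value instead of throwing
local producer, err = tnt_kafka.Producer.create({ brokers = "localhost:9092" })
if err ~= nil then
    error(err) -- escalate only if you decide the error is fatal
end

-- produce() likewise reports failures as a plain string
local err = producer:produce({ topic = "some_topic", key = "some_key", value = "hello" })
if err ~= nil then
    print("delivery failed: " .. err)
end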

Requirements

  • Tarantool >= 1.10.2
  • Tarantool development headers
  • librdkafka >= 0.11.5
  • librdkafka development headers
  • openssl-libs
  • openssl development headers
  • make
  • cmake
  • gcc

Installation

 tt rocks install kafka

Build module with statically linked librdkafka

To install the kafka module with a built-in (statically linked) librdkafka dependency, use the STATIC_BUILD option:

tt rocks STATIC_BUILD=ON install kafka

Be aware that this approach does not statically link OpenSSL. Instead, it assumes that tarantool exports the OpenSSL symbols, which means the static kafka build is only usable with a statically built tarantool.

For a successful static build, you need to compile kafka against the same OpenSSL version that tarantool was built with.

Usage

Consumer

local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

local err = consumer:subscribe({ "some_topic" })
if err ~= nil then
    print(err)
    os.exit(1)
end

local out, err = consumer:output()
if err ~= nil then
    print(string.format("got fatal error '%s'", err))
    os.exit(1)
end

while true do
    if out:is_closed() then
        os.exit(1)
    end

    local msg = out:get()
    if msg ~= nil then
        print(string.format(
            "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'",
            msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
        ))
    end
end

-- from another fiber on app shutdown
consumer:close()
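Because the library is fiber friendly, the consume loop above is usually run in a dedicated fiber so that close() can be called from another fiber on shutdown, as the final comment suggests. A minimal sketch using Tarantool's built-in fiber module, reusing consumer, out, and log from the example above (the processing step is illustrative):

local fiber = require('fiber')

-- run the consume loop in its own fiber
fiber.create(function()
    while not out:is_closed() do
        local msg = out:get()
        if msg ~= nil then
            log.info("processing value '%s'", msg:value()) -- your processing here
        end
    end
end)

-- later, from any other fiber (e.g. on application shutdown):
consumer:close()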

Producer

local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

for i = 1, 1000 do
    local message = "test_value " .. tostring(i)
    local err = producer:produce({
        topic = "test_topic",
        key = "test_key",
        value = message
    })
    if err ~= nil then
        print(string.format("got error '%s' while sending value '%s'", err, message))
    else
        print(string.format("successfully sent value '%s'", message))
    end
end

producer:close()

You can pass additional librdkafka configuration parameters (https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) in the special options table on client creation:

tnt_kafka.Producer.create({
    options = {
        ["some.key"] = "some_value",
    },
})

tnt_kafka.Consumer.create({
    options = {
        ["some.key"] = "some_value",
    },
})
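For instance, using a couple of real parameter names from librdkafka's CONFIGURATION.md (the values are illustrative, not recommendations):

tnt_kafka.Producer.create({
    brokers = "localhost:9092",
    options = {
        -- how long to buffer messages before sending a batch, in milliseconds
        ["queue.buffering.max.ms"] = "100",
    },
})

tnt_kafka.Consumer.create({
    brokers = "localhost:9092",
    options = {
        -- the consumer group this client joins
        ["group.id"] = "my_group",
        -- where to start reading when there is no committed offset
        ["auto.offset.reset"] = "earliest",
    },
})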

More examples can be found in the examples folder.

Using SSL

Connecting to brokers over SSL is supported by librdkafka itself, so you only need to configure the brokers properly, following this guide: https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka

After that, you only need to pass the following configuration parameters on client creation:

tnt_kafka.Producer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate.
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})

tnt_kafka.Consumer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate.
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})

Known issues

TODO

  • Ordered storage for offsets to prevent committing unprocessed messages
  • More examples
  • Better documentation

Benchmarks

Before running any benchmark commands, initialize and update the git submodules:

 git submodule init
 git submodule update

Producer

Async

Result: over 160,000 produced messages per second on a 2016 MacBook Pro

Local run in docker:

 make docker-run-environment
 make docker-create-benchmark-async-producer-topic
 make docker-run-benchmark-async-producer-interactive

Sync

Result: over 90,000 produced messages per second on a 2016 MacBook Pro

Local run in docker:

 make docker-run-environment
 make docker-create-benchmark-sync-producer-topic
 make docker-run-benchmark-sync-producer-interactive

Consumer

Auto offset store enabled

Result: over 190,000 consumed messages per second on a 2016 MacBook Pro

Local run in docker:

 make docker-run-environment
 make docker-create-benchmark-auto-offset-store-consumer-topic
 make docker-run-benchmark-auto-offset-store-consumer-interactive

Manual offset store

Result: over 190,000 consumed messages per second on a 2016 MacBook Pro

Local run in docker:

 make docker-run-environment
 make docker-create-benchmark-manual-commit-consumer-topic
 make docker-run-benchmark-manual-commit-consumer-interactive

Developing

Tests

Before running any tests, add the following entry to /etc/hosts:

127.0.0.1 kafka 

You can run the Docker-based integration tests via the makefile target:

 make test-run-with-docker
