KafkaEx is an Elixir client for Apache Kafka with support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and Erlang OTP 19+.
See http://hexdocs.pm/kafka_ex/ for documentation and https://github.com/kafkaex/kafka_ex/ for code.
KafkaEx supports the following Kafka features: broker and topic metadata, producing and fetching messages, message compression (snappy and gzip), offset management, and consumer groups.
See the Kafka Protocol Documentation and A Guide to the Kafka Protocol for details of these features.
TL;DR:

- Set kafka_version: "kayrock" to use the new client implementation.
- Many functions now support an api_version parameter; see below for details, e.g., how to store offsets in Kafka instead of Zookeeper.
- The new API lives in the KafkaEx.New namespace. See below for details.

To support some oft-requested features (offset storage in Kafka, message timestamps), we have integrated KafkaEx with Kayrock, a library that handles serialization and deserialization of the Kafka message protocol in a way that can grow as Kafka does.
Unfortunately, the existing KafkaEx API is built in such a way that it doesn't easily support this growth. This, combined with a number of other existing warts in the current API, has led us to the conclusion that v1.0 of KafkaEx should have a new and cleaner API.
The path we have planned to get to v1.0 is:
1. Update KafkaEx to use Kayrock for message serialization, so that users can access newer protocol features without breaking changes (this is the kafka_version: "kayrock" compatibility mode described above).
2. Develop the new API in the KafkaEx.New namespace (EARLY PROGRESS).
3. Move the new API to the KafkaEx namespace (i.e., drop the New part), replacing the legacy API. This will be released as v1.0.

Users of KafkaEx can help a lot by testing the new code. At first, we need people to test the Kayrock-based client using compatibility mode. You can do this by simply setting kafka_version: "kayrock" in your configuration. That should be all you need to change. If you want to test new features enabled by api_versions options then that is also very valuable to us (see below for links to details). Then, as work on the new API ramps up, users can contribute feedback to pull requests (or even contribute pull requests!) and test out the new API as it becomes available.
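For example, compatibility mode is a one-line configuration change (a minimal sketch; kafka_version is the configuration key mentioned above):

# config/config.exs
# opt into the Kayrock-based client; existing code should keep working
config :kafka_ex,
  kafka_version: "kayrock"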
For more information on using the Kayrock-based client and on the v1.0 API, see the corresponding documents in the KafkaEx repository.
The standard approach for adding dependencies to an Elixir application applies: add KafkaEx to the deps list in your project's mix.exs file. You may also optionally add snappyer (required only if you want to use snappy compression).
# mix.exs
defmodule MyApp.Mixfile do
# ...
defp deps do
[
# add to your existing deps
{:kafka_ex, "~> 0.11"},
# If using snappy-erlang-nif (snappy) compression
{:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"},
# if using snappyer (snappy) compression
{:snappyer, "~> 1.2"}
]
end
end
Then run mix deps.get to fetch dependencies.
See config/config.exs or KafkaEx.Config for a description of configuration variables, including the Kafka broker list and default consumer group.
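For illustration, a typical configuration might look like this (a sketch with placeholder values; brokers and consumer_group are among the keys described in KafkaEx.Config):

# config/config.exs
config :kafka_ex,
  # bootstrap brokers as {host, port} pairs
  brokers: [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}],
  # default consumer group used by workers
  consumer_group: "kafka_ex"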
You can also override options when creating a worker; see below.
When using certain versions of OTP, random timeouts can occur if using SSL. The impacted versions are the OTP 21 releases prior to 21.3.8.15 and the OTP 22 releases prior to 22.3.2; upgrade to 21.3.8.15 or 22.3.2, respectively, to solve this.
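For reference, SSL itself is enabled via the client configuration. A minimal sketch, assuming certificate paths that you would substitute with your own:

# config/config.exs
config :kafka_ex,
  use_ssl: true,
  # ssl_options are passed through to Erlang's :ssl module
  ssl_options: [
    cacertfile: "/path/to/ca-cert.pem",
    certfile: "/path/to/client-cert.pem",
    keyfile: "/path/to/client-key.pem"
  ]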
To use a consumer group, first implement a handler module using KafkaEx.GenConsumer.
defmodule ExampleGenConsumer do
use KafkaEx.GenConsumer
alias KafkaEx.Protocol.Fetch.Message
require Logger
# note - messages are delivered in batches
def handle_message_set(message_set, state) do
for %Message{value: message} <- message_set do
Logger.debug(fn -> "message: " <> inspect(message) end)
end
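# :async_commit commits offsets asynchronously; return {:sync_commit, state} to commit synchronously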
{:async_commit, state}
end
end
Then add a KafkaEx.ConsumerGroup to your application's supervision tree and configure it to use the implementation module, as sketched below.
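A minimal sketch of what that can look like in an Application module, assuming the group name "example_group" and topic "example_topic" (both placeholder values):

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # run ExampleGenConsumer (defined above) as part of a consumer group
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [ExampleGenConsumer, "example_group", ["example_topic"], []]}
      }
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end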
See the KafkaEx.GenConsumer and KafkaEx.ConsumerGroup documentation for details.
KafkaEx worker processes manage the state of the connection to the Kafka broker.
iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}
With custom options:
iex> uris = [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
[{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
iex> KafkaEx.create_worker(:pr, [uris: uris, consumer_group: "kafka_ex", consumer_group_update_interval: 100])
{:ok, #PID<0.172.0>}
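Client functions accept a worker_name option that routes the request to a specific worker, e.g., the :pr worker created above:

iex> KafkaEx.produce("foo", 0, "hey", worker_name: :pr)
:ok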
You may find you want to create many workers, say in conjunction with a poolboy pool. In this scenario you usually won't want to name these worker processes. To create an unnamed worker with create_worker:
iex> KafkaEx.create_worker(:no_name) # indicates to the server process not to name the process
{:ok, #PID<0.171.0>}
Note that KafkaEx has a supervisor to manage its workers. If you are using Poolboy or a similar library, you will want to manually create a worker so that it is not supervised by KafkaEx.Supervisor. To do this, you will need to call:
GenServer.start_link(KafkaEx.Config.server_impl,
[
[uris: KafkaEx.Config.brokers(),
consumer_group: Application.get_env(:kafka_ex, :consumer_group)],
:no_name
]
)
Alternatively, you can call
KafkaEx.start_link_worker(:no_name)
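The resulting pid can then be passed as the worker_name option on subsequent calls. A sketch, assuming the unnamed worker started above:

{:ok, pid} = KafkaEx.start_link_worker(:no_name)
# route requests to this specific worker by pid
KafkaEx.produce("foo", 0, "hey", worker_name: pid)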
For all metadata
iex> KafkaEx.metadata
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host:
"192.168.59.103",
node_id: 49162, port: 49162, socket: nil}],
topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "LRCYFQDVWUFEIUCCTFGP"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "JSIMKCLQYTWXMSIGESYL"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "SCFRRXXLDFPOWSPQQMSD"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
...
For a specific topic
iex> KafkaEx.metadata(topic: "foo")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.59.103",
node_id: 49162, port: 49162, socket: nil}],
topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "foo"}]}
When requesting an offset by timestamp, Kafka returns the starting offset of the log segment created no later than the given timestamp. Because the offset request is served only at segment granularity, the result is less accurate for larger segment sizes.
iex> KafkaEx.offset("foo", 0, {{2015, 3, 29}, {23, 56, 40}}) # the time specified should match or be ahead of the clock on the server running Kafka
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [256], partition: 0}], topic: "foo"}]
iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [16], partition: 0}], topic: "foo"}]
iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [0], partition: 0}], topic: "foo"}]
NOTE You must pass auto_commit: false in the options for fetch/3 when using Kafka < 0.8.2 or when using :no_consumer_group.
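Under those conditions, the fetch call shown below simply gains the extra option:

iex> KafkaEx.fetch("foo", 0, offset: 5, auto_commit: false)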
iex> KafkaEx.fetch("foo", 0, offset: 5) # where 0 is the partition and 5 is the offset we want to start fetching from
[%KafkaEx.Protocol.Fetch.Response{partitions: [%{error_code: :no_error,
hw_mark_offset: 115,
message_set: [
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 5, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 7, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 8, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 9, value: "hey"}
...], partition: 0}], topic: "foo"}]
iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is the message
:ok
See the KafkaEx.stream/3 documentation for details on streaming.
iex> KafkaEx.produce("foo", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hi")
:ok
iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
%{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
For Kafka < 0.8.2, stream/3 requires auto_commit: false:
iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2)
Snappy and gzip compression are supported. Example usage for producing compressed messages:
message1 = %KafkaEx.Protocol.Produce.Message{value: "value 1"}
message2 = %KafkaEx.Protocol.Produce.Message{key: "key 2", value: "value 2"}
messages = [message1, message2]
#snappy
produce_request = %KafkaEx.Protocol.Produce.Request{
topic: "test_topic",
partition: 0,
required_acks: 1,
compression: :snappy,
messages: messages}
KafkaEx.produce(produce_request)
#gzip
produce_request = %KafkaEx.Protocol.Produce.Request{
topic: "test_topic",
partition: 0,
required_acks: 1,
compression: :gzip,
messages: messages}
KafkaEx.produce(produce_request)
Compression is handled automatically on the consuming/fetching end.
It is strongly recommended to test using the Dockerized test cluster described below. This is required for contributions to KafkaEx.
NOTE You may have to run the test suite twice to get tests to pass. Due to asynchronous issues, the test suite sometimes fails on the first try.
Testing KafkaEx requires a local SSL-enabled Kafka cluster with 3 nodes: one node listening on each of ports 9092, 9093, and 9094. The easiest way to do this is using the scripts in this repository that utilize Docker and Docker Compose (both of which are freely available). This is the method we use for our CI testing of KafkaEx.
To launch the included test cluster, run
./scripts/docker_up.sh
The docker_up.sh script will attempt to determine an IP address for your computer on an active network interface.
The test cluster runs Kafka 0.11.0.1.
The KafkaEx tests are split up using tags to handle testing multiple scenarios and Kafka versions.
These tests do not require a Kafka cluster to be running (see test/test_helper.exs:3 for the tags excluded when running this).
mix test --no-start
If you are not using the Docker test cluster, you may need to modify config/config.exs for your setup.
The full test suite requires Kafka 0.10.1.0+.
./scripts/all_tests.sh
The 0.9 client includes functionality that cannot be tested with older clusters.
mix test --include integration --include consumer_group --include server_0_p_9_p_0
Kafka 0.8.2 introduced the consumer group API.
mix test --include consumer_group --include integration
If your test cluster is older, the consumer group tests must be omitted.
mix test --include integration --include server_0_p_8_p_0
To run static analysis:

mix dialyzer
All contributions are managed through the kafkaex GitHub repo.
If you find a bug or would like to contribute, please open an issue or submit a pull request. Please refer to CONTRIBUTING.md for our contribution process.
KafkaEx has a Slack channel: #kafkaex on elixir-lang.slack.com. You can request an invite via http://bit.ly/slackelixir. The Slack channel is appropriate for quick questions or general design discussions. The Slack discussion is archived at http://slack.elixirhq.com/kafkaex.
The default snappy compression module is :snappy (snappy-erlang-nif). It can be changed to :snappyer by using this:
config :kafka_ex, snappy_module: :snappyer
The snappy-erlang-nif library is deprecated, and the default snappy module will be changed to :snappyer in the 1.0.0 release.