我正在尝试加载一个简单的文本文件,而不是Kafka中的标准输入。下载Kafka后,我执行了以下步骤:
开始动物园管理员:
zookeeper-server-start.sh配置zookeeper.properties
已启动服务器
kafka-server-start.sh配置server.properties
创建了一个名为“test”的主题:
<code>bin/kafka主题。sh--创建--zookeeper本地主机:2181--复制因子1--分区1--主题测试
制片人跑了:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test1
Test2
消费者听到的:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
我想将数据文件甚至简单的文本文件传递给生产者,而不是标准输入,消费者可以直接看到。任何帮助都将不胜感激。谢谢!
这里有几个稍微通用的方法,但是对于简单的文件来说可能是多余的
尾巴
尾部 -n0 -F my_file.txt | kafka-console-producer.sh --代理列表本地主机:9092 --主题my_topic
解释
尾部
在文件增长或日志不断添加时从文件末尾读取-n0
表示输出最后0行,因此只选择新行设置
options {
flush_lines (0);
time_reopen (10);
log_fifo_size (1000);
long_hostnames (off);
use_dns (no);
use_fqdn (no);
create_dirs (no);
keep_hostname (no);
};
source s_file {
file("path to my-file.txt" flags(no-parse));
}
destination loghost {
tcp("*.*.*.*" port(5140));
}
消耗
< code > NC-k-l 5140 | Kafka-console-producer . sh-broker-list localhost:9092-topic my _ topic
解释(来自man nc
)
-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.
-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.
裁判
系统日志-ng
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
在Kafka中为我工作-0.9.0
你可以通过管道输入:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt
在这里找到的。
从 0.9.0 开始:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
所以我有一个设计,其中我有多个生产者P1、P2、P3、P4... PN写入单个主题T1,它有32个分区。 另一方面,我在一个消费者组中最多有32个消费者。 我想负载平衡我的消息消耗 阅读文档时,我可以看到3个选项: 1。自己定义分区(缺点是我必须知道最后一条消息发送到哪里,或者为每个生产者定义分区范围P) 2。定义一个密钥并将分区决定权交给Kafka哈希算法(缺点-负载平衡将在运气好的情况下定义)
假设我有一个主题T1,它有三个分区,即P1、P2和P3。其中p1是领导者,rest是追随者。
我试图在Kafka(大约15mb)中写入一条大消息,但没有写入,程序结束时好像一切正常,但主题中没有任何消息。 小消息确实会被写入。 以下是代码: 这个主题已经被配置为接受大消息,我已经能够用python写进去了。下面是python代码: 但java版本不起作用。
我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。
我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点
我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K