当前位置: 首页 > 知识库问答 >
问题:

如何给Kafka制作人写一个文件

戈曾琪
2023-03-14

我正在尝试加载一个简单的文本文件,而不是Kafka中的标准输入。下载Kafka后,我执行了以下步骤:

开始动物园管理员:

zookeeper-server-start.sh配置zookeeper.properties

已启动服务器

kafka-server-start.sh配置server.properties

创建了一个名为“test”的主题:

<code>bin/kafka主题。sh--创建--zookeeper本地主机:2181--复制因子1--分区1--主题测试

制片人跑了:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

消费者听到的:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

我想将数据文件甚至简单的文本文件传递给生产者,而不是标准输入,消费者可以直接看到。任何帮助都将不胜感激。谢谢!

共有3个答案

汪弘毅
2023-03-14

这里有几个稍微通用的方法,但是对于简单的文件来说可能是多余的

尾巴

尾部 -n0 -F my_file.txt | kafka-console-producer.sh --代理列表本地主机:9092 --主题my_topic

解释

  1. 尾部在文件增长或日志不断添加时从文件末尾读取
  2. -n0表示输出最后0行,因此只选择新行
  3. <code>-F</code>按名称而不是描述符跟随文件,因此即使旋转也能正常工作

设置

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

消耗

< code > NC-k-l 5140 | Kafka-console-producer . sh-broker-list localhost:9092-topic my _ topic

解释(来自man nc

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

裁判

系统日志-ng

南门承教
2023-03-14
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

在Kafka中为我工作-0.9.0

顾永福
2023-03-14

你可以通过管道输入:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

在这里找到的。

从 0.9.0 开始:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
 类似资料:
  • 所以我有一个设计,其中我有多个生产者P1、P2、P3、P4... PN写入单个主题T1,它有32个分区。 另一方面,我在一个消费者组中最多有32个消费者。 我想负载平衡我的消息消耗 阅读文档时,我可以看到3个选项: 1。自己定义分区(缺点是我必须知道最后一条消息发送到哪里,或者为每个生产者定义分区范围P) 2。定义一个密钥并将分区决定权交给Kafka哈希算法(缺点-负载平衡将在运气好的情况下定义)

  • 假设我有一个主题T1,它有三个分区,即P1、P2和P3。其中p1是领导者,rest是追随者。

  • 我试图在Kafka(大约15mb)中写入一条大消息,但没有写入,程序结束时好像一切正常,但主题中没有任何消息。 小消息确实会被写入。 以下是代码: 这个主题已经被配置为接受大消息,我已经能够用python写进去了。下面是python代码: 但java版本不起作用。

  • 我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。

  • 我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点

  • 我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K