当前位置: 首页 > 知识库问答 >
问题:

如何使用kafka控制台生产者发送键、值消息

闽念
2023-03-14

我有一个需要使用Kafka Console Producer发送键值消息的用例。那么如何通过Kafka Console Producer命令实现这一点呢?

共有2个答案

顾赞
2023-03-14

默认情况下,生产者不关心消息写入的主题分区,并将在主题的所有分区上公平平衡消息。生产者根据记录键的哈希值选择分区,如果记录没有键,则以循环方式选择分区。

>

  • Kafka使用key指定目标分区,默认的
    策略是根据key的哈希值选择分区,或者如果key为null则使用循环算法。

    Kafka使用键-值对,如果未指定键,它将被视为默认值为null,分区将被标识为循环方式。

    如果我们指定密钥,具有相同密钥的消息/记录将进入同一个分区

    要启用从命令行发送完整的键值对,我们需要使用以下两个属性:

    属性

    >

  • 解析。key:如果为true–key是强制的,默认情况下设置为false。

    key.separator:如下

    示例

    • key.separator=,
    • key.separator=-
    • key.separator=:

    Kafka控制台生成器命令

    kafka-console-producer --broker-list MY-KAFKA:29092 --topic kafka-prod --property parse.key=true --property key.separator=,
    

    参考号:https://shashirl9.medium.com/kafka-producer-internals-d971ac582688

  • 宫铭
    2023-03-14

    我经过一些研究找到了解决方案,解决方案就在这里。

    kafka控制台生成器命令

    kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name --property "parse.key=true" --property "key.separator=:"
    

    运行此命令后,您将在producer控制台中输入,并从那里可以发送键、值消息。

    举个例子

    key1:value1
    key2:value2
    key3:value3
    

    为了更清楚,我在这里提供了示例键值消息,emp\u info是一个键,JSON对象是一个值。

    emp_info: {"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"}
    

    注意:只需发送几行文本,就会生成键为null的消息。要发送同时包含键和值的消息,必须设置解析。按键和按键。运行生成器时,命令行上的分隔符属性。

     类似资料:
    • 我试着写一些关于主题的消息,但是控制台不允许(生产者不等待标准输入)。我也看不到任何错误日志。尽管主题已成功创建。我正在使用: 动物园管理员和Kafka服务器运行良好。我使用的是苹果电脑。可能的问题是什么。我正在关注阿帕奇Kafka文档 http://kafka.apache.org/documentation.html#quickstart。

    • 我需要更改Kafka设置中的值序列化器/反序列化器(出于测试目的,我一直在使用IntegerSerializer/IntegerDeserializer)。使用javaapi,它完全按照预期工作;但是,当使用控制台工具时,它似乎无法正常工作。 我所做的所有故障排除都让我得出了一个结论:Kafka控制台制作人似乎忽略了序列化程序选项。我尝试了和,并将其设置为带有参数。 它不仅不能将数据序列化为整数,

    • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会

    • java 我正在使用控制中心来检查这个主题的消费者,并跟踪正在消费的数据。在运行这个应用程序时,它与Kafka和所有分区都连接得很好,我可以在控制中心看到所有的数据都被提取了,但在我的java控制台中没有打印任何数据。但是我注意到,在向Kafka发送一些新数据时,它会在java控制台中打印出来(即,在运行我的消费者后将新数据发送给Kafka)。它应该是这样的吗?还是我做错了什么?根据我的理解,Ka

    • 例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?