当前位置: 首页 > 面试题库 >

Kafka制片人发送无效字符

慕皓君
2023-03-14
问题内容

使用以下代码,我发送Elasticsearch文档以进行索引。我尝试将基本对象转换为JSON并通过生产者发送。但是,每条消息(从控制台检查)都附加了乱码,例如
- t。{“ productId”:2455

public boolean sendMessage()
{
    PageRequest page = new PageRequest(0, 1); 
    Product p = product.findByName("Cream", page).getContent().get(0);
    String json = "";
    ObjectMapper mapper = new ObjectMapper();
    try {
        json = mapper.writeValueAsString(p);
    } catch (JsonProcessingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }       
    logger.info("JSON = " + json);

    boolean status =  inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build());
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return status;
}

出站配置

 <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="test_topic"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    </beans>

有什么线索吗?

使用的插件:Spring Extension Kafka


问题答案:

我今天遇到了这个问题,可以通过在生产者配置中设置正确的value-serializer类来解决此问题,如下所示:

<int-kafka:producer-configuration
                broker-list="localhost:9092" topic="headers['kafka_topic']"
                key-class-type="java.lang.String" value-class-type="java.lang.String"
                key-serializer="kafkaSerializer" value-serializer="kafkaSerializer"/>

<bean id="kafkaSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />


 类似资料:
  • 我在一个视频教程中看到,当制作人发布消息时,Kafka Broker支持3种类型的确认。 0-开火并忘记 1-领导确认 2-确认所有经纪人 我正在使用Kafka的Java API发布消息。这是必须为每个使用服务器的代理设置的吗。每个经纪人的特定属性,还是必须由制作人设定?如果必须由制作人设置,请解释如何使用Java API设置。

  • 我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢

  • 我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。

  • 我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了

  • 我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”

  • 我试图使用谷歌云计算引擎VM实例作为Kafka消费者。我发现虚拟机阻止了来自任何外部计算机的通信,我成功地设置了防火墙规则,从本地计算机访问虚拟机。 我能够在云虚拟机实例上创建和列出主题。但我无法收发Kafka主题的信息。它抛出超时异常。 我使用telnet检查端口是否打开,并获得了端口的转义序列(9092)。 当我尝试使用另一个云虚拟机实例实现相同的事情时,我能够执行所有kafka操作。(发送/