使用以下代码,我发送Elasticsearch文档以进行索引。我尝试将基本对象转换为JSON并通过生产者发送。但是,每条消息(从控制台检查)都附加了乱码,例如
- t。{“ productId”:2455
public boolean sendMessage()
{
PageRequest page = new PageRequest(0, 1);
Product p = product.findByName("Cream", page).getContent().get(0);
String json = "";
ObjectMapper mapper = new ObjectMapper();
try {
json = mapper.writeValueAsString(p);
} catch (JsonProcessingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
logger.info("JSON = " + json);
boolean status = inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
出站配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test_topic"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>
有什么线索吗?
使用的插件:Spring Extension Kafka
我今天遇到了这个问题,可以通过在生产者配置中设置正确的value-serializer类来解决此问题,如下所示:
<int-kafka:producer-configuration
broker-list="localhost:9092" topic="headers['kafka_topic']"
key-class-type="java.lang.String" value-class-type="java.lang.String"
key-serializer="kafkaSerializer" value-serializer="kafkaSerializer"/>
<bean id="kafkaSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
我在一个视频教程中看到,当制作人发布消息时,Kafka Broker支持3种类型的确认。 0-开火并忘记 1-领导确认 2-确认所有经纪人 我正在使用Kafka的Java API发布消息。这是必须为每个使用服务器的代理设置的吗。每个经纪人的特定属性,还是必须由制作人设定?如果必须由制作人设置,请解释如何使用Java API设置。
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢
我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。
我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了
我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”
我试图使用谷歌云计算引擎VM实例作为Kafka消费者。我发现虚拟机阻止了来自任何外部计算机的通信,我成功地设置了防火墙规则,从本地计算机访问虚拟机。 我能够在云虚拟机实例上创建和列出主题。但我无法收发Kafka主题的信息。它抛出超时异常。 我使用telnet检查端口是否打开,并获得了端口的转义序列(9092)。 当我尝试使用另一个云虚拟机实例实现相同的事情时,我能够执行所有kafka操作。(发送/