我是斯卡拉和Kafka的新手,遇到了一些麻烦。
我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。
当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。
Kafka制作人
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("acks", "all")
props.put("retries", "2")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("block.on.buffer.full", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("auto.create.topics.enable", "true")
val producer = new KafkaProducer[String, String](props)
def startCounter() {
println("Start Producer Counter")
for (i <- 1 to 100) {
producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
println("Producer - Send: " + i)
}
println("Closing producer")
producer.close()
}
}
当我执行run方法时,我看到“producer-send:#”作为该方法的输出,并且没有错误。所以我假设这段代码已经把信息发送给了Kafka。
在运行Producer之前,我在kafka服务器上启动了以下操作:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
kafka-topics -zookeeper zk:2181 --list
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("group.id", "testGroup")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("session.timeout.ms", "3000")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
val consumer = new KafkaConsumer[String, String](props)
def start() {
println("Start Consumer")
consumer.subscribe(Arrays.asList("test-counter"))
while (true) {
val records = consumer.poll(100)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
}
}
}
}
当我通过kafka-console-producer在服务器上创建消息时,我看到它们出现在服务器上的kafka-console-consumer中,但不出现在我编写的consumer中。
kafka-console-producer --broker-list ks:9092 --topic test-counter
kafkaserver.zookeeper_address与具有kafka-console-consumer的参数zk:2181相同,kafkaserver.kafka_address与具有kafka-console-producer的参数ks:9092相同。
我试了一下代码,发现:
>
应在使用者属性中指定键和值反序列化器:
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
session.timeout.ms
属性有问题。从这里:
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我试图使用谷歌云计算引擎VM实例作为Kafka消费者。我发现虚拟机阻止了来自任何外部计算机的通信,我成功地设置了防火墙规则,从本地计算机访问虚拟机。 我能够在云虚拟机实例上创建和列出主题。但我无法收发Kafka主题的信息。它抛出超时异常。 我使用telnet检查端口是否打开,并获得了端口的转义序列(9092)。 当我尝试使用另一个云虚拟机实例实现相同的事情时,我能够执行所有kafka操作。(发送/
我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如
我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js
我正在尝试将Spring Cloud合同合并到现有项目中。我在REST方面取得了一些成功,但我正在努力设置消息端。 到目前为止,我已经在producer上建立了一个契约,它确实在target/generated test sources/contracts中生成了一个测试。我还为测试设置了一个基类。 我无法克服这个错误: 2017-09-08 17:10:51.759错误 - --[]- [ 主]
我在用Kafka。这是我的代码,在那里我想发送消息到kafka服务器,主题名是“west”,消息是“message1”。我没有收到任何错误,虽然我没有看到我发送的消息在主题中有什么问题吗?