我试图阅读Kafka消息使用jsr223采样器在jmetmKafka消费者
[响应消息:javax.script.ScriptException:javax.script.ScriptException:java.lang.ClassCastException:[Ljava.lang.String;无法转换为java.util.List]
请帮助我解决此问题,以便我可以使用kafka consumer订阅和使用消息。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Properties props = new Properties();
String groupID = "REQUEST_RESPONSE_JOB_GROUP";
String clientID = "REQUEST_RESPONSE_JOB_CLIENT";
String BSID = "kafka:9092";
String topic = "PROC_REST_EVENTS";
props.put("bootstrap.servers", BSID);
props.put("group.id", groupID);
props.put("client.id", clientID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topic));
//print the topic name
System.out.println("Subscribed to topic " + topic);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
return records;
}
很可能您从Kafka主题中获得了一个列表,而您的消费者需要一个字符串,您需要修改消费者配置以匹配来自该主题的类型。
尝试下面的Groovy代码,它将3条消息发送到test
主题(如果它不存在,则需要创建它),然后读取它们。
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.LongDeserializer
import org.apache.kafka.common.serialization.LongSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
def BOOTSTRAP_SERVERS = 'localhost:9092'
def TOPIC = 'test'
Properties kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, 'KafkaExampleProducer')
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName())
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 'KafkaExampleConsumer')
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
def producer = new KafkaProducer<>(kafkaProps)
def consumer = new KafkaConsumer<>(kafkaProps)
1.upto(3) {
def record = new ProducerRecord<>(TOPIC, it as long, 'Hello from JMeter ' + it)
producer.send(record)
log.info('Sent record(key=' + record.key() + 'value=' + record.value() + ')')
}
consumer.subscribe(Collections.singletonList(TOPIC))
final int giveUp = 100
int noRecordsCount = 0
while (true) {
def consumerRecords = consumer.poll(1000)
if (consumerRecords.count() == 0) {
noRecordsCount++
if (noRecordsCount > giveUp) break
else continue
}
consumerRecords.each { record ->
log.info('Received Record:(' + record.key() + ', ' + record.value() + ')')
}
consumer.commitAsync()
}
consumer.close()
您应该看到这样的输出:
完成后,您应该能够使用上述代码作为您自己的Kafka消息消费测试的基础。有关使用JMeter进行Kafka负载测试的更多信息,请参阅ApacheKafka-HowtoLoadTestwithJMeter文章。
既然我们可以在Javascript中使用关键字抛出任何东西,那么我们就不能直接抛出一个错误消息字符串吗? 有人知道这里面有什么陷阱吗? 让我对此添加一些背景:在JavaScript世界中,人们通常依赖参数检查而不是使用try-catch机制,因此只使用抛出致命错误是有意义的。不过,为了能够捕捉一些系统错误,我必须为我自己的错误使用一个不同的类,而不是创建错误的子类,我认为我应该只使用String。
我是Kafka的新手,在尝试一个示例场景时,Kafka生产者以JSON格式向消费者发送用户详细信息。我访问过类似的问题,但我无法得到我需要的答案。 如果我在终端中运行任何一个生产者或消费者,在spring boot中运行另一个生产者或消费者,我不会面临任何问题。错误发生在无限循环中(当生产者和消费者都从不同的spring boot项目启动时): 我在下面提到了消费者配置中的反序列化和受信任包: 我
我调用方法从一个工人线程在下面的代码,但Android不抛出这应该说java.lang.IllegalStateExc0019:调用视图方法在另一个线程比UI线程”,因为我正在修改UI从外部的UI线程,这在Android中是禁止的。 以下是我的工作线程作为内部类的可运行状态: 注意:host OnHoldDialog是我活动的成员。 而不是抛出,android只是不根据消息更新UI。 这是虫子吗?
我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p
我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?