我正在使用spark streaming文档中提供的策略来致力于Kafka本身。我的流程是这样的:主题A-->火花流[foreachRdd process->send to Topic b]向主题A提交偏移量
JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
);
kafkaStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
rdd.foreachPartition(
consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(String.format("$s %d %d $d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
consumerRecords.forEachRemaining(record -> doProcess(record));
});
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
}
);
这就是我想出来的,它接收输入数据,然后使用输出主题发送请求。生产者必须在foreach循环中创建,否则spark将尝试序列化并将其发送给所有工作人员。注意,响应是异步发送的。这意味着我在这个系统中至少使用了一种语义。
kafkaStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(
partition -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
// Print statements in this section are shown in the executor's stdout logs
KafkaProducer<String, MLMIOutput> producer = new KafkaProducer(producerConfig(o.partition()));
partition.forEachRemaining(record -> {
System.out.println("request: "+record.value());
Response data = new Response …
// As as debugging technique, users can write to DBFS to verify that records are being written out
// dbutils.fs.put("/tmp/test_kafka_output",data,true)
ProducerRecord<String, Response> message = new ProducerRecord(outputTopic, null, data);
Future<RecordMetadata> result = producer.send(message);
try {
RecordMetadata metadata = result.get();
System.out.println(String.format("offset='$d' partition='%d' topic='%s'timestamp='$d",
metadata.offset(),metadata.partition(),metadata.topic(),metadata.timestamp()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
producer.close();
});
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
}
);
我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。
我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js
你好,我想把数据从一个html页面发送到另一个html页面 这是我的index.html页面 这是newpage.html:
问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ
我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr
我有我的自定义Java对象,并希望在构建序列化中利用JVM将其发送到Kafka主题,但序列化失败,出现以下错误 org.apache.kafka.Common.Errors.SerializationException:无法将类com.spring.kafka.payload的值转换为value.Serializer中指定的类org.apache.kafka.Common.Serializatio