当前位置: 首页 > 知识库问答 >
问题:

使用comitAsync将结果发送到另一主题的spark流

岳安福
2023-03-14

我正在使用spark streaming文档中提供的策略来致力于Kafka本身。我的流程是这样的:主题A-->火花流[foreachRdd process->send to Topic b]向主题A提交偏移量

    JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
    );

    kafkaStream.foreachRDD(rdd -> {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
                rdd.foreachPartition(
                        consumerRecords -> {
                            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                            System.out.println(String.format("$s %d %d $d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
                            consumerRecords.forEachRemaining(record -> doProcess(record));
                        });

                ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
            }
    );

共有1个答案

满增
2023-03-14

这就是我想出来的,它接收输入数据,然后使用输出主题发送请求。生产者必须在foreach循环中创建,否则spark将尝试序列化并将其发送给所有工作人员。注意,响应是异步发送的。这意味着我在这个系统中至少使用了一种语义。

kafkaStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(
                partition -> {
                    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                    System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));

                    // Print statements in this section are shown in the executor's stdout logs
                    KafkaProducer<String, MLMIOutput> producer = new KafkaProducer(producerConfig(o.partition()));
                    partition.forEachRemaining(record -> {

                        System.out.println("request: "+record.value());

                        Response data = new  Response …
                        // As as debugging technique, users can write to DBFS to verify that records are being written out
                        // dbutils.fs.put("/tmp/test_kafka_output",data,true)
                        ProducerRecord<String, Response> message = new ProducerRecord(outputTopic, null, data);
                        Future<RecordMetadata> result = producer.send(message);
                        try {
                            RecordMetadata metadata = result.get();
                            System.out.println(String.format("offset='$d' partition='%d' topic='%s'timestamp='$d",
                            metadata.offset(),metadata.partition(),metadata.topic(),metadata.timestamp()));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                    });
                    producer.close();
                });

        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    }

);

 类似资料:
  • 我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。

  • 我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js

  • 你好,我想把数据从一个html页面发送到另一个html页面 这是我的index.html页面 这是newpage.html:

  • 问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ

  • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr

  • 我有我的自定义Java对象,并希望在构建序列化中利用JVM将其发送到Kafka主题,但序列化失败,出现以下错误 org.apache.kafka.Common.Errors.SerializationException:无法将类com.spring.kafka.payload的值转换为value.Serializer中指定的类org.apache.kafka.Common.Serializatio