当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka流处理器不工作

凤自珍
2023-03-14

我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "adt.events.location";
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @EventListener(ApplicationStartedEvent.class)
    public void produce() {
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedIn(1L, 1001L, 104L, 11L, 6L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientBedChanged(1L, 1001L, 7L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientRoomChanged(1L, 1001L, 10L));
        this.kafkaTemplate.send(TOPIC, "2", new EventPatientCheckedIn(2L, 1002L, 110L, 18L, 2L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientCheckedIn(3L, 1003L, 111L, 16L, 1L));
        this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedOut(1L, 1001L));
        this.kafkaTemplate.send(TOPIC, "3", new EventPatientBedChanged(3L, 1003L, 3L));
    }
}

主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。

这些是主题

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicEventsLocation() {
        return TopicBuilder.name("adt.events.location").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicPatientLocation() {
        return TopicBuilder.name("adt.patient.location").partitions(1).replicas(1).build();
    }
}

application.yml我正在使用cp-all-in-one-community作为docker-file

server:
  port: 9000
spring:
  kafka:
    properties:
      auto:
        register:
          schemas: false
      use:
        latest:
          version: true
      schema:
        registry:
          url: http://localhost:8081
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    streams:
      application-id: kafka-demo
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
@Component
public class Processor {

    final StreamsBuilder builder = new StreamsBuilder();

    @Autowired
    public void process() {
        ...    
    }
}

但现在我得到以下错误:

2021-04-07 16:02:16.967 ERROR 120225 --- [           main] org.apache.kafka.streams.KafkaStreams    : stream-client [LocationService-9611eedf-df9b-4fe5-9a7d-058027cee22a] Topology with no input topics will create no stream threads and no global thread.
2021-04-07 16:02:16.967  WARN 120225 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.

共有1个答案

贾越
2023-03-14

kafkatemplate上使用@autowired。我想这就是你所缺少的东西。我给出的示例没有使用AvroSerializer。所以我假设您的序列化程序正在工作。至少您应该看到到达使用者的消息或序列化错误。而且,您可以改进您的方法来处理回调,并使用更加一致的消息记录。例如,使用ProducerRecord创建要发送的消息。使用ListenableFuture添加回调。

@Slf4j
@Service
public class Producer {
   @Autowired
   KafkaTemplate<String, Object> kafkaTemplate;

   public void produce() {
        String key = "1";
        Object value = EventPatientCheckedIn....

        ProducerRecord<String, Object> producerRecord = buildProducerRecord(TOPIC, key, value);

        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(producerRecord);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) { handleFailure(key, value, ex); }

            @Override
            public void onSuccess(SendResult<String, Object> result) { handleSuccess(key, value, result); }
        });
    }

    private ProducerRecord<String, Object> buildProducerRecord(String topic, String key, Object value) {
        List<Header> recordHeaders = List.of(new RecordHeader("event-source", "scanner".getBytes()));
        return new ProducerRecord<String, Object>(topic, null, key, value, recordHeaders);
    }

    private void handleSuccess(String key, Object value, SendResult<String, Object> result) {
        log.info("message send successfully for the key: {} and value: {} at partition: {}", key, value, result.getRecordMetadata().partition());
    }

    private void handleFailure(String key, Object value, Throwable ex) {
        log.error("error sending the message and the exception us {}", ex.getMessage());
        try { throw ex; }
        catch (Throwable throwable) {
            log.error("error on failure: {}", throwable.getMessage());
        }
    }
}

更新:我认为您没有配置属性然后在处理器上创建streams.start();。我将这个例子建立在这个参考的基础上。

Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "adt.events.location");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, YOUR_AVRO_SERDE_HERE);
...
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
 类似资料:
  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • 我试图更改添加到docx文件中的图像的对齐方式,但对齐方式不起作用。我尝试了以下方法: 虽然图像是在导出的docx文件中呈现的,但是图像对齐始终保持不变。我如何改变图像的对齐中心或结束?

  • 在使用cypress cucumber预处理器时,我的package.json中有以下内容 当我运行npm时,运行ci-test,它可以工作,所有使用@othertag的场景都会被忽略。 但却找不到任何标记。

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:

  • 我正在尝试获得骆驼路线JMS- 下面的例子说明了如果REST服务的服务器出现故障而无法交付route时会发生什么情况。 我得到了正确的例外: 但是消息被消费并从队列中删除。我的假设是使用事务/事务骆驼和AMQ可以解决这个问题并将消息移动到ActiveMQ.DLQ. 我已经阅读了《骆驼行动》第一版的第9章,并在谷歌上搜索,但没有找到任何解决我问题的方法。 我知道我可以创建/定义自己的Transact