当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka活页夹Ktable不工作

安浩瀚
2023-03-14

我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。

类别:

@Slf4j
@EnableBinding({LimitBinding.class})
public class CommonWorker {

  @Value("${app.dataflow.out-destination}")
  private String customerOut;

  private LimitCustomersHelper custHelper = new LimitCustomersHelper();

  @StreamListener(CUSTOMER_IN)
  public void groupCustomersByLimitIdKTable(KStream<Key, Envelope> input) {
   input
        .filter(custHelper::afterIsNotNull)
        .groupBy(custHelper::groupBy)
        .aggregate(
            custHelper::create,
            custHelper::aggregate,
            custHelper.materialized(customerOut)
        );
  }

  @StreamListener
  public void checkCustomerasTable(@Input(CUSTOMER_OUT) KTable<StringWrapper,LimitCustomers> customers){
    customers.toStream().peek(StreamUtils::peek);
  }

绑定:

public interface LimitBinding {

  String CUSTOMER_IN = "customer-in";
  String CUSTOMER_OUT = "customer-out";


  @Input(CUSTOMER_IN)
  KStream<Key, Envelope> customerInput();

  @Input(CUSTOMER_OUT)
  KTable<StringWrapper, LimitCustomers> customersStream();

}

application.yml:

server.port: 0
spring:
  application.name: connect-producer
  cloud.stream:
    kafka.streams.binder.configuration:
      schema:
        registry.url: http://192.168.99.100:8081
      default:
        key.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    schema.avro.dynamic-schema-generation-enabled: true
    bindings:
      customer-in:
        contentType: application/*+avro
        destination: ${app.dataflow.in-destination}
        group: ${app.dataflow.in-destination}
      customer-out:
        consumer.materializedAs: ${app.dataflow.out-destination}

app.dataflow:
  in-destination: customer_link
  out-destination: customer_link.next


spring.cloud.stream.kafka.streams.binder:
  brokers: 192.168.99.100:9092
  configuration.application.server: 192.168.99.100:9092

共有1个答案

芮星海
2023-03-14

通过添加主题名称模拟表名称解决了该问题

 类似资料:
  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?