我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。
类别:
@Slf4j
@EnableBinding({LimitBinding.class})
public class CommonWorker {
@Value("${app.dataflow.out-destination}")
private String customerOut;
private LimitCustomersHelper custHelper = new LimitCustomersHelper();
@StreamListener(CUSTOMER_IN)
public void groupCustomersByLimitIdKTable(KStream<Key, Envelope> input) {
input
.filter(custHelper::afterIsNotNull)
.groupBy(custHelper::groupBy)
.aggregate(
custHelper::create,
custHelper::aggregate,
custHelper.materialized(customerOut)
);
}
@StreamListener
public void checkCustomerasTable(@Input(CUSTOMER_OUT) KTable<StringWrapper,LimitCustomers> customers){
customers.toStream().peek(StreamUtils::peek);
}
绑定:
public interface LimitBinding {
String CUSTOMER_IN = "customer-in";
String CUSTOMER_OUT = "customer-out";
@Input(CUSTOMER_IN)
KStream<Key, Envelope> customerInput();
@Input(CUSTOMER_OUT)
KTable<StringWrapper, LimitCustomers> customersStream();
}
application.yml:
server.port: 0
spring:
application.name: connect-producer
cloud.stream:
kafka.streams.binder.configuration:
schema:
registry.url: http://192.168.99.100:8081
default:
key.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
schema.avro.dynamic-schema-generation-enabled: true
bindings:
customer-in:
contentType: application/*+avro
destination: ${app.dataflow.in-destination}
group: ${app.dataflow.in-destination}
customer-out:
consumer.materializedAs: ${app.dataflow.out-destination}
app.dataflow:
in-destination: customer_link
out-destination: customer_link.next
spring.cloud.stream.kafka.streams.binder:
brokers: 192.168.99.100:9092
configuration.application.server: 192.168.99.100:9092
通过添加主题名称模拟表名称解决了该问题
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?