我想定制Spring云流生产者、消费者和KStreams中Avro模式主题的命名策略。 这将在Kafka中使用和->https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy 在一个土生土长的Kafka制作人中,这是这样的: 显然,Spring Cloud使用自己
我有一个应用程序(spring-boot-shipping-service),其中包含一个KStream,它获取由外部生产者(spring-boot-order-service)生成的OrderCreatedEvent消息。此生成器使用以下架构: Order-Created-Event.avsc 我的与联接,并向order主题发布一种新的消息:Ordershippedevent。 Order-Sh
我有多个微服务和前面的API,喜欢使用相同的主题为事件每个域事件在单独的分区,我能够配置spring kafka绑定器发送到不同的分区使用 实现 我的问题是,我能否将Kstream绑定器配置为只能为@input和@output使用分区。
关于errorChannel没有任何订阅者的投诉 Kafka生产者线程日志 编辑:以下是我的channels类的内容:
我正在努力为Kafka-Streams正确配置Spring Cloud Stream,以便使用带有信任存储和密钥存储的SSL。 在我的应用程序中,我有多个流正在运行,所有流的SSL配置应该是相同的。 stream2:Topic2>Topic4 Topic3 stream3:Topic4>Topic5 我使用最新的Spring-Cloud Stream框架和Kafka-Streams以及Avro模型
pom.xml: 我已将以下架构发布到注册表: 原始测试应用程序。yml 发件人类编辑: 由于不适当使用导致原始Stacktrace/Exception 我在这里回顾了几个不同的帖子,但似乎没有什么帮助。我可能错过了什么? Spring Boot:配置类被忽略,不加载 现在正在使用。我可以在调试时单步执行代码,但它没有找到模式。 Register(字符串主题、字符串格式、字符串模式)
我可以通过跟随Kafka和Spring Cloud的Start Streaming来运行示例,但不幸的是,它没有使用confluent schema Registry。我阅读了Spring Cloud Stream reference guide的confluent schema registry部分,但它不适用于我的confluent 3.0.0,而且该指南没有提到如何使用confluent s
我们可以创建一个不同的输出通道来在发生异常时发布消息,但它感觉我们试图实现一些可能不必要的东西。 更新1 我们添加了这些豆子: 加里的例子似乎普遍有效。尽管我们需要在使用不推荐的StreamListner方法而不是函数时进行一些修改,但仍有一些问题无法解决。 主题名称似乎应该总是,因为我们不知道如何使用这样的不同名称。我们为所有使用者使用一个主题,这似乎不是Spring-kafka默认DLT所期望
我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册? 17.165 SenderClass已创建 17.816初始化类,@PostConstruct启动PollingTask 24.781PollingTask发送第一条Kafka消息 24.816第一个错误:“Dispatcher没有订阅服务器” 25
CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?
我有一个问题反序列化来自Kafka主题的消息。这些消息已经使用spring-cloud-stream和Apache Avro序列化。我正在用斯普林斯·Kafka阅读它们,并试图反序列化它们。如果我使用spring-cloud来生成和使用消息,那么我就可以很好地反序列化消息。问题是当我用Spring Kafka消费它们,然后试图反序列化。 我正在使用一个模式注册表(用于开发的spring-boot模
我用以下组件构建了一个spring boot kinesis消费者: Spring boot(版本-2.1.2.Release) Spring cloud(version-greenwich.release) Spring cloud stream kinesis绑定器(版本-1.1.0.发行版) 假设我有3个消费者实例部署到PCF(通过在MANIFEST.YML文件中将instances属性设置
我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理: 该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是: 首先,它显示还原使用者客户端的创建。自动偏移复位无: > 配置了两个消费者的原因是什么? 为什么第二个函数具有,而我没有显式配置它,而且Kafka的默认值是最新的? 我已
我正在函数式编程中使用。我尝试了许多组合来设置GroupId,但是使用者总是将GroupId打印为spring.application.name。 pom.xml 如您所见,groupId不是从yml文件设置的。我在这上面花了很多小时,但没有运气。请帮忙。 更新 当我不使用将pom依赖项修改为spring-cloud-stream-binder-kafka:3.1.1的streams API时,G
我有一个用例,其中处理MS的单个kafka流将有一个处理器和一个消费者,消费者将使用处理器输出消息。类似于下面github中的示例 https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-message-channel 在执行上面的示例时,