我试图使用来自kafka主题的合流avro消息作为spring Boot2.0的Kstream。
我能够以messageChannel
的形式使用消息,但不能以kstream
的形式使用消息。
@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION)KStream<String, Organization> organization) {
log.info("Organization Received:" + organization);
}
例外情况:
线程“pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3”org.apache.kafka.streams.errors.StreamsException:流线程[pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3]重新平衡失败。在org.apache.kafka.streams.processor.internals.streamthread.pollrequests(streamthread.java:860)在org.apache.kafka.streams.processor.streamthread.runonce(streamthread.java:808)在org.apache.kafka.streams.process.streamthread.runloop(streamthread.java:774)在org.apache.kafka.preams.preamthread.run(streamthread.在org.apache.kafka.streams.streamsconfig.defaultValueserde(streamsconfig.java:859)在org.apache.kafka.streams.processor.internals.abstractProcessorContext.(abstractProcessorContext.java:59)在404)在org.apache.kafka.streams.processor.internals.streamthread$taskcreator.createTask(streamthread.java:365)在org.apache.kafka.streams.processor.streamtread$abstracttaskcreator.creamtread(streamthread.java:350)在org.apache.kafka.streams.processor.internalts.addstreamtasks(streamthread.java:350)在balanceListener.onPartitionsAssigned(streamthread.java:259)在org.apache.kafka.clients.consumer.cordinator.onjoIntegrate(consumercordinator.java:264)在org.apache.kafka.clients.consumer.internals.consumercordinator.java:264)在org.apache.kafka.clients.consumactcordinator.joingRoupifNeed(abstractcordinator.java:367)在pollonce(kafkaconsumer.java:1146)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer.java:1111)在org.apache.kafka.streams.processor.internals.streamthread.pollrequests(streamthread.java:851).还有3个原因是:io.confluent.common.config.ConfigException:缺少必需的配置“schema.registry.url”,该配置没有默认值。在io.confluent.common.config.config.configdef.parse(configdef.java:243),在io.confluent.common.config.config.abstractconfig.(abstractconfig.java:78),在io.confluent.kafka.serializers.abstractKafkaavroserdeconfig.(abstractKafkaavroserdeconfig.(abstractKafkaavroserdeconfig.java:61),在.streams.serdes.avro.specificaVroserializer.configure(specificaVroserializer.java:58)在io.confluent.kafka.streams.serdes.avro.specificaVroserde.configure(specificaVroserede.java:107)在org.apache.kafka.streams.streams.streamsconfig.defaultValueserde(streamsconfig.java:855)...19多个
基于这个错误,我认为我没有为Confluent配置schema.registry.url
。我快速浏览了一下这里的示例,在如何使用StreamListener
对spring cloud stream执行相同的操作方面略有丢失
这需要单独配置吗?或者,是否有一种方法可以配置confluent正在寻找的application.yml
中的schema.registry.url
?
以下是代码repo https://github.com/naveenpop/springboot-kstream-confluent
{
"namespace":"com.test.demo.avro",
"type":"record",
"name":"Organization",
"fields":[
{
"name":"orgId",
"type":"string",
"default":"null"
},
{
"name":"orgName",
"type":"string",
"default":"null"
},
{
"name":"orgType",
"type":"string",
"default":"null"
},
{
"name":"parentOrgId",
"type":"string",
"default":"null"
}
]
}
demokstreamapplication.java
@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemokstreamApplication.class, args);
}
@Component
public static class organizationProducer implements ApplicationRunner {
@Autowired
private KafkaProducer kafkaProducer;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("Starting: Run method");
List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
Runnable runnable = () -> {
String rPage = pages.get(new Random().nextInt(pages.size()));
String rName = names.get(new Random().nextInt(names.size()));
try {
this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
} catch (Exception e) {
log.info("Exception :" +e);
}
};
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable ,1 ,1, TimeUnit.SECONDS);
}
}
}
java
@Configuration
public class KafkaConfig {
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
private String endpoint;
@Bean
public SchemaRegistryClient confluentSchemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
}
java
@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
}
}
@EnableBinding(KstreamBinding.class)
public class KafkaProducer {
@Autowired
private KstreamBinding kstreamBinding;
public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {
try {
Organization organization = Organization.newBuilder()
.setOrgId(orgId)
.setOrgName(orgName)
.setOrgType(orgType)
.setParentOrgId(parentOrgId)
.build();
kstreamBinding.organizationOutputMessageChannel()
.send(MessageBuilder.withPayload(organization)
.setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
.build());
} catch (Exception e){
log.error("Failed to produce Organization Message:" +e);
}
}
}
public interface KstreamBinding {
String ORGANIZATION_INPUT= "organizationInput";
String ORGANIZATION_OUTPUT= "organizationOutput";
@Input(ORGANIZATION_INPUT)
KStream<String, Organization> organizationInputMessageChannel();
@Output(ORGANIZATION_OUTPUT)
MessageChannel organizationOutputMessageChannel();
}
我已经向GitHub示例提交了从spring boot本身生成消息的请求,并且仍然以空值的形式获取消息。
感谢您花时间讨论这个问题。
以下实现将不会执行您打算执行的操作:
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION)KStream<String, Organization> organization) {
log.info("Organization Received:" + organization);
}
该log语句只在引导阶段调用一次。为了使其发挥作用,您需要在接收的kstream
上调用一些操作,然后在那里提供逻辑。例如,在下面的工作中,我在foreach
方法调用上提供了一个lambda表达式。
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}
配置中还有一个问题,即错误地为键分配了avroSerde
,而键实际上是字符串
。像这样更改它:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring:
application:
name: kstream
cloud:
stream:
schemaRegistryClient:
endpoint: http://localhost:8081
schema:
avro:
schema-locations: classpath:avro/Organization.avsc
bindings:
organizationInput:
destination: organization-updates
group: demokstream.org
consumer:
useNativeDecoding: true
organizationOutput:
destination: organization-updates
producer:
useNativeEncoding: true
kafka:
bindings:
organizationOutput:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
streams:
binder:
brokers: localhost
configuration:
schema.registry.url: http://localhost:8081
commit:
interval:
ms: 1000
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
您还可以从主应用程序类中删除Kafkaconfig
类以及EnableSchemareGistryClient
注释。
我有以下场景: 生产者通过Confluent的REST代理(在Confluent的模式注册表上注册模式)向Kafka主题发送Avro编码的消息,如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consument-avro-messages所述 Spring Cloud Stream enabled mes
本节将详细介绍如何使用Spring Cloud Stream。它涵盖了创建和运行流应用程序等主题。
Spring cloud stream starter kafka在连接消费者时没有加载配置。以下是我在调试模式下运行控制台时在控制台中看到的配置: 我有以下引导yml文件的配置部分
当从shell/data flow UI部署流时,有没有什么解决方法不使用git build pack 以下是https://github.com/cloudfoundry/java-buildpack.git的错误消息:克隆git repository失败
使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp
我开发了spring批处理应用程序,该应用程序生成由json对象列表组成的amqp(rabbitmq)消息。消息具有包含一些元数据的标头。Spring cloud stream应用程序正在消费消息,我使用了功能性方法。如何访问标题<将消息头用于除路由之外的任何内容,这是一种糟糕的方法吗?