Question:

Producing Avro messages with Spring Cloud Stream using the Confluent Schema Registry

轩辕佑运
2023-03-14

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo.kafka</groupId>
  <artifactId>kafka-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka-demo</name>
  <description>Demo project for Spring Boot</description>


  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Dalston.SR1</spring-cloud.version>
    <project-avro.version>7.0.0-SNAPSHOT</project-avro.version>
    <apache.avro.version>1.8.2</apache.avro.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-schema</artifactId>
      <version>1.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${apache.avro.version}</version>
    </dependency>
    <dependency>
      <groupId>project.foundation</groupId>
      <artifactId>maritz-avro</artifactId>
      <version>${project-avro.version}</version>
    </dependency>
  </dependencies>
</project>

I have published the following schema to the registry:

{
  "type": "record",
  "name": "RoutingSlipMsg",
  "fields": [
    {
      "name": "projectNumber",
      "type": "string"
    },
    {
      "name": "clientName",
      "type": "string"
    },
    {
      "name": "programTheme",
      "type": "string"
    },
    {
      "name": "databaseName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "actionType",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

Original test application.yml:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/avro
#          contentType: application/*+avro
          contentType: application/com.project.RoutingSlipMsg.v1+avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/*+avro
#          contentType: application/avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

@Configuration
public class DemoConfiguration {


  @Bean
  public ConfluentSchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient registry = new ConfluentSchemaRegistryClient();
    registry.setEndpoint("http://192.168.99.100:8081");
    return registry;
  }

}

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {

  @Autowired
  private KafkaRoutingSlipSender kafkaRoutingSlipSender;

  @Test
  public void testSend() throws Exception {

    RoutingSlipMsg routingSlip = new RoutingSlipMsg();
    routingSlip.setActionType("FAKE_ACTION");
    routingSlip.setClientName("TEST_CLIENT");
    routingSlip.setDatabaseName("NONE");
    routingSlip.setProgramTheme("THEME");
    routingSlip.setProjectNumber("ABC123");

    kafkaRoutingSlipSender.send(routingSlip);
  }
}

package com.project.kafka.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaDemoApplication.class, args);
  }
}

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {


  @InboundChannelAdapter(value = Source.OUTPUT)
  public MessageSource<RoutingSlipMsg> send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    return () -> {
      return new GenericMessage<>(data);
    };
  }
}

Edited sender class:

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

  @Autowired
  private Source source;

  public void send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    source.output().send(MessageBuilder.withPayload(data).build());
  }
}

Original stack trace/exception, caused by improper use of @InboundChannelAdapter:

2017-07-06 12:20:48.645 ERROR 69011 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:119)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:201)
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:116)
    ... 20 more

I have reviewed several different posts here, but nothing seems to help. What might I be missing?

ÿcontentType?"application/x-java-object;type=com.maritz.avro.RoutingSlipMsg"FAKE_ACTIOÎTEST_CLIENÔNONÅTHEMÅABC12³
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=9a9ebc45-9431-8de5-1cb0-40d191082599, timestamp=1499438830456}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:22)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:57)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

Spring Boot: configuration class is ignored and not loaded

The ConfluentSchemaRegistryClient is now being used. I can step through the code in the debugger, but it does not find the schema.

register(String subject, String format, String schema)

ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request, Map.class, new Object[0]);
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is org.springframework.web.client.HttpClientErrorException: 404 Not Found
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=73365130-a91b-ba9c-aaf0-77cfaa26f73d, timestamp=1499700409997}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:29)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.web.client.HttpClientErrorException: 404 Not Found
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:700)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:653)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:531)
    at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:73)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

1 answer

艾才良
2023-03-14

Caused by: java.lang.IllegalArgumentException: wrong number of arguments

A method annotated with @InboundChannelAdapter can't have any parameters...

/**
 * Indicates that a method is capable of producing a {@link org.springframework.messaging.Message}
 * or {@link org.springframework.messaging.Message} {@code payload}.
 * <p>
 * A method annotated with {@code @InboundChannelAdapter} can't accept any parameters.
 * <p>    
 * Return values from the annotated method may be of any type. If the return
 * value is not a {@link org.springframework.messaging.Message}, a {@link org.springframework.messaging.Message}
 * will be created with that object as its {@code payload}.

...

It is polled for a message or a message payload.
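
The same failure can be reproduced without Spring. The stack trace shows the poller reaching the annotated method through Method.invoke, and since a polled method has nothing to be called with, the call presumably passes no arguments; a method that declares a parameter is then rejected before its body ever runs. Here is a minimal sketch in plain Java. PollingInvocationDemo, Sender, next and poll are hypothetical names used only for illustration; they are not part of Spring Integration.

```java
import java.lang.reflect.Method;

public class PollingInvocationDemo {

    /** Hypothetical stand-in for the sender bean; not the real Spring component. */
    public static class Sender {
        // Same shape as the original send(RoutingSlipMsg): one parameter.
        public String send(String data) {
            return data;
        }

        // Parameterless variant: the shape a polled method needs.
        public String next() {
            return "payload";
        }
    }

    /** Looks up the method, then invokes it with no arguments, as a poller would. */
    public static String poll(Object target, String name, Class<?>... paramTypes) {
        try {
            Method method = target.getClass().getMethod(name, paramTypes);
            return "ok: " + method.invoke(target);
        } catch (IllegalArgumentException e) {
            // Thrown by Method.invoke when the argument count does not match.
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            return "reflection failure: " + e;
        }
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        System.out.println(poll(sender, "next"));               // ok: payload
        System.out.println(poll(sender, "send", String.class)); // IllegalArgumentException
    }
}
```

The edited sender class in the question sidesteps this by dropping the annotation and sending on source.output() directly, which is the right approach when the payload comes from a caller rather than from polling.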
