pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build descriptor for the kafka-demo jar: Spring Cloud Stream with the Kafka binder, plus Avro. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.demo.kafka</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<description>Demo project for Spring Boot</description>
<!-- Inherits build defaults and dependency management from the Spring Boot 1.5.4.RELEASE parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Version numbers referenced below through ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Dalston.SR1</spring-cloud.version>
<project-avro.version>7.0.0-SNAPSHOT</project-avro.version>
<apache.avro.version>1.8.2</apache.avro.version>
</properties>
<!-- Imports the Spring Cloud BOM so that Spring Cloud dependencies below may omit their version. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Version is pinned explicitly here rather than left to the imported BOM. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache.avro.version}</version>
</dependency>
<!-- NOTE(review): presumably supplies the generated Avro classes such as RoutingSlipMsg; confirm. -->
<dependency>
<groupId>project.foundation</groupId>
<artifactId>maritz-avro</artifactId>
<version>${project-avro.version}</version>
</dependency>
</dependencies>
</project>
我已将以下模式(schema)发布到模式注册表:
{
"type": "record",
"name": "RoutingSlipMsg",
"fields": [
{
"name": "projectNumber",
"type": "string"
},
{
"name": "clientName",
"type": "string"
},
{
"name": "programTheme",
"type": "string"
},
{
"name": "databaseName",
"type": [
"null",
"string"
]
},
{
"name": "actionType",
"type": [
"null",
"string"
]
}
]
}
原始的测试 application.yml:
# Spring Cloud Stream test configuration: publishes Avro payloads to Kafka on the
# "output" binding and resolves schemas through a schema registry.
spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
          # Alternative content types that were tried:
          # contentType: application/avro
          # contentType: application/*+avro
          contentType: application/com.project.RoutingSlipMsg.v1+avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100
# Spring Cloud Stream configuration variant with no active contentType on the
# "output" binding (both candidates are commented out).
spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
          # contentType: application/*+avro
          # contentType: application/avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100
/**
 * Spring configuration that contributes the schema registry client bean.
 */
@Configuration
public class DemoConfiguration {

    /** Address of the schema registry the client talks to. */
    private static final String REGISTRY_ENDPOINT = "http://192.168.99.100:8081";

    /**
     * Builds a {@link ConfluentSchemaRegistryClient} pointed at {@link #REGISTRY_ENDPOINT}.
     *
     * @return the configured client
     */
    @Bean
    public ConfluentSchemaRegistryClient schemaRegistryClient() {
        final ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(REGISTRY_ENDPOINT);
        return client;
    }
}
package com.project.kafka.kafkademo;
import com.project.avro.RoutingSlipMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that hands a single {@link RoutingSlipMsg} to the
 * {@link KafkaRoutingSlipSender}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {

    @Autowired
    private KafkaRoutingSlipSender kafkaRoutingSlipSender;

    /**
     * Fills in every field of a routing slip and passes it to the sender.
     * There are no assertions: the test succeeds when {@code send} does not throw.
     */
    @Test
    public void testSend() throws Exception {
        final RoutingSlipMsg message = new RoutingSlipMsg();
        message.setProjectNumber("ABC123");
        message.setClientName("TEST_CLIENT");
        message.setProgramTheme("THEME");
        message.setDatabaseName("NONE");
        message.setActionType("FAKE_ACTION");

        kafkaRoutingSlipSender.send(message);
    }
}
package com.project.kafka.kafkademo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the Kafka demo application.
 */
@SpringBootApplication
public class KafkaDemoApplication {

    /**
     * Starts the application through {@code SpringApplication.run}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        final Class<?> applicationClass = KafkaDemoApplication.class;
        SpringApplication.run(applicationClass, args);
    }
}
package com.project.kafka.kafkademo;
import com.project.avro.RoutingSlipMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
/**
 * Sends {@link RoutingSlipMsg} payloads to the {@link Source#OUTPUT} channel.
 *
 * <p>{@code send} is deliberately NOT annotated with {@code @InboundChannelAdapter}:
 * such a method is polled by the framework and may not declare parameters, so
 * annotating a one-argument method fails at poll time with
 * {@code IllegalArgumentException: wrong number of arguments}. The message is
 * sent explicitly on the bound output channel instead.
 */
@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

    /** Binding interface whose output channel receives the messages. */
    @Autowired
    private Source source;

    /**
     * Wraps {@code data} in a message and sends it on the output channel.
     *
     * @param data the routing slip to publish
     * @return a {@link MessageSource} yielding the message that was just sent; kept only so
     *         the method signature stays the same for existing callers
     */
    public MessageSource<RoutingSlipMsg> send(RoutingSlipMsg data) {
        System.out.println("**** Hello World ****");
        final GenericMessage<RoutingSlipMsg> message = new GenericMessage<>(data);
        source.output().send(message);
        return () -> message;
    }
}
编辑后的发送者类:
/**
 * Sends {@link RoutingSlipMsg} payloads on the output channel of the injected {@link Source}.
 */
@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

    /** Binding interface whose output channel receives the messages. */
    @Autowired
    private Source source;

    /**
     * Builds a message around {@code data} and sends it on the output channel.
     *
     * @param data the routing slip to publish
     */
    public void send(RoutingSlipMsg data) {
        System.out.println("**** Hello World ****");
        this.source
                .output()
                .send(MessageBuilder.withPayload(data).build());
    }
}
由于不当使用 @InboundChannelAdapter 而导致的原始堆栈跟踪/异常:
2017-07-06 12:20:48.645 ERROR 69011 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:119)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:201)
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:116)
... 20 more
我在这里回顾了几个不同的帖子,但似乎没有什么帮助。我可能错过了什么?
ÿcontentType?"application/x-java-object;type=com.maritz.avro.RoutingSlipMsg"FAKE_ACTIOÎTEST_CLIENÔNONÅTHEMÅABC12³
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=9a9ebc45-9431-8de5-1cb0-40d191082599, timestamp=1499438830456}]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:22)
at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:57)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
... 31 more
Spring Boot:配置类被忽略,不加载
现在正在使用 ConfluentSchemaRegistryClient。我可以在调试时单步执行代码,但它没有找到模式。
register(String subject, String format, String schema)
ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request, Map.class, new Object[0]);
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is org.springframework.web.client.HttpClientErrorException: 404 Not Found
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=73365130-a91b-ba9c-aaf0-77cfaa26f73d, timestamp=1499700409997}]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:29)
at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.web.client.HttpClientErrorException: 404 Not Found
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:700)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:653)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:531)
at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:73)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
... 31 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
带有 @InboundChannelAdapter 注解的方法不能有任何参数……
/**
* Indicates that a method is capable of producing a {@link org.springframework.messaging.Message}
* or {@link org.springframework.messaging.Message} {@code payload}.
* <p>
* A method annotated with {@code @InboundChannelAdapter} can't accept any parameters.
* <p>
* Return values from the annotated method may be of any type. If the return
* value is not a {@link org.springframework.messaging.Message}, a {@link org.springframework.messaging.Message}
* will be created with that object as its {@code payload}.
...
它被轮询为消息或消息有效负载。
我正在尝试使用 Confluent Schema Registry,下面是我在 GitHub 中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro)。当消费者和生产者的模型共享相同的命名空间时,它可以正常工作。当消费者位于另一个项目中、命名空间不同但类相同(名称和属性都一致)时,它无法工作。Confluent Avro 反序列化程序可以使用正确的值反序列化
我尝试将我的自定义类型的< code>ProducerRecord发送到Kafka,但我收到错误消息: 我在schema:GET中设置了schema 回应: 得到 回答 } 这是我的Scala类: Kafka制作人的部分: 我错过了什么?我看到我可以为我的类实现SpecificRecord,但在我阅读的书/教程中,我没有看到这一点。谢谢 编辑:固定类名
我们在当前的基础架构中安装了普通的apache Kafka,并开始记录一些我们想要使用Kafka Connect处理的数据。目前,我们使用Avro作为消息格式,但我们的基础架构中没有模式注册表。将来,我们计划用Confluent替换当前堆栈,并使用Schema Registry和Connect,但在一段时间内,我们只需要为此部署Connect。 是否可以以某种方式配置连接接收器,以便它们使用显式a
我可以通过跟随Kafka和Spring Cloud的Start Streaming来运行示例,但不幸的是,它没有使用confluent schema Registry。我阅读了Spring Cloud Stream reference guide的confluent schema registry部分,但它不适用于我的confluent 3.0.0,而且该指南没有提到如何使用confluent s
我正在使用Spring Cloud Stream和Confluent Schema Registry注册Avro模式。 架构注册成功。但是,当我的流侦听器接收到消息时,负载仍然以字节为单位。 这是我的财产。 在接收消息时,我注意到“AbstractAvroMessageConverter”中的“convertFromInternal”从未被调用,而这应该是用来解码消息的。
如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。