我对Kafka和Spring-Cloud-Stream还不熟悉。现在,我在启动Kafka项目以发送消息时遇到了一个问题。第一次运行应用程序时显示空指针异常。
日志
java.lang.NullPointerException: null
at java.base/java.util.HashMap.putMapEntries(HashMap.java:497) ~[na:na]
at java.base/java.util.LinkedHashMap.<init>(LinkedHashMap.java:385) ~[na:na]
at org.springframework.core.annotation.MapAnnotationAttributeExtractor.enrichAndValidateAttributes(MapAnnotationAttributeExtractor.java:93) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.core.annotation.MapAnnotationAttributeExtractor.<init>(MapAnnotationAttributeExtractor.java:58) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.core.annotation.AnnotationUtils.synthesizeAnnotation(AnnotationUtils.java:1609) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.cloud.stream.config.BindingBeansRegistrar.collectClasses(BindingBeansRegistrar.java:56) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:43) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:723) ~[na:na]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at stream.websokect.demo.DemoApplication.main(DemoApplication.java:15) ~[classes/:na]
application.properties
spring.cloud.stream.bindings.output.destination= meetUpTopic
spring.cloud.stream.bindings.output.producer.partition-count= 1
spring.cloud.stream.bindings.output.content-type= text/plain
spring.cloud.stream.bindings.output.producer.header-mode= raw
人口应用
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
@SpringBootApplication
public class DemoApplication {
public static String MEET_UP_STREAM = "ws://stream.meetup.com/2/rsvps";
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ApplicationRunner initializeConnection(RsvpWebSocketHandler rsvpWebSocketHandler) {
System.out.println("Hello=============");
return args -> {
WebSocketClient webSocketClient = new StandardWebSocketClient();
webSocketClient.doHandshake(rsvpWebSocketHandler, MEET_UP_STREAM);
};
}
}
RsvpKafkaProducer公司
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
@Component
@EnableBinding(Source.class)
public class RsvpKafkaProducer {
private final Source source;
public RsvpKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
source.output()
.send(MessageBuilder.withPayload(message.getPayload()).build(), 10000);
}
}
RsvpWebSocketHandler
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
@Component
public class RsvpWebSocketHandler extends AbstractWebSocketHandler {
private final RsvpKafkaProducer rsvpKafkaProducer;
public RsvpWebSocketHandler(RsvpKafkaProducer rsvpKafkaProducer) {
this.rsvpKafkaProducer = rsvpKafkaProducer;
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
System.out.println(message.getPayload() + "================/n");
rsvpKafkaProducer.sendRsvpMessage(message);
}
}
pom。xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>stream.websokect</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.0.8.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
BindingBeansRegistrar.collect类
这是@EnableBinda
注释的一些问题;您使用的是什么版本?
我试图用另一个没有类的EnableBinding来复制它,但它仍然加载正常。
您可以在BindingBeanSregistar中设置断点。collectClasses查看它试图从哪些属性合成注释。
我把你的制作人复制到了一个新的Spring Boot应用程序中,它运行起来没有任何问题;如果你可以在某个地方发布一个精简的应用程序(没有websocket的东西),我可以看看有什么问题。
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
我在活动onCreate()中初始化列表,如下所示: 编辑:我的活动的launchmode是SingleTask 编辑2: 我无法发送更多有用的日志,因为这次崩溃是在生产中。我只有一些布料日志。 谢谢你的帮助,但我不能粘贴整个代码,因为隐私。 我想我在SingleTask-OnCreate-OnNewIntent用法上有问题。简单地说,我试图打开我的应用程序从通知与参数决定哪个片段将打开时,用户导
spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
指针变量也是变量,是变量就可以任意赋值,不要越界即可(32位编译器指针大小为4字节,64位编译器指针大小为8字节),但是,任意数值赋值给指针变量没有意义,因为这样的指针就成了野指针,此指针指向的区域是未知(操作系统不允许操作此指针指向的内存区域)。所以,野指针不会直接引发错误,操作野指针指向的内存区域才会出问题。 int a = 100; int *p; p = a; //把a的值赋值给指针变量p