当前位置: 首页 > 知识库问答 >
问题:

空指针异常:使用 Kafka binder 的 Spring Cloud Stream

盖锐进
2023-03-14

我对Kafka和Spring-Cloud-Stream还不熟悉。现在,我在启动Kafka项目以发送消息时遇到了一个问题。第一次运行应用程序时显示空指针异常。

日志

java.lang.NullPointerException: null
    at java.base/java.util.HashMap.putMapEntries(HashMap.java:497) ~[na:na]
    at java.base/java.util.LinkedHashMap.<init>(LinkedHashMap.java:385) ~[na:na]
    at org.springframework.core.annotation.MapAnnotationAttributeExtractor.enrichAndValidateAttributes(MapAnnotationAttributeExtractor.java:93) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.core.annotation.MapAnnotationAttributeExtractor.<init>(MapAnnotationAttributeExtractor.java:58) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.core.annotation.AnnotationUtils.synthesizeAnnotation(AnnotationUtils.java:1609) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.collectClasses(BindingBeansRegistrar.java:56) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:43) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:723) ~[na:na]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at stream.websokect.demo.DemoApplication.main(DemoApplication.java:15) ~[classes/:na]

application.properties


# Settings for the binding named "output".
# Destination name used by the "output" binding.
spring.cloud.stream.bindings.output.destination= meetUpTopic
# Producer-side partition count for the "output" binding.
spring.cloud.stream.bindings.output.producer.partition-count= 1

# Content type declared for messages on the "output" binding.
spring.cloud.stream.bindings.output.content-type= text/plain
# NOTE(review): "raw" was a header-mode value in older Spring Cloud Stream
# releases; newer documentation appears to list none/headers/embeddedHeaders.
# Verify that "raw" is still accepted by the spring-cloud-stream version in use.
spring.cloud.stream.bindings.output.producer.header-mode= raw

入口应用(DemoApplication)

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

/**
 * Boot entry point of the demo.
 *
 * <p>Starts the Spring application and contributes an {@link ApplicationRunner}
 * bean that initiates a WebSocket handshake with {@link #MEET_UP_STREAM}, using
 * the injected {@link RsvpWebSocketHandler} as the handler.
 */
@SpringBootApplication
public class DemoApplication {
    /**
     * URL passed to the WebSocket handshake. Declared {@code final} because
     * nothing in this class reassigns it; a mutable public static field would
     * let any code silently redirect the connection.
     */
    public static final String MEET_UP_STREAM = "ws://stream.meetup.com/2/rsvps";

    /**
     * Launches the application.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Builds the runner that opens the WebSocket connection.
     *
     * @param rsvpWebSocketHandler handler handed to the handshake
     * @return a runner that creates a {@link StandardWebSocketClient} and
     *         starts a handshake against {@link #MEET_UP_STREAM}
     */
    @Bean
    public ApplicationRunner initializeConnection(RsvpWebSocketHandler rsvpWebSocketHandler) {
        // Printed when this factory method is invoked, not when the returned runner executes.
        System.out.println("Hello=============");
        return args -> {
            WebSocketClient webSocketClient = new StandardWebSocketClient();

            // The value returned by doHandshake is not inspected here.
            webSocketClient.doHandshake(rsvpWebSocketHandler, MEET_UP_STREAM);
        };
    }

}

RsvpKafkaProducer

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
/**
 * Publishes WebSocket message payloads through the output channel of the
 * injected {@link Source}.
 *
 * <p>The class is annotated with {@code @EnableBinding(Source.class)} and
 * receives the {@link Source} through its constructor.
 */
@Component
@EnableBinding(Source.class)
public class RsvpKafkaProducer {
    /** Timeout argument handed to {@code send}; presumably milliseconds, confirm against the channel API. */
    private static final long SEND_TIMEOUT = 10000;

    private final Source source;

    /**
     * @param source binding whose {@code output()} channel is used for sending
     */
    public RsvpKafkaProducer(Source source) {
        this.source = source;
    }

    /**
     * Wraps the payload of {@code message} in a new message and sends it on the
     * output channel. The value returned by {@code send} is discarded.
     *
     * @param message WebSocket message whose payload is forwarded
     */
    public void sendRsvpMessage(WebSocketMessage<?> message) {
        final Object payload = message.getPayload();
        source.output().send(MessageBuilder.withPayload(payload).build(), SEND_TIMEOUT);
    }
}

RsvpWebSocketHandler

import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

/**
 * WebSocket handler that prints each incoming payload to standard output and
 * then forwards the message to {@link RsvpKafkaProducer}.
 */
@Component
public class RsvpWebSocketHandler extends AbstractWebSocketHandler {

    private final RsvpKafkaProducer rsvpKafkaProducer;

    /**
     * @param rsvpKafkaProducer producer that receives every handled message
     */
    public RsvpWebSocketHandler(RsvpKafkaProducer rsvpKafkaProducer) {
        this.rsvpKafkaProducer = rsvpKafkaProducer;
    }

    /**
     * Logs the payload followed by a separator, then passes the message on to
     * the producer.
     *
     * @param session session the message arrived on (not used here)
     * @param message incoming WebSocket message
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        // "\n" is the newline escape; the previous "/n" printed the two
        // characters '/' and 'n' literally instead of a line break.
        System.out.println(message.getPayload() + "================\n");

        rsvpKafkaProducer.sendRsvpMessage(message);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the demo: Spring Boot parent, web + websocket, Spring Cloud Stream with the Kafka binder. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- NOTE(review): the parent is Boot 2.1.8.RELEASE while the spring-cloud-stream
         artifacts below are pinned to 3.0.8.RELEASE. Presumably the 3.0.x line targets
         a newer Boot generation (2.2.x/2.3.x); confirm against the Spring Cloud
         release train compatibility table. A mismatch here is a plausible cause of
         startup failures inside Spring Cloud Stream configuration classes. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>stream.websokect</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- No explicit version: resolved through the parent. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- NOTE(review): the explicit 4.0.0.RELEASE differs from the other Spring
             modules, which come from the parent; presumably the version element can
             be dropped so the parent manages it. Confirm. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>4.0.0.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.8.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.0.8.RELEASE</version>
        </dependency>

        <!-- Test starter, test scope only, with the JUnit vintage engine excluded. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin; version inherited from the parent. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

共有1个答案

胡墨竹
2023-03-14

BindingBeansRegistrar.collectClasses——这看起来是 @EnableBinding 注解方面的某种问题;您使用的是什么版本?

我尝试用一个不带任何类的 @EnableBinding 来复现这个问题,但它仍然可以正常加载。

您可以在 BindingBeansRegistrar.collectClasses 中设置断点,查看它试图根据哪些属性来合成注解。

我把你的生产者(RsvpKafkaProducer)复制到了一个新的 Spring Boot 应用程序中,它运行起来没有任何问题;如果你可以在某个地方发布一个精简的应用程序(不含 WebSocket 相关的部分),我可以看看问题出在哪里。

 类似资料:
  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 我在活动onCreate()中初始化列表,如下所示: 编辑:我的活动的launchmode是SingleTask 编辑2: 我无法发送更多有用的日志,因为这次崩溃是在生产中。我只有一些布料日志。 谢谢你的帮助,但我不能粘贴整个代码,因为隐私。 我想我在SingleTask-OnCreate-OnNewIntent用法上有问题。简单地说,我试图打开我的应用程序从通知与参数决定哪个片段将打开时,用户导

  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用 Kafka binder 的 Spring Cloud Stream 应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果抛出异常,则应用程序会重试处理消息3次。预期行为-如果抛出 App.MyCustomException.class,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 指针变量也是变量,是变量就可以任意赋值,不要越界即可(32位编译器指针大小为4字节,64位编译器指针大小为8字节),但是,任意数值赋值给指针变量没有意义,因为这样的指针就成了野指针,此指针指向的区域是未知(操作系统不允许操作此指针指向的内存区域)。所以,野指针不会直接引发错误,操作野指针指向的内存区域才会出问题。 int a = 100; int *p; p = a; //把a的值赋值给指针变量p