当前位置: 首页 > 知识库问答 >
问题:

Dispatcher没有订阅频道employee-management-1.kafka-log发布者的用户

骆利
2023-03-14

我已经完成了以下配置和java代码,以向Kafka主题发送消息。我只想发信息。

  • 我正在使用Spring boot和kafka集成
  • Kafka应用程序已从Docker启动
  • Spring Boot应用程序通过简单的配置连接kafka

波姆。xml

        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>2.5.1</version>
           <relativePath /> <!-- lookup parent from repository -->
        </parent>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2020.0.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

应用yml

spring:
  kafka:
    bootstrap-servers:
    - localhost:19091
  cloud:
    stream:
      bindings:
        kafka-log-publisher:
          binder: kafka
          destination: com.tonitingaurav.kafka.log          
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:19091

消息通道Bean

@Configuration
@EnableIntegration
public class LogProducerKafkaConfig {

    @Bean("kafka-log-publisher")
    public MessageChannel kafkaLogPublisher() {
        return new DirectChannel();
    }
}

日志事件发布程序

@Component
public class LogEventPublisher {

    @Autowired
    @Qualifier("kafka-log-publisher")
    MessageChannel messageChannel;
    
    public void logMessage(Log log) {
        Message<Log> message = MessageBuilder.withPayload(log).build();
        messageChannel.send(message);
    }
    
}

Restendpoint发布消息

public class EmployeeController {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeController.class);

    @Autowired
    private LogEventPublisher logEventPublisher;

    @GetMapping
    public ResponseEntity<Employees> getAll() {
        LOGGER.info("Getting All Employees");
        logEventPublisher.logMessage(new Log("Getting All Employees"));
    }
}

下面是我在执行其余endpoint时得到的异常跟踪。

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'employee-management-1.kafka-log-publisher'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.tonitingaurav.microservice.audit.Log@3e170b0f, headers={id=e549b9d1-8ada-8032-2ff9-6cf1e02bae53, timestamp=1638119747020}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-5.5.0.jar:5.5.0]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.0.jar:5.5.0]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.0.jar:5.5.0]
    at com.tonitingaurav.microservice.event.log.LogEventPublisher.logMessage(LogEventPublisher.java:21) ~[classes/:na]
    at com.tonitingaurav.microservice.controller.EmployeeController.getAll(EmployeeController.java:49) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_291]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_291]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_291]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_291]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1063) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.3.8.jar:5.3.8]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) [spring-webmvc-5.3.8.jar:5.3.8]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) [tomcat-embed-core-9.0.46.jar:4.0.FR]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.3.8.jar:5.3.8]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) [tomcat-embed-core-9.0.46.jar:4.0.FR]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.springframework.cloud.sleuth.instrument.web.servlet.TracingFilter.doFilter(TracingFilter.java:89) [spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at org.springframework.cloud.sleuth.autoconfig.instrument.web.LazyTracingFilter.doFilter(TraceWebServletConfiguration.java:114) [spring-cloud-sleuth-autoconfigure-3.0.3.jar:3.0.3]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96) [spring-boot-actuator-2.5.1.jar:2.5.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.3.8.jar:5.3.8]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_291]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_291]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.46.jar:9.0.46]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.5.0.jar:5.5.0]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.0.jar:5.5.0]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.0.jar:5.5.0]
    ... 62 common frames omitted

共有2个答案

井逸明
2023-03-14

生产者代码通过添加以下代码开始工作。

    @Bean
    @ServiceActivator(inputChannel = KAFKA_LOG_PUBLISHER)
    public AbstractMappingMessageRouter getMessageHandler() {
        return new AbstractMappingMessageRouter() {

            @Override
            protected List<Object> getChannelKeys(Message<?> message) {
                BindingProperties bindingProperties = binders.getBindings().get(KAFKA_LOG_PUBLISHER);
                return Collections.singletonList(bindingProperties.getDestination());
            }
        };
    }

读取application.yml文件的spring.cloud.stream.bindings

@ConfigurationProperties(prefix = "spring.cloud.stream")
@NoArgsConstructor
@Getter
@Setter
@Component
public class KafkaBinders {

    private Map<String, BindingProperties> bindings = ImmutableMap.of();
}
张银龙
2023-03-14

您的kafka日志发布者确实有o个订户,因此出现了错误。您只需定义消息通道并向其发送消息,因为它没有任何订户,所以您会看到错误。是什么让你认为它与Kafka有某种联系?如果您只是想生成一条消息并将其发送到绑定目的地,那么有两种选择。一个是通过供应商,另一个是通过StreamBridge

 类似资料:
  • 我为使用spring cloud starter stream rabbit编写了一个测试类,当我只运行receiver并从rabbitmq发送消息时,它可以工作;但是当我运行test sender类时,出现了一个错误:

  • 我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册? 17.165 SenderClass已创建 17.816初始化类,@PostConstruct启动PollingTask 24.781PollingTask发送第一条Kafka消息 24.816第一个错误:“Dispatcher没有订阅服务器” 25

  • 您可以帮助进行spring集成配置吗。 收到消息后,我收到: 我的代码: 我有类似的JmsInboudGateway配置,用于其他没有.requestChannel和.replyChannel的队列。 如果不是注入请求通道bean,而是用文本名称声明它,得到了这个 以及更多的文字宣传问题。

  • 我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?

  • 目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。

  • 我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有主意了。 有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。 在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻塞。这会导致性能下降,因为每个订阅者处理对