当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Cloud stream集成Spring启动应用程序以IBMBluemix云上的事件流

岳刚洁
2023-03-14

Pom文件包含依赖项-spring cloud starter流kafka

控制器代码

private final GreetingsService greetingsService;
public GreetingsController(GreetingsService greetingsService) {
      super();
      this.greetingsService = greetingsService;
  }
  @RequestMapping(value="/sendgreetings",method = RequestMethod.GET)
  public void sendGreetings(@RequestParam("message") String message) {
      System.out.println("message " + message);
      Greetings greetings = new Greetings(System.currentTimeMillis(), message);
      greetingsService.sendGreeting(greetings);
  }

问候听众类

 @Component
public class GreetingsListener 
{

private static final Logger LOG = LoggerFactory.getLogger(GreetingsListener.class);

@StreamListener(GreetingsStream.INPUT)
public void handleGreetings(@Payload Greetings greetings) {
    LOG.info("Received Greetings {}", greetings);
}

 }

问候语流接口

 public interface GreetingsStream {

 String INPUT = "greetings-in";
 String OUTPUT = "greetings-out";

 @Input(INPUT)
SubscribableChannel inBoundGreetings();

@Output(OUTPUT)
MessageChannel outBoundGreetings();

}

问候服务

 @Service
 public class GreetingsService {

private final GreetingsStream greetingsStream;

public GreetingsService(GreetingsStream greetingsStream) {
    super();
    this.greetingsStream = greetingsStream;
}


public void sendGreeting(final Greetings greetings) {

    MessageChannel messageChannel = greetingsStream.outBoundGreetings();
    messageChannel.send(MessageBuilder.
            withPayload(greetings)
            //.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON)     
            .build());
  }
}

问候课程

 @Getter @Setter @ToString @Builder
 public class Greetings {

 private long timestamp;
 private String message;

 public Greetings() {

 }

public Greetings(long timestamp, String message) {
    super();
    this.timestamp = timestamp;
    this.message = message;
}


 }

结合

 @EnableBinding(GreetingsStream.class)
 public class EventstreamConfig {}

主类

 @SpringBootApplication
 public class EventstreamApplication {

 public static void main(String[] args) {
    SpringApplication.run(EventstreamApplication.class, args);
 }
 }

我能够使用属性文件中指定的以下配置连接、发送和接收消息到本地Kafka实例

 server:
   port : 8082
 spring:
   cloud:
    stream:
      kafka:
        binder:
           brokers: localhost:9092
     bindings:
         greetings-in:
           destination: greetings
           contentType: application/json
         greetings-out:
           destination: greetings
           contentType: application/json

但是,我无法连接到blue mix云上的IBM事件流。下面是我连接到云上事件流的配置

    spring:
       cloud:
        stream:
          kafka:
            binder:
               brokers: kafk*****et:9093
               consumerProperties: 
                 group-id: foo
                 auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         value-deserializer:org.apache.kafka.common.serialization.StringDeserializer       
              producerProperties:
                 client-id: eventstream
         key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         value-deserializer:org.apache.kafka.common.serialization.StringDeserializer 
              configuration: 
                 security:
                   protocol: SASL_SSL
                 sasl:
                   mechanism: PLAIN  
              jaas:
                 enabled: true
                 loginModule:org.apache.kafka.common.security.plain.PlainLoginModule  
                 options: 
                   username: ***
                   password: ***
       bindings:
          greetings-in:
          destination: greetings
          contentType: application/json
          greetings-out:
          destination: greetings
          contentType: application/json

请让我知道配置有什么问题。我没有找到任何符合我要求的例子。

 This is the error i am getting.
    2019-12-28 15:57:46.832  WARN 5780 --- [0:0:0:0:1:2181)]   org.apache.zookeeper.ClientCnxn          : Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_144]
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.8.jar:3.4.8--1]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) ~[zookeeper-3.4.8.jar:3.4.8--1]

2019-12-28 15:57:46.948  INFO 5780 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2019-12-28 15:57:48.973  WARN 5780 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_144]
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.8.jar:3.4.8--1]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) ~[zookeeper-3.4.8.jar:3.4.8--1]

2019-12-28 15:57:50.078  INFO 5780 --- [0:0:0:0:1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2019-12-28 15:57:52.100  WARN 5780 --- [0:0:0:0:1:2181)] org.apache.zookeeper.ClientCnxn          : Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_144]
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.8.jar:3.4.8--1]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) ~[zookeeper-3.4.8.jar:3.4.8--1]

2019-12-28 15:57:52.201  INFO 5780 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2019-12-28 15:57:54.210  WARN 5780 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_144]
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.8.jar:3.4.8--1]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) ~[zookeeper-3.4.8.jar:3.4.8--1]

2019-12-28 15:57:55.328  INFO 5780 --- [0:0:0:0:1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2019-12-28 15:57:57.458  INFO 5780 --- [           main] org.apache.zookeeper.ZooKeeper           : Session: 0x0 closed
2019-12-28 15:57:57.462  WARN 5780 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Exception thrown while building outbound endpoint
2019-12-28 15:57:57.467  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Unregistering JMX-exposed beans on shutdown
2019-12-28 15:57:57.467  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Unregistering JMX-exposed beans
2019-12-28 15:57:57.468  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: errorChannel
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: nullChannel
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: greetings-out
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: greetings-in
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: _org.springframework.integration.errorLogger.handler
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Summary on shutdown: org.springframework.cloud.stream.binding.StreamListenerMessageHandler@681c0ae6
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2019-12-28 15:57:57.469  INFO 5780 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans
2019-12-28 15:57:57.473  INFO 5780 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2019-12-28 15:57:57.473  INFO 5780 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@44bc2449: startup date [Sat Dec 28 15:57:44 IST 2019]; parent: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@67ab1c47
2019-12-28 15:57:57.480  INFO 5780 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2019-12-28 15:57:57.458  INFO 5780 --- [ain-EventThread] org.apache.zookeeper.ClientCnxn          : EventThread shut down for session: 0x0
2019-12-28 15:57:57.500  INFO 5780 --- [           main] utoConfigurationReportLoggingInitializer : 

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2019-12-28 15:57:57.516 ERROR 5780 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Exception thrown while building outbound endpoint
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:144) ~[spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.14.RELEASE.jar:1.5.14.RELEASE]
    at com.example.eventstream.EventstreamApplication.main(EventstreamApplication.java:10) [classes/:na]
Caused by: org.springframework.cloud.stream.binder.BinderException: Exception thrown while building outbound endpoint
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:137) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:66) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:138) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:124) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:238) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
    ... 14 common frames omitted
Caused by: org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 10000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232) ~[zkclient-0.9.jar:na]
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156) ~[zkclient-0.9.jar:na]
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130) ~[zkclient-0.9.jar:na]
    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76) ~[kafka_2.11-0.10.1.1.jar:na]
    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58) ~[kafka_2.11-0.10.1.1.jar:na]
    at kafka.utils.ZkUtils.apply(ZkUtils.scala) ~[kafka_2.11-0.10.1.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:171) ~[spring-cloud-stream-binder-kafka-core-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(KafkaTopicProvisioner.java:153) ~[spring-cloud-stream-binder-kafka-core-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:108) ~[spring-cloud-stream-binder-kafka-core-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:60) ~[spring-cloud-stream-binder-kafka-core-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:119) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    ... 20 common frames omitted

共有1个答案

颛孙晗昱
2023-03-14

<代码>java。网ConnectException:连接被拒绝

这仅仅意味着主机/端口对zoomaster来说是错误的。

在Kafka。UTIL。ZkUtils。应用(ZkUtils.scala)~[kafka\u 2.11-0.10.1.1.jar:na]

在org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAnd分区(KafkaTopicProvisioner.java:171)~[spring-cloud-stream-binder-kafka-core-1.3.2。RELEASE. jar:1.3.2。释放]

您使用的是非常旧的SCSt版本(1.3. x);自2.0以来,活页夹没有直接与Zoomaster对话。

 类似资料:
  • 是否有一种干净的方法来检测spring启动应用程序何时停止,并在停止之前执行一些操作?一种停止服务的CommandLineRunner 提前谢谢

  • 我有以下情况。在我的微服务[MA]中,我希望在应用程序启动后或基于某个事件初始化/销毁一些bean。想象一下,还有另一个微服务[MB],它保存关于ContentStores的信息。在MA启动之后,我想向MB请求contentStore条目,基于此,我想根据需要创建这么多bean。可能会有一个事件触发storrecreated/storredeleted,在这种情况下,我需要销毁bean。 我目前不

  • 我正在尝试创建一个JavaFX程序,该程序从用户那里获取第一个数字和第二个数字,并根据用户选择的按钮进行加法,乘法,减法或除法,并在文本字段中显示结果。我已经创建了这些方法并正确设置了我的类,但我不确定为什么当我单击按钮时我的事件不起作用。任何类型的指导将不胜感激。我也在使用Eclipse。谢谢 主要.java Controller.java fxml.java

  • 我们的prod环境架构决定如下:2台机器,每台机器有2个tomcat实例(在vm上)。tomcat上有运行hibernate的spring web应用程序。还有2个db实例分布到这两台机器上。 因此,我们认为hazelcast非常适合这种结构。hazelcast将是hibernate的二级缓存,它将通过db实例管理集群缓存。 我们安装了hibernate服务器并在其上定义了集群。我已经搜索了官方的

  • 我想实现下面的用例——我的Spring Boot应用程序应该只在应用程序中有某个属性时启动。yaml设置为: 如果未设置该属性,则上下文初始化将失败,并显示一条消息,表明该属性丢失。 我在这个主题中找到了如何实现它:Spring启动-检测和终止,如果属性没有设置?但是我不能遵循这种方法的问题是,在加载检查此属性的bean之前,上下文初始化可能会失败。 例如,如果其他bean由于缺少另一个属性而无法

  • 因此,我完成了以下步骤: > 然后提示我输入密码和其他各种信息。我像这样完成了: 在应用程序中添加了以下行。yml文件来自src/main/resources: 已启动Spring Boot应用程序。出现此异常: 其主要思想是PKCS12提供程序不存在。 有什么想法吗? 也许使用不同的提供商?如果答案是肯定的,我应该使用哪个提供商?