我一直试图让入站SubscribableChannel和出站MessageChannel在我的spring boot应用程序中工作。
我已经成功设置了kafka频道并成功测试了它。
此外,我还创建了一个基本的spring-boot应用程序,用于测试从通道添加和接收内容。
我遇到的问题是,当我把相同的代码放在它所属的应用程序中时,消息似乎永远不会被发送或接收。通过调试很难确定发生了什么,但对我来说唯一不同的是频道名。在工作impl中,通道名称类似于非工作应用程序中的application.channel,其localhost:8080/channel。
我想知道是否有一些Spring引导配置阻塞或将通道的创建更改为不同的通道源?
有人有类似的问题吗?
应用程序.yml
spring:
datasource:
url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
platform: h2
username: hello
password:
driverClassName: org.h2.Driver
jpa:
properties:
hibernate:
show_sql: true
use_sql_comments: true
format_sql: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
contentType: application/json
email-out:
destination: email
contentType: application/json
电子邮件
public class Email {
private long timestamp;
private String message;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
绑定配置
@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {
}
接口
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
控制器
@RestController
@RequestMapping("/queue")
public class EmailQueueController {
private EmailQueues emailQueues;
@Autowired
public EmailQueueController(EmailQueues emailQueues) {
this.emailQueues = emailQueues;
}
@RequestMapping(value = "sendEmail", method = POST)
@ResponseStatus(ACCEPTED)
public void sendToQueue() {
MessageChannel messageChannel = emailQueues.outboundEmails();
Email email = new Email();
email.setMessage("hello world: " + System.currentTimeMillis());
email.setTimestamp(System.currentTimeMillis());
messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(@Payload Email email) {
System.out.println("received: " + email.getMessage());
}
}
我不确定使用Spring-Cloud的继承配置项目之一,Spring-Cloud-Sleuth可能会阻止它工作,但即使我删除它仍然不会。但与我的应用程序使用上述代码不同,我从未看到配置过ConsumeConfig,例如:
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
(这个配置是我在运行上述代码时在我的基本 Spring 启动应用程序中看到的,并且代码可以从 kafka 通道写入和读取)....
我假设我正在使用的一个库中有一些超Spring引导配置,创建一个不同类型的通道,我只是找不到该配置是什么。
好的,经过大量调试…我发现有东西正在创建一个测试支持绑定器(怎么还不知道),所以很明显,这是用来不影响将消息添加到真实通道的。
添加后
@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
kafka 通道配置已正常工作,消息正在添加。想知道到底是什么在设置这个测试支持粘合剂会很有趣。我最终会找到那个傻瓜。
您发布的内容包含许多不相关的配置,因此很难确定是否有任何东西妨碍了您。此外,当您说“…似乎消息从未被发送或接收…”日志中是否有任何异常?此外,请说明您正在使用的Kafka版本以及Spring Cloud Stream。现在,我确实尝试根据您的代码重现它(在清理了一点以只留下相关部分之后),并且能够成功发送/接收。
我的Kafka版本是0.11,Spring Cloud Stream 2.0.0。以下是相关代码:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
email-out:
destination: email
@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaQuestionSoApplication.class, args);
}
@Bean
public ApplicationRunner runner(EmailQueues emailQueues) {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) throws Exception {
emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
}
};
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(String payload) {
System.out.println("received: " + payload);
}
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
}
我已经为Postgresql启用了复制,并且正在使用PGPool进行负载平衡。 我在使用HikariCP甚至Apache DBCP连接到Postgres时遇到了问题。 在SpringBoot应用程序中有没有使用PGPool的方法? 请查找堆栈跟踪: 2018-08-10 10:20:19.124信息37879----[main]com.zaxxer.hikari.hikaridatasource:
我在WebSphere控制台上部署了WAR文件,并将其映射到数据源。我能够测试我用PostgreSQL服务器详细信息配置的数据源。但是我的应用程序没有连接到服务器。我是新来的WebSphere,谁能帮我配置基于下面context.xml文件的数据源。我的应用程序在tomcat中工作得很好,但在WebSphere中却不行。 我认为我在数据源配置中做错了什么。
我试图在SpringMVC中运行SpringBoot应用程序,在SpringMVCPOM中添加SpringBoot应用程序依赖项,并扫描SpringBoot包,但我面临以下问题
上面的代码片段显示了在一些Windows10系统中的Toast通知,而在其他一些Windows10系统中不起作用。请给我指点一下。提前道谢。 你好,维韦克
我和Asterisk ARI一起工作。当我通过ARI创建通道时,我指定了应用程序名称,我可以毫无问题地将这些通道添加到网桥中。但是当我使用其他客户端(如Zoiper)进行调用,并尝试将这些通道添加到网桥时,我得到一个错误通道不在静态应用程序中。我不知道什么是停滞,我遵循这篇文章,并把以下扩展conf文件,但它仍然不工作: 那么,什么是静态应用程序,我如何能够桥接在ARI之外创建的频道?
我是kubernetes的新手,需要在openshift平台上使用k8s confimap将springboot应用程序的属性文件外部化。我已将属性文件保存在git repo中,作为“greeter.message=Spring Bootmyapplication.properties已在库伯内特斯上挂载为卷!”并使用“oc create confimap myconfig--from-file=