我们的Spring Integration DSL流之一是基于cron表达式的简单轮询器。通常,您会配置PollableChannel实现(例如QueueChannel)或使用InboundAdapter(例如FTP、HTTPS、S3等)来处理具有轮询属性的入站消息。
在我们的情况下,我们不会有任何入站消息要处理。我们只想每晚启动基于SI DSL的集成流,最好使用cron表达式。我们有没有办法用轮询器启动SI流,或者根据cron作业“伪造”消息?
IntegrationFlows
.from(() -> new GenericMessage<>(""),
e -> e.poller(p -> p.cron("0 0 0 * * ?")))
由于没有像NullMessage
或Message
这样的null
负载,我们只发送一条带有空字符串作为负载的“假”消息。你可以忽略下游的信息。
第一个Lambda是消息源的实现
它将仍然是相同的入站通道适配器,a-la相当于
我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?
需求:构建一个基于。NET的应用程序,该应用程序可以定期从IBM Websphere消息队列读取消息,并将这些消息保存到数据库中
我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)
是否使用@kafkaListener在间隔基中轮询()Kafka消息?如何在特定时间内阻止KafkaListenerEndpointRegistry轮询消息
如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。 我想不断地寻找信息,然后处理它。
任何建议或忠告都会真的很有帮助。提前道谢。