我的应用程序使用一台机器上运行的Kafka服务器上的消息,然后将它们转发给另一台在其他实例上运行的远程Kafka服务器。在我将应用程序部署到Cloud Foundry并向第一台Kafka服务器发送消息后,应用程序按预期工作。消息被消费并转发到远程Kafka。
然而,在这之后,我在Cloud Foundry(以及在我的本地机器上以较慢的速度)中得到了下面的无限循环异常:
StackTrace:
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]
我的应用程序yaml文件是这样的
应用程序YML:
spring:
cloud:
stream:
bindings:
activationMsgQueue:
binder: kafka1
destination: test
contentType: application/json
consumer:
resetOffsets: true
startOffset: latest
input:
binder: kafka2
content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
destination: test
group: prac
binders:
kafka1:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a1p.sys.comcast.net
kafka2:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a3p.sys.comcast.net
default-binder: kafka2
kafka:
binder:
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我观察到,如果我包含下面的配置,错误就会消失,但现在我有一个无限循环的消息被消费和发送。
SNIPPET:
kafka:
binder:
brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我需要做什么来阻止这个无限循环?
你好,谢谢你回复求救电话。我对上述问题有一个改进。现在,流将从a1p(主题:test)消费,如果消息有效,则转发到a3p(主题:test),否则将错误消息发送到a1p(主题:errorMsgQueue)。我有下面的申请表。yml文件
spring:cloud:stream:bindings:errorMsgQueue:binder:kafka1目的地:errorMsgQueue contentType:application/json输入:binder:kafka2内容类型:application/x-java-object;type=com。康卡斯特。激活。消息哇。ActivationDataInfo目的地:测试组:prac
activationMsgQueue:binder:kafka3目的地:测试内容类型:应用程序/json绑定器:kafka1:type:kafka环境:spring:cloud:stream:kafka:binder:brokers:caapmsg-as-a1p。系统。康卡斯特。net zk节点:caapmsg-as-a1p。系统。康卡斯特。net kafka2:type:kafka环境:spring:cloud:stream:kafka:binder:brokers:caapmsg-as-a3p。系统。康卡斯特。net zk节点:caapmsg-as-a3p。系统。康卡斯特。net kafka3:type:kafka环境:spring:cloud:stream:kafka:binder:brokers:caapmsg-as-a1p。系统。康卡斯特。net zk节点:caapmsg-as-a1p。系统。康卡斯特。净默认活页夹:kafka2
我仍然得到一个无限循环。我做错了什么?
spring。Kafka。主机
不是Spring Cloud Stream的有效配置选项。http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RELEASE/reference/htmlsingle/index.html#_kafka_binder_properties是粘合剂支持的唯一属性。
此外,应用程序似乎正在混合这两个集群的配置。(我假设它们是独立的集群?)
应该是这样的:
spring:cloud:stream:bindings:activationMsgQueue:binder:kafka1 destination:test contentType:application/json consumer:resetoffset:true startOffset:latest input:binder:kafka2 content type:application/x-java-object;type=com。康卡斯特。激活。消息哇。ActivationDataInfo目的地:测试组:prac
绑定器:Kafka1:类型:Kafka环境:Spring:云:流:Kafka:绑定器:代理:caapmsg-as-a1p。系统。康卡斯特。net zk节点:caapmsg-as-a1p。系统。康卡斯特。net kafka2:type:kafka环境:spring:cloud:stream:kafka:binder:brokers:caapmsg-as-a3p。系统。康卡斯特。net zk节点:caapmsg-as-a3p。系统。康卡斯特。净默认活页夹:kafka2
看到这个例子了吗https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multibinder-differentsystems/src/main/resources/application.yml详细信息。
我怀疑无限循环是由发送和接收同一主题的消息引起的。
我有两个代理1.0.0Kafka集群,我正在针对这个Kafka运行1.0.0Kafka流API应用程序。我增加了制片人的要求。暂停。毫秒到5分钟来修复生产者超时异常。 目前,在运行一段时间后,我发现以下两种类型的异常。我试图按照ApacheKafka中的建议修复这些异常:TimeoutException,然后什么都不起作用 但不完整的解决方案就在这里。建议使用此解决方案(减少生产批量)。请帮忙。
我试图从JMS源读取数据,并将它们推送到KAFKA主题中,几个小时后,我观察到推送到KAFKA主题的频率几乎为零,经过一些初步分析,我在FLUME日志中发现以下异常。 my flume显示max.request的当前设置值(在日志中)。尺寸为1048576,明显小于1399305,增加了此最大要求。大小可能会消除这些异常,但我无法找到更新该值的正确位置。 我的水槽。配置, 任何帮助都将不胜感激!!
问题内容: 在Java 1.4+中,有3种方法来中断在套接字I / O上阻塞的流: 如果套接字是使用常规构造函数创建的,则可以从单独的线程中关闭它。结果,在被阻塞的线程中抛出了a 。 如果套接字是使用创建的。(非阻塞I / O)—同样,可以从单独的线程关闭它,但是现在在阻塞的线程中引发了一个不同的异常()。 另外,在使用非阻塞I / O的情况下,有可能引发抛出中断的阻塞线程。使用旧式Java I
用途: 提供独立于平台的基于select模块的I/O多路复用的抽象 运行模式 回显服务端 # selectors_echo_server.py import selectors import socket mysel = selectors.DefaultSelector() keep_running = True def read(connection, mask): "Callb
什么是同步?什么是异步?阻塞和非阻塞又有什么区别?本文先从 Unix 的 I/O 模型讲起,介绍了5种常见的 I/O 模型。而后再引出 Java 的 I/O 模型的演进过程,并用实例说明如何选择合适的 Java I/O 模型来提高系统的并发量和可用性。 由于,Java 的 I/O 依赖于操作系统的实现,所以先了解 Unix 的 I/O 模型有助于理解 Java 的 I/O。 相关概念 同步和异步
我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦