第一步-除了Kafka代理的标准属性之外,我还配置了以下内容:
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<hostname>:9092
我能够启动控制台使用者
kafka-console-consumer --new consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties
我可以从集群内的另一台计算机上使用自定义生成器。《生产者属性》相关节选:
security.protocol=SASL_PLAINTEXT
bootstrap.servers=<hostname>:9092
无法从远程计算机使用自定义生成器:
Exception org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for <topicname>-<partition>
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerInternalIP>:9092
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerPublicIP>:9092
从我的消费者开始使用
kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties
给了我一个警告,但我不认为这是对的...
WARN clients.NetworkClient: Error while fetching metadata with correlation id 1 : {<topicname>=LEADER_NOT_AVAILABLE}
从我的消费者开始使用
kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <KafkaBrokerPublicIP>:9092 --consumer.config consumer.properties
INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
INFO utils.AppInfoParser: Kafka commitId : unknown
INFO internals.AbstractCoordinator: Discovered coordinator <hostname>:9092 (id: <someNumber> rack: null) for group console-consumer-<someNumber>.
WARN NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE}
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE}
在过去的三天里,我一直在努力使它工作,但是我没有想法:/我的理解是广告.hostnames正是为了这个目的,但是要么是我做错了什么,要么是机器设置有问题。
任何提示都非常感谢!
我最近遇到了这个问题。在我的例子中,我启用了Kafka ACL,在通过注释this 2配置禁用它之后,问题得以解决。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:kafka
我想一个线程可能会对您有所帮助:https://gist.github.com/jorisdevrede/a7933a99251452bb1867
它在最后提到:
在文章底部加一个广告: 到后台扩展工具->网站广告->添加广告名称 为'portal_article_bottom'的广告,同时加上广告代码; 模板里代码如下: <div> {:sp_getad("portal_article_bottom")} </div>
我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点
我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K
我正在探索反应性Kafka,只是想确认反应性Kafka是否等同于同步制作人。与同步生产者,我们得到消息传递保证与确认字符和生产者序列保持。但是,ASYNC不能保证交付和测序。反应式生产者等同于SYNC还是ASYNC?
我想使用spring cloud stream framework创建一个kafkaendpoint,它将有一个http post api到。如何动态更改属性 我可以使用实现来实现上述功能,但不知道是否有可能在Spring中开发此功能。
有没有可能让我的不和谐机器人发送消息,而不必在不和谐中键入命令?相反,我想从我的应用程序内部触发它。 上面的代码给出了一个属性错误:'NoneType'对象没有属性'send'