import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
// create producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP_TO_REMOTE_SERVER>:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//create producer
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
//producer record
ProducerRecord <String,String> record = new ProducerRecord<String, String>("first_topic", "jello there");
System.out.println("SENDING RECORD");
//send data - async
producer.send(record);
producer.flush();
producer.close();
System.out.println("complete");
}
}
[kafka-producer-network-thread>producer-1]警告org.apache.kafka.clients.networkclient-[Producer clientid=producer-1]无法建立到节点0的连接(/xx.xx.xx.xx.xx:9092)。代理可能不可用。
[主]信息org.apache.Kafka.clients.Producer.kafkaProducer-[Producer clientid=producer-1]关闭Kafka生产者,时间为9223372036854775807毫秒。
在查看了Stackoverflow之后,我将server.Properties listeners部分更新为服务器的私有IP
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.0.1.51:9092
我应该如何将服务器上的Kafka配置为可以远程访问和监听?
我想您面临的主要问题是从配置的角度来看。在通过生产者沟通之前,请检查您是否进行了所有必要的更改。您需要进行以下更改:
Kafka更改:您需要在Zookeeper.properties中为相关代理添加配置。
AWS更改:当连接到AWS时,您需要设置传递。pem文件的方式。您可能需要在AWS实例中启用直接访问。默认情况下,它将阻止所有未知流量。
我和我的Kafka制作人遇到了一个奇怪的问题。我使用Kafka-0.11服务器/客户端版本。我有一个zookeper和一个kafka经纪人节点。此外,我还创建了带有3个分区的“事件”主题: 在我的Java代码中,我创建了具有以下属性的producer: 此外,我还向Producer#send()方法添加了一个回调,该方法将失败的消息添加到队列中,该队列由另一个“重新发送”线程在循环中迭代: 一切正
我正在尝试实现PowerShell远程处理(用于在远程服务器上执行PowerShell脚本)。我的远程服务器运行的是Windows server 2008和PowerShell v2。 在提升权限的PS控制台中,我执行了以下cmdlet: 返回控制台: WinRM已设置为在此计算机上接收请求。 WinRM已设置为在此计算机上进行远程管理。 从客户端计算机上提升的PowerShell会话中,我执行了
我正试图在我的本地(windows 7虚拟桌面)中使用来自Kafka的消息 Zookeeper和kafka在同一台本地计算机上运行 创建主题 produce message kafka-console-producer--broker-list 127.0.0.1:9092--topic first_topic没有错误,我只是做Ctrl+C do end producing 使用邮件
我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf
我正在研究一个POC,用于在我的项目中实现Kafka集群。我已经设置了一个Kafka集群在我的本地机器与3经纪人。现在,我使用Spring MVC REST服务向Kafka服务器发送消息,该服务在内部使用Spring Kafka生成和使用往返于Kafka集群的消息。现在,我试图发送警报时,消费者无法接收消息从主题时,经纪人是下降。我关闭消费者连接到的唯一代理。我没有得到任何异常在我的日志,但我得到
我用的是Kafka 0.8.2-beta,有2台Ubuntu 14虚拟机: 172.30.141.127正在运行动物园管理员 172.30.141.184在经营一家Kafka经纪人 我正在启动动物园管理员实例,如果一切顺利的话。然后,我尝试启动代理并将其连接到172.30.141.127:2181。它似乎能够在特定的端口上连接并建立会话,但是由于一些似乎没有记录的异常,它失去了连接。 代理输出: