我试图用wifi接口从一台电脑上的Kafka制作人发送消息到另一台电脑上的Kafka经纪人,但消息不出现在Kafka经纪人的指定主题中。
我用华硕无线路由器连接了两台PC机,并禁用了PC机和路由器上的所有防火墙。两台PC都成功地ping了对方。当我转向有线连接时,它工作了,消息被摄取到kafka broker PC上的指定主题。
Kafka制片人:
public class CarDataProducer {
public static void main(String[] args) {
CarDataProducer fProducer= new CarDataProducer();
Producer<String, CarData> producer= fProducer.initializeKafkaProducer();
String topicName = "IN-DATA";
CSVReaderCarData csvReader = new CSVReaderCarData();
List<CarData> CarDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");
//read from CSV file and send
for (CarData val : CarDataList) {
producer.send(new ProducerRecord<String, CarData>(topicName, val));
}
}
public KafkaProducer<String, CarData> initializeKafkaProducer() {
// Set the producer configuration properties.
Properties props = ProducerProperties.getInstance();
// Instantiate a producerSampleJDBC
KafkaProducer<String, CarData> producer = new KafkaProducer<String, CarData>(props);
return producer;
}
public class ProducerProperties {
private ProducerProperties() {
}
public static final Properties props = new Properties();
static {
props.put("bootstrap.servers", "192.168.1.124:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 500);
props.put("linger.ms", 500);
props.put("buffer.memory", 500);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
}
public static Properties getInstance() {
return props;
}
}
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA
kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}
Listeners=明文://:9092
tcp6 0 0:::9092::*
监听(0.00/0/0)tcp6 0 0 127.0.0.1:53880
127.0.1.1:9092建立keepalive(6659.53/0/0)tcp6 0 0 127.0.1.1:9092建立keepalive(6659.53/0/0)tcp6 0 0 127.0.1.1:9092建立keepalive(6659.15/0/0)tcp6 0 0 127.0.1.1:53878建立keepalive(6659.15/0/0)tcp6 0 0 127.0.0.1:53878 127.0.1.1:53878 127.0.1.1:9092建立keepalive(6659.15/0/0)
通过向kafka生产者的发送添加回调,我得到超时错误:
timeoutexception:自上次追加后,in-data-0:30045毫秒的8条记录过期
我解决了。每个Kafka经纪人必须公布其主机名/IP,以便从另一台PC上的Kafka制作人那里获得。
kafka-server-start.sh config/server.properties --override advertised.listeners=PLAINTEXT://192.168.1.124:9092
sha
相反,我们可以按以下方式更新config/server.properties:
advertised.listeners=PLAINTEXT://your.host.name:9092
或
advertised.listeners=PLAINTEXT://192.168.1.124:9092
问题内容: 我目前在开发PC上使用Jenkins。我将其安装在开发PC上,因为我对该工具的了解有限。因此我在开发PC上对其进行了测试。现在,我对Jenkins感到很满意,可以作为我在构建过程中的长期“合作伙伴”,并希望将此Jenkins“移动”到专用服务器上。 在此之前,我完成了很少的构建,并从每个构建中存档了工件。特别是,对于我来说,内部版本号对于版本控制非常重要。 如何将所有Jenkins信息
我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js
问题内容: 我正在尝试在pyinstaller的可执行文件中添加Chromedriver。虽然这是可能的,但似乎在尝试在另一台计算机上运行此错误消息。 我已经尝试了一些职位,包括本的一个,但不幸的是,这并没有提供预期的效果。最好的情况是,当chrome exe位于同一文件夹中时,我可以在自己的计算机上运行它,这无济于事。 代码1: 主程序 在另一台PC上运行时,我得到的是: 错误1: 找不到Chr
我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了
在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。 我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。 同时,我还创建了两个分区。 但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢