我在Apache Kafka
上构建了一个排队系统。应用程序将为特定的Kafka主题
生成消息,在使用者端,我必须使用为该主题生成的所有记录。
我使用新的Java使用者API编写了consumer。代码看起来像
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
这里我需要永远运行消费者,这样生产者推入kafka主题的任何记录都应该立即消费和处理。
所以我的困惑是,使用无限while循环(像示例代码中那样)消费数据是正确的方法吗?
虽然使用无限循环是可以的,但在Kafka消费者文档中可以找到一种稍微优雅的方法,如下所示:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
这使您可以选择使用钩子进行优雅的关机。
问题内容: 有什么办法可以使这两个软件包一起运行? 因此,基本上,我希望两全其美。自动运行服务器(并在出现错误时重新启动),并在发生.js文件更改时自动更新。 问题答案: 但是要避免该问题,您可以执行以下操作: 这样可以确保实际退出(而不是给您“应用程序崩溃”消息),然后再次将其接收。 在此指定要运行的命令,否则将默认为节点。不使用-c会导致在此答案的注释中提及的错误。
我正在使用PHP mongoClient类连接mongob远程主机。 除了简单的连接代码,我没有写什么特别简单的东西。 它确实连接到mongodb,但一段时间后,我的应用程序挂起,所谓挂起,是指php sript永远运行,没有响应,最后apache和我的服务器崩溃。我需要重启服务器才能重新启动。 为什么PHP脚本永远运行而mongolerver没有响应。 我将设置为30秒。 MongoClient
我有一个奇怪的问题Laravel队列:工作。在我的crontab中,我设置了一个工作,就像Laravel文档中描述的那样 在我的应用程序/控制台/Kernel.php我设置这个: 在我的正式服php工匠队列中:工作运行几秒钟,然后被“杀死”。这就是我所期望的。 在我的开发盒php artisan队列中:工作永远运行。因此,激活cron作业会产生php进程,直到整个内存被填满。 两个盒子都是Cent
在其他语言中,此任务很简单,只需在Python中使用类似以下内容: 如果没有什么要处理的,我的程序也会想暂停执行。我不希望它用查询使数据库过载。 我正在尝试编写一个应用程序,它将充当后台队列处理器。它将检查数据库,看看队列中是否有任何需要处理的项目,然后该程序将数据写入磁盘上的文件。连接到同一数据库的不同系统的用户会间歇性地将数据添加到数据库中。 我认为永久npm模块不太合适,因为该模块只是检查脚
有没有办法让BeaconTransmitter实例永远发送? 问题是:我想要一个RPi检测器,当我回家时,不需要Wifi运行,或者RPi甚至可以访问互联网(例如,当它检测到电话时,它可以打开所有路由器等)。 我的android手机上有一个极简的应用程序,发送iBeacon运行,而RPi上的beacontools库会检测到它。原则上,这是运作良好的。(虽然我知道它应该是相反的,但我再次,我希望RPi
问题内容: 如您所见,我是新来的… 我建立了第一个网站,建立了第一个Node.js服务器来为其提供服务,然后将所有内容实时发布到EC2上。 我在我的EC2 IP地址上测试了所有内容,并且似乎一切正常。 现在直到现在,我一直在本地测试我的应用程序,因此每当我关闭终端时,app.js都将停止运行,因此在localhost上将不会提供任何服务,这是有道理的。 现在,我的服务器位于EC2上,每当我关闭终端