我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2)
我从这个问题的答案中借用了一种方法。
我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)是可以的,但Kafka中从未提交处理的消息不应丢失(由于提交了偏移量)。现在来回答问题,
现在,我的解决方案是阻止消费者轮询线程,并尝试持续提交作业。但暂停投票会是一个更好的办法,但我找不到任何办法来实现这一点。
public static void main(String[] args) throws Exception {
String consumerId = System.getProperty("consumerId", "1");
ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
LOGGER.info("Consumer {} starting....", consumerId);
Main main = new Main();
main.init();
CamelContext context = main.getCamelContext();
ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
.autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
.register(context, "kafka");
ConsumerBean bean = new ConsumerBean();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("kafka:test").process(exchange -> {
LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
processTask(exchange);
commitOffset(exchange);
});
}
private void processTask(Exchange exchange) throws InterruptedException {
try {
executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
} catch (Exception e) {
LOGGER.error("Exception occured {}", e.getMessage());
Thread.sleep(1000);
processTask(exchange);
}
}
private void commitOffset(Exchange exchange) {
boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
if (lastOne) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
if (manual != null) {
LOGGER.info("manually committing the offset for batch");
manual.commitSync();
}
} else {
LOGGER.info("NOT time to commit the offset yet");
}
}
});
main.run();
}
为此,可以使用throttle
EIP。
from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()
请看这里的原始留档。
我这么说是因为来自avro的POJO一代不是那么直截了当的。在此基础上,它需要maven插件和一个.avsc文件。 例如,我在我的Kafka制作人上创建了一个POJO,名为User: 我连载它,并发送到我的用户主题在Kafka。然后我有一个消费者,它本身有一个POJO用户,并反序列化消息。是空间问题吗?这样序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。
我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink
这个问题似乎不是关于特定的编程问题、软件算法或主要由程序员使用的软件工具。如果您认为这个问题在另一个Stack Exchange网站上是主题,您可以留下评论来解释这个问题在哪里可以得到回答。 我们构建了一个定制的Kafka Connect sink,它反过来调用一个远程REST API。我如何将背压传播到Kafka Connect基础设施,以便在远程系统比内部使用者向put()传递消息慢的情况下,
我有一个注册了回拨的服务,现在我想将其公开为,具有某些要求/限制: 接收回调的线程不应该被阻塞(工作应该交给观察者指定的不同线程/调度程序) 不应该有任何异常抛出由于消费者是慢下来流 多个消费者可以相互独立订阅 消费者可以选择缓冲所有的物品,这样它们就不会丢失,但是它们不应该在生产者类中被缓冲 以下是我目前的情况 我不确定这是否符合我的要求。在javadoc上有一条关于的注释,我不明白: 请注意,
当使用int-kafka: out站通道适配器生成到kafka时,似乎没有可用的错误通道。在这种情况下,如何处理reties次后无法向kafka生成消息? 任何可能导致kafka失败的错误。(以下代码只是来自Internet的代码片段,只是想知道如何向其添加错误句柄)
问题内容: 我试图用一个批量一些 KTable 值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。 我尝试提高 轮询 和 提交间隔 的频率来避免这种情况: 不幸的是,这些错误仍在发生: (很多) 其次是: 显然,我需要更频繁地将心跳发送回服务器。怎么样? 我的拓扑是: 该 KTable 是关键,每30秒分组值。在 Processor.init(