当前位置: 首页 > 知识库问答 >
问题:

如何使用camel kafka手动控制偏移提交?

云霖
2023-03-14

我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。

是否可以手动控制何时可以执行提交?

private static class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("kafka:{{mh.topic}}?" + getKafkaConfigString())
        .unmarshal().string()
        .aggregate(constant(true), new MyAggregationStrategy())
            .completionSize(1000)
            .completionTimeout(1000)
        .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
        .to("sftp://" + getSftpConfigString())

        // how to commit offset only after saving messages to SFTP?

        ;
    }

    private final class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody(body);
            return oldExchange;
        }
    }
}

private static String getSftpConfigString() {
        return "{{sftp.hostname}}/{{sftp.dir}}?"
                + "username={{sftp.username}}"
                + "&password={{sftp.password}}"
                + "&tempPrefix=.temp."
                + "&fileExist=Append"
                ;
}

private static String getKafkaConfigString() {
        return "brokers={{mh.brokers}}" 
            + "&saslMechanism={{mh.saslMechanism}}"  
            + "&securityProtocol={{mh.securityProtocol}}"
            + "&sslProtocol={{mh.sslProtocol}}"
            + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
            + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
            + "&saslJaasConfig={{mh.saslJaasConfig}}" 
            + "&groupId={{mh.groupId}}"
            ;
}

共有3个答案

沈飞翼
2023-03-14

即使在多线程路由(例如使用聚合器)中,您也可以通过使用偏移存储库(骆驼文档)来控制手动偏移提交

@Override
public void configure() throws Exception {
      // The route
      from(kafkaEndpoint())
            .routeId(ROUTE_ID)
            // Some processors...
            // Commit kafka offset
            .process(MyRoute::commitKafka)
            // Continue or not...
            .to(someEndpoint());
}

private String kafkaEndpoint() {
    return new StringBuilder("kafka:")
            .append(kafkaConfiguration.getTopicName())
            .append("?brokers=")
            .append(kafkaConfiguration.getBootstrapServers())
            .append("&groupId=")
            .append(kafkaConfiguration.getGroupId())
            .append("&clientId=")
            .append(kafkaConfiguration.getClientId())
            .append("&autoCommitEnable=")
            .append(false)
            .append("&allowManualCommit=")
            .append(true)
            .append("&autoOffsetReset=")
            .append("earliest")
            .append("&offsetRepository=")
            .append("#fileStore")
            .toString();

}

@Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
private FileStateRepository fileStore() {
    FileStateRepository fileStateRepository = 
    FileStateRepository.fileStateRepository(new File(kafkaConfiguration.getOffsetFilePath()));
    fileStateRepository.setMaxFileStoreSize(10485760); // 10MB max

    return fileStateRepository;
}

private static void commitKafka(Exchange exchange) {
    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commitSync();
}
施同
2023-03-14

我认为这是最新版本骆驼(2.22.0)(文档)的变化,你应该可以做到这一点。

// Endpoint configuration &autoCommitEnable=false&allowManualCommit=true
public void process(Exchange exchange) {
     KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
     manual.commitSync();
}
董畅
2023-03-14

不,你不能。Kafka每隔X秒在后台执行一次自动提交(你可以配置它)。

camel kafka中没有手动提交支持。此外,这是不可能的,因为聚合器与kafka使用者分离,而它是执行提交的使用者。

 类似资料:
  • 我试图使用Kafka Utils Api从Kafka(0.10.0.0)到Spark(1.6.0)流媒体应用程序使用数据 Kafka提尔。createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,InputOpicSet) 要求是将偏移范围手动提交给Kafka本身。 请注意,当在java中使用Kafk

  • 问题内容: 此代码不起作用 我需要使用哪种SQLcode才能使这种代码 仅* 使用SQL 即可工作! * 注意 不起作用,因为我主要关注偏移量,而不是限制本身。 问题答案: 根据MySQL 5.5规范: 该子句可用于约束语句返回的行数。接受一个或两个数字参数,这些参数都必须是非负整数常量,但以下情况除外: 在准备好的语句中,可以使用 占位符标记指定参数。 在存储的程序中,可以使用整数值的例程参数或

  • 我有一个Kafka接收器任务,通过方法收听Kafka主题 但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑 从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。 我可以在Kafka consumer中看到方法,但在中找不到替代方法。

  • 我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交。请让我知道我的消费者配置或侦听器代码中是否缺少任何内容。或者有其他方法可以根据条件处理确认偏移。在这里,我正在寻找解决方案,例如如果偏移没有手动提交/确认,它应该由消费者选择相同的消息/偏移量。 配置 听众

  • 我有一个版本1.1.0中的kafka控制台消费者,我用它从kafka获取消息。当我使用带有option-max-messages的kafka-console-consumer.sh脚本时,它似乎提交了错误的偏移量。 我创建了一个主题和一个消费者小组,并阅读了一些消息: