当前位置: 首页 > 知识库问答 >
问题:

与MongoDB相比,使用Java驱动程序的卡桑德拉批量写入性能非常糟糕

杨成礼
2023-03-14

我已经为MongoDB和Cassandra建立了一个进口商。基本上,导入程序的所有操作都是相同的,除了最后一部分,在最后一部分中形成数据以匹配所需的cassandra表模式和所需的mongodb文档结构。与MongoDB相比,Cassandra的写性能真的很差,我认为我做错了什么。

基本上,我的抽象导入器类加载数据,读取所有数据,并将其传递给扩展的MongoDBImporter或CassandraImporter类,以将数据发送到数据库。一次只针对一个数据库-没有同时向C*和MongoDB进行“双重”插入。导入程序在同一台机器上针对相同数量的节点运行(6)。

问题是:

57 分钟后,蒙哥DB 导入完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大致相同。我的 Cassandra 导入器现在运行了 2,5 小时,只有 5.000.000 插入的行。我将等待导入者完成并在此处编辑实际的完成时间。

我如何使用Cassandra导入:

在引入数据之前,我准备两个语句一次。这两个语句都是 UPDATE 查询,因为有时我必须将数据追加到现有列表中。在开始导入之前,我的表已完全清除。准备好的语句被一遍又一遍地使用。

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);

对于每一行,我创建一个BoundStatement并将该语句传递给我的“自定义”批处理方法:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);

使用MongoDB,我可以一次插入1000个文档(这是最大值)而不会出现问题。对于Cassandra,导入程序因<code>com.datastax.driver.core.exceptions而崩溃。InvalidQueryException:在某个时候,对于我的10条语句来说,批处理太大了。我正在使用此代码构建批处理。顺便说一句,我从1000、500、300、200、100、50、20个批次开始,但显然它们也不起作用。然后我将其设置为10,它再次抛出异常。现在我不知道它为什么会破裂。

private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == null) {
        currentBatch = new BatchStatement(Type.UNLOGGED);
    }

    currentBatch.add(statement);
    if (currentBatch.size() == MAX_BATCH_SIZE) {
        ResultSet result = session.execute(currentBatch);
        currentBatch = new BatchStatement(Type.UNLOGGED);
        return result;
    }

    return null;
}

我的C*模式如下所示

CREATE TYPE stream.event (
    data_dbl frozen<map<text, double>>,
    data_str frozen<map<text, text>>,
    data_bool frozen<map<text, boolean>>,
);

CREATE TABLE stream.data (
    log_creator text,
    date text, //date of the timestamp
    ts timestamp,
    log_id text, //some id
    hour int, //just the hour of the timestmap
    x double,
    y double,
    events list<frozen<event>>,
    PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

我有时需要在现有的行中添加更多的新事件。所以我需要一份udt的清单。我的UDT包含三个映射,因为事件创建者产生不同的数据(string/double/boolean类型的键/值对)。我知道udt被冻结的事实,我不能触摸已经摄入事件的地图。这对我来说很好,我只是需要添加新的事件,这些事件有时具有相同的时间戳。我根据日志的创建者(某个传感器名称)以及记录的日期(即“22-09-2016”)和时间戳的小时(为了在将相关数据紧密保存在一个分区中的同时分发更多的数据)。

我在我的pom中使用Cassandra 3.0.8和Datastax Java驱动程序,版本3.1.0。根据Cassandra中的批量限制是多少?,我不应该通过调整我的< code>cassandra.yaml中的< code > batch _ size _ fail _ threshold _ in _ kb 来增加批处理大小。因此...我的导入有什么问题吗?

所以我已经调整了代码来运行异步查询,并将当前运行的插入存储在一个列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入中出现错误时,该方法将等待500毫秒,直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。

但是在流式传输3300.000行之后,有280.000个插入正在处理,但没有发生错误。这似乎是当前处理的插入数量看起来太高了。6个cassandra节点运行在商品硬件上,这是2年前的事了。

这是并发插入的高数字(6个节点为280.000)是一个问题吗?我应该添加一个像MAX_CONCURRENT_INSERT_LIMIT这样的变量吗?

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    //Sleep while the currently processing number of inserts is too high
    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        concurrentInsertLimit += 2000;
        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
    }

    return;
}

共有2个答案

澹台鸿熙
2023-03-14

当您在Cassandra中运行批处理时,它会选择一个节点作为协调器。然后,该节点负责确保批量写入找到其适当的节点。例如,通过将10000个写操作批处理在一起,您现在将协调10000个写操作的任务分配给了一个节点,其中大部分写操作将针对不同的节点。通过这样做,很容易翻转一个节点,或者消除整个集群的延迟。因此,限制批量的原因。

问题是,Cassandra CQL BATCH用词不当,它没有做你或其他人认为它做的事。它不能用于提高性能。并行、异步写入总是比运行相同数量的语句(BATCH)更快。

我知道我可以轻松地将10.000行批处理在一起,因为它们将进入同一个分区。...你还会使用单行插入(异步)而不是批处理吗?

这取决于编写性能是否是您的真正目标。如果是这样,那么我仍然会坚持并行、异步写入。

想了解更多这方面的信息,可以看看DataStax的Ryan Svihla的两篇博文:

卡桑德拉:批量加载,不带 Batch 关键字

Cassandra:无批处理的批处理加载——细致入微的版本

夏侯元忠
2023-03-14

使用C*一段时间后,我确信您应该真正使用批处理来保持多个表同步。如果您不需要该功能,那么根本不要使用批处理,因为您会受到性能损失。

将数据加载到C*中的正确方法是使用异步写入,如果集群无法跟上摄取速率,可以使用可选的背压。您应该将“自定义”批处理方法替换为以下内容:

  • 执行异步写入
  • 控制你在飞机上的写作次数
  • 当写入超时时执行一些重试

若要执行异步写入,请使用 .executeAsync 方法,该方法将返回一个结果集未来对象

要控制多少空中查询,只需收集从中检索到的ResultSetFuture对象。executeAsync</code>方法,如果列表中有(此处的大致值),则表示1k个元素,然后等待所有元素完成,然后再发出更多写入操作。或者,您可以等待第一个完成,然后再发出一次写入,以保持列表完整。

最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:

  1. 使用相同的超时值再次写入
  2. 使用增加的超时值再次写入
  3. 等待一段时间,然后使用相同的超时值再次写入
  4. 等待一段时间,然后使用增加的超时值再次写入

从1到4,你有一个增加的背压强度。选择一个最适合你的情况。

问题更新后编辑

你的插入逻辑对我来说似乎有点不合理:

    < li >我看不到任何重试逻辑 < li >如果失败,您不能删除列表中的项目 < li >您的< code>while(并发插入错误发生

我通常会保留一个(失败的)查询列表,以便以后重试。这使我对查询具有强大的控制力,当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多X次,然后硬失败...)。

这个列表应该是非常动态的,例如当查询失败时添加项目,当执行重试时删除项目。现在,您可以了解集群的限制,并根据最后一秒的平均失败查询数调整concurrentInsertLimit,或者使用更简单的方法“如果重试列表中有一个项目,请暂停”等。。。

评论后编辑2

由于您不需要任何重试逻辑,我将以这种方式更改您的代码:

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            runningInsertList.remove(future);
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    //Sleep while the currently processing number of inserts is too high
    while (runningInsertList.size() >= concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    if (!concurrentInsertErrorOccured) {
        // Increase your ingestion rate if no query failed so far
        concurrentInsertLimit += 10;
    } else {
        // Decrease your ingestion rate because at least one query failed
        concurrentInsertErrorOccured = false;
        concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }
    }

    return;
}

您还可以通过替换列表来优化一些过程

希望有所帮助。

 类似资料:
  • 问题内容: 我已经为MongoDB和Cassandra构建了一个导入器。基本上,导入程序的所有操作都是相同的,除了最后一部分中,数据的形成与所需的cassandra表架构和所需的mongodb文档结构相匹配。与MongoDB相比,Cassandra的写入性能确实很差,我想我做错了。 基本上,我的抽象导入程序类加载数据,读出所有数据,并将其传递给扩展的MongoDBImporter或Cassandr

  • 我们有这个Cassandra集群,想知道当前的性能是否正常,我们可以做些什么来改善它。 集群由位于同一数据中心的3个节点组成,每个节点的总容量为465GB,堆容量为2GB。每个节点有8个内核和8GB或RAM。不同组件的版本为 工作量描述如下: 空格键使用org.apache.cassandra.locator。SimpleStrategy布局策略和复制因子为3(这对我们非常重要) 工作负载主要由写

  • 我有一个. net核心应用程序,它通过DataStax驱动程序将数据持久化到Cassandra实例。 我为我的Cassandra实体创建了一个基类。现在,如果我想把我的TimeUUID类型ID放入这个基类,当插入时,我得到错误: 同样的方法在EntityFramework中也适用。问题也不在于我的表、连接或键空间,因为当我将id字段返回到entty本身时,它就起作用了。 我的插入方法 我的基类 继

  • 我有一个必须添加到采购订单的产品列表。采购订单具有序列号,添加产品后,应更改其状态以指示这些产品已出货。 1 个采购订单中处理的典型产品数为 500。 在DB上-我有2张桌子- 各方面的建议告诉我应该使用多个异步查询。然而,我关心的是整个操作的原子性。鉴于我的要求,请建议什么是最好的前进方式。 先谢谢你。

  • 我有这个代码: 我得到以下异常: 所有主机尝试查询失败(已尝试:/127.0.0.1:9042(com.datastax.driver.core.TransportException:[/127.0.0.1:9042]无法连接)),堆栈跟踪:com.datastax.driver.core.exceptions.NoHostAvailableException:所有主机尝试查询失败(已尝试:/12

  • 驱动程序版本为: 我的问题是,当我使用api find和一些来自java的过滤器时,操作需要15秒。 我检查了mongo服务器日志文件,发现跟踪是一个命令,而不是一个查询: 2015-09-01T12:11:47.496+0200I命令[conn503]命令b.$CMD命令:计数{count:“logs”,查询:{timestamp:{$GTE:新日期(1433109600000)},aplica