当前位置: 首页 > 知识库问答 >
问题:

如何有效地使用使用datastax java驱动程序批量写入cassandra?

蓬弘
2023-03-14

我需要使用Datastax Java驱动程序将Batches写入Cassandra,这是我第一次尝试将batch与Datastax Java驱动程序一起使用,因此我感到有些困惑-

下面是我的代码,我试图在其中创建一个语句对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM。

Session session = null;
Cluster cluster = null;

// we build cluster and session object here and we use  DowngradingConsistencyRetryPolicy as well
// cluster = builder.withSocketOptions(socketOpts).withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)

public void insertMetadata(List<AddressMetadata> listAddress) {
    // what is the purpose of unloggedBatch here?
    Batch batch = QueryBuilder.unloggedBatch();

    try {
        for (AddressMetadata data : listAddress) {
            Statement insert = insertInto("test_table").values(
                    new String[] { "address", "name", "last_modified_date", "client_id" },
                    new Object[] { data.getAddress(), data.getName(), data.getLastModifiedDate(), 1 });
            // is this the right way to set consistency level for Batch?
            insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
            batch.add(insert);
        }

        // now execute the batch
        session.execute(batch);
    } catch (NoHostAvailableException e) {
        // log an exception
    } catch (QueryExecutionException e) {
        // log an exception
    } catch (QueryValidationException e) {
        // log an exception
    } catch (IllegalStateException e) {
        // log an exception
    } catch (Exception e) {
        // log an exception
    }
}

下面是我的地址元数据类 -

public class AddressMetadata {

    private String name;
    private String address;
    private Date lastModifiedDate;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Date getLastModifiedDate() {
        return lastModifiedDate;
    }

    public void setLastModifiedDate(Date lastModifiedDate) {
        this.lastModifiedDate = lastModifiedDate;
    }
}

现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的cassandra的方式是正确的吗?重试策略呢?这意味着如果批处理语句执行失败,会发生什么,它会再次重试吗?

有没有更好的方法来使用java驱动程序批量写入卡桑德拉?

共有2个答案

孙项禹
2023-03-14

我们在使用异步和批处理之间争论了一段时间。我们尝试了两者进行比较。与单个“异步”请求相比,使用“未记录批处理”可以获得更好的吞吐量。我们不知道为什么,但根据Ryan的博客,我猜这与写入大小有关。我们可能进行了太多较小的写入,因此批处理它们可能会给我们带来更好的性能,因为它确实减少了流量。

我不得不提一下,我们甚至没有以推荐的方式进行“未记录的批处理”。建议的方法是使用单分区键执行批处理。基本上,批处理属于同一分区键的所有记录。但是,我们只是批处理了一些可能属于不同分区的记录。

有人做了一些基准测试来比较异步和“未标记的批处理”,我们发现这非常有用。这是链接。

长孙高远
2023-03-14

Cassandra中的批处理关键字不是将大量数据批处理在一起以进行批量加载的性能优化。

批处理用于将原子操作组合在一起,即您希望一起发生的操作。批处理保证如果批处理的一部分成功,整个批处理都会成功。

分批使用可能不会让你的大量摄入运行得更快

此处取消记录批处理的目的是什么?

Cassandra使用一种称为批处理日志的机制来确保批处理的原子性。通过指定未记录的批处理,您关闭了此功能,因此批处理不再是原子的,可能会因部分完成而失败。当然,记录批处理并确保它们的原子性会有性能损失,使用未记录的批处理会消除这种损失。

在某些情况下,您可能希望使用未记录的批处理来确保属于同一分区的请求(插入)一起发送。如果一起批处理操作,并且需要在不同的分区/节点中执行它们,那么您基本上为协调器创建了更多工作。在Ryan的博客中查看这方面的具体示例:

现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的cassandra的方式是否正确?

我看不出你的代码有什么问题,这取决于你想实现什么。深入阅读我分享的那篇博文,了解更多信息。

重试策略呢,也就是说,如果批处理语句执行失败,会发生什么情况,它会再次重试吗?

如果批处理失败,则批处理本身不会重试。驱动程序确实有重试策略,但您必须单独应用这些策略。

java驱动程序中的默认策略仅在以下情况下重试:

    < li >在读取超时时,如果有足够的副本回复,但未检索到数据。 < li >关于写入超时,如果我们在写入批处理语句使用的分布式日志时超时。

阅读有关默认策略的更多信息,并根据您的用例考虑不那么保守的策略。

 类似资料:
  • 问题内容: 我打算使用Datastax Java驱动程序来编写Cassandra。.我主要对Datastax Java驱动程序及其功能感兴趣,但是我无法获得任何教程来解释如何将这些功能合并到下面的使用Datastax的代码中Java驱动程序 在下面的代码中,我正在使用Datastax Java驱动程序创建与Cassandra节点的连接。 有人可以帮助我如何在上述代码中添加批处理写入或异步功能..谢

  • 我正在使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10。我正在异步地写,具有定额一致性。 下面是我的类: 我上面的方法将从多个线程调用,我认为不是线程安全的。Btw类是线程安全的,如上所示。 由于不是线程安全的。如果我从多个线程异步编写,我上面的代码会有问题吗? 其次,我在参数中使用了。这样可以吗?还是会有任何问题?或者我应

  • 在QMetry中,我试图在一个测试用例中从appiumDriver切换到androidDriver。这是由于函数<code>驱动程序之一而产生的。IsKeyboardShowed()显示为应用程序未定义。因此需要将其更改为androidDriver并使用此功能。 当前在Qmetry项目中,对于appiumDriver,功能在application.properties文件中设置为: 现在想切换到a

  • 问题内容: 在早期版本的MongoDB Java驱动程序中,要运行查询并对结果进行无序批量增补,我们要做的就是: 但是在版本3中,随着Bson Document支持和MongoCollection.bulkWrite()方法的引入,该怎么做? 我尝试了这个: 但是,我需要upsert功能。 谢谢。 问题答案: 您仍然可以使用所有功能,只是BulkWrites现在具有不同的语法: 因此,您可以使用(

  • 问题内容: 我已经为MongoDB和Cassandra构建了一个导入器。基本上,导入程序的所有操作都是相同的,除了最后一部分中,数据的形成与所需的cassandra表架构和所需的mongodb文档结构相匹配。与MongoDB相比,Cassandra的写入性能确实很差,我想我做错了。 基本上,我的抽象导入程序类加载数据,读出所有数据,并将其传递给扩展的MongoDBImporter或Cassandr

  • 问题内容: 我正在使用sqlsrv驱动程序的PHP MSSQL项目上。阻止SQL注入攻击的最佳方法是什么?我需要类似mysql_real_escape_string()的东西,但要使用sqlsrv驱动程序。 问题答案: 最好的方法是不要编写SQL,以便您需要使用的类似物,可以通过使用占位符作为值,然后在执行语句或打开时传递变量(否则将由来处理)来实现。光标或其他任何东西。 如果失败,请查看;的输出