当前位置: 首页 > 面试题库 >

如何在Datastax Java驱动程序中使用异步/批量写入功能

章哲茂
2023-03-14
问题内容

我打算使用Datastax Java驱动程序来编写Cassandra。.我主要对Datastax Java驱动程序Batch Writes及其Asycnhronous功能感兴趣,但是我无法获得任何教程来解释如何将这些功能合并到下面的使用Datastax的代码中Java驱动程序

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

在下面的代码中,我正在使用Datastax Java驱动程序创建与Cassandra节点的连接。

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

有人可以帮助我如何在上述代码中添加批处理写入或异步功能..谢谢您的帮助。

我正在运行Cassandra 1.2.9


问题答案:

对于异步,它就像使用executeAsync函数一样简单:

...
DatastaxConnection.getSession().executeAsync(query);

对于批处理,您需要构建查询(我使用字符串,因为编译器知道如何很好地优化字符串连接):

String cql =  "BEGIN BATCH "
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

附带一提,假设您将属性更改为映射列表,其中每个映射表示批处理中的更新/插入,我认为您对语句的绑定可能类似于以下内容:

BoundStatement query = prepStatement.bind(userId,
                                          attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                          userId_2,
                                          attributesList.get(1).values().toArray(new Object[attributes.size()]));


 类似资料:
  • 我需要使用Datastax Java驱动程序将Batches写入Cassandra,这是我第一次尝试将batch与Datastax Java驱动程序一起使用,因此我感到有些困惑- 下面是我的代码,我试图在其中创建一个语句对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM。 下面是我的类 - 现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的ca

  • 我正在使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10。我正在异步地写,具有定额一致性。 下面是我的类: 我上面的方法将从多个线程调用,我认为不是线程安全的。Btw类是线程安全的,如上所示。 由于不是线程安全的。如果我从多个线程异步编写,我上面的代码会有问题吗? 其次,我在参数中使用了。这样可以吗?还是会有任何问题?或者我应

  • 我想在Play Framework 2项目中使用MongoDB异步Java驱动程序,MongoDB异步Java驱动程序return SingleResponseCallback。我不知道如何在播放控制器中处理这种结果。 例如,如何从播放控制器中的以下代码返回计数: 如何从SingleResultCallback获得结果,然后将其转换为Promise?这样好吗?这种情况下的最佳实践是什么?

  • 问题内容: 我在使用Mongoskin在Node上执行批量插入(MongoDB 2.6+)时遇到麻烦。 上面的代码给出以下警告/错误: 是否可以使用Mongoskin执行无序批量操作?如果是这样,我在做什么错? 问题答案: 您可以执行此操作,但是您需要更改调用约定才能执行此操作,因为只有“回调”形式实际上会返回可以从其调用方法的集合对象。您认为此用法的方式也存在一些差异: 因此,实际的“批量”方法

  • 我在Windows上的netbeans项目中使用Derby遇到了麻烦。我刚刚下载并配置了Derby,并将CLASSPATH变量设置为 当我运行命令时 一切都和预期的一样,我得到了输出 等等。 现在,当我运行一个直接从命令提示符使用derby的Java程序时,一切都运行得很好: 但当我尝试从Netbeans运行完全相同的程序时,我得到以下错误 为什么驱动程序在命令提示符下工作,而不是在NetBean

  • 问题内容: 我已经为MongoDB和Cassandra构建了一个导入器。基本上,导入程序的所有操作都是相同的,除了最后一部分中,数据的形成与所需的cassandra表架构和所需的mongodb文档结构相匹配。与MongoDB相比,Cassandra的写入性能确实很差,我想我做错了。 基本上,我的抽象导入程序类加载数据,读出所有数据,并将其传递给扩展的MongoDBImporter或Cassandr