当前位置: 首页 > 知识库问答 >
问题:

使用datastax java驱动程序异步写入cassandra的有效方法?

卞昀
2023-03-14

我正在使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10。我正在异步地写,具有定额一致性。

  public void save(final String process, final int clientid, final long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, Executors.newFixedThreadPool(10));
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

下面是我的缓存语句类:

public class CacheStatement {
  private static final Map<String, PreparedStatement> cache =
      new ConcurrentHashMap<>();

  private static class Holder {
    private static final CacheStatement INSTANCE = new CacheStatement();
  }

  public static CacheStatement getInstance() {
    return Holder.INSTANCE;
  }

  private CacheStatement() {}

  public BoundStatement getStatement(String cql) {
    Session session = CassUtils.getInstance().getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
      synchronized (this) {
        ps = cache.get(cql);
        if (ps == null) {
          cache.put(cql, session.prepare(cql));
        }
      }
    }
    return ps.bind();
  }
}

我上面的保存方法将从多个线程调用,我认为Bound语句不是线程安全的。BtwStatementCache类是线程安全的,如上所示。

  • 由于Bound语句不是线程安全的。如果我从多个线程异步编写,我上面的代码会有问题吗?
  • 其次,我在addCallback参数中使用了Executors.newFixedThreadPool(10)。这样可以吗?还是会有任何问题?或者我应该使用MoreExecutors.directExecutor。那么这两者之间有什么区别?最好的方法是什么?

下面是我使用datastax java驱动程序连接到cassandra的连接设置:

Builder builder = Cluster.builder();
    cluster =
        builder
            .addContactPoints(servers.toArray(new String[servers.size()]))
            .withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
            .withPoolingOptions(poolingOptions)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
            .withLoadBalancingPolicy(
                DCAwareRoundRobinPolicy
                    .builder()
                    .withLocalDc(
                        !TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
                            .get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
            .withCredentials(username, password).build();

共有1个答案

郁景龙
2023-03-14

我认为你正在做的很好。您可以通过在应用程序启动时准备所有语句来进一步优化,这样您就可以缓存所有内容,这样您就不会在“保存”时因准备语句而受到任何性能影响,并且您不会锁定工作流中的任何内容。

绑定语句不是线程安全的,但准备语句是的,并且每次调用 get 语句时,您都会返回一个新的绑定语句。实际上,预准备语句.bind() 函数实际上是新绑定语句 (ps).bind() 的快捷方式。并且您没有从多个线程访问相同的绑定语句。所以你的代码很好。

相反,对于线程池,您实际上是在每个addCallback函数上创建一个新的线程池。这是浪费资源。我不使用这种回调方法,我更喜欢自己管理纯FutureResultSet,但我在datastax留档上看到了使用MoreExecutors.sameThreadExecutor()而不是MoreExecutors.directExecutor()的示例。

 类似资料:
  • 我需要使用Datastax Java驱动程序将Batches写入Cassandra,这是我第一次尝试将batch与Datastax Java驱动程序一起使用,因此我感到有些困惑- 下面是我的代码,我试图在其中创建一个语句对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM。 下面是我的类 - 现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的ca

  • 问题内容: 在进行大量数据加载时,基于日志数据增加计数器,但是遇到超时异常。我正在使用Datastax 2.0-rc2 Java驱动程序。 这是服务器无法跟上问题的问题(即服务器端配置问题),还是客户端无聊的等待服务器响应的问题?无论哪种方式,我都可以进行简单的配置更改来解决此问题吗? 节点之一在大致发生时报告此情况: 问题答案: 虽然我不了解此问题的根本原因,但我可以通过增加conf / cas

  • 在进行大容量数据加载、根据日志数据递增计数器时,我遇到了超时异常。我使用DataStax2.0-RC2 java驱动程序。 这是服务器无法跟上的问题(即服务器端配置问题),还是客户端在等待服务器响应时感到厌烦的问题?不管怎样,我能做一个简单的配置更改来解决这个问题吗? 其中一个节点大致在它发生的时间报告这一点:

  • 我们正在尝试使用DataStax驱动程序将CSV文件中的数据插入Cassandra。有哪些方法可以做到这一点? 我们目前使用运行cqlsh从CSV文件加载。

  • 问题内容: 我打算使用Datastax Java驱动程序来编写Cassandra。.我主要对Datastax Java驱动程序及其功能感兴趣,但是我无法获得任何教程来解释如何将这些功能合并到下面的使用Datastax的代码中Java驱动程序 在下面的代码中,我正在使用Datastax Java驱动程序创建与Cassandra节点的连接。 有人可以帮助我如何在上述代码中添加批处理写入或异步功能..谢

  • 我正在使用php的Datastax Cassandra驱动程序,希望能够检查是否查询失败,在数据库中没有找到结果。现在,如果查询失败,日志报告如下 如果查询成功,它将返回预期的数据。下面是函数 根据发布的建议,我最后做了以下几点 谢谢你的建议