我正在使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10。我正在异步地写,具有定额一致性。
public void save(final String process, final int clientid, final long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, Executors.newFixedThreadPool(10));
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
下面是我的缓存语句
类:
public class CacheStatement {
private static final Map<String, PreparedStatement> cache =
new ConcurrentHashMap<>();
private static class Holder {
private static final CacheStatement INSTANCE = new CacheStatement();
}
public static CacheStatement getInstance() {
return Holder.INSTANCE;
}
private CacheStatement() {}
public BoundStatement getStatement(String cql) {
Session session = CassUtils.getInstance().getSession();
PreparedStatement ps = cache.get(cql);
// no statement cached, create one and cache it now.
if (ps == null) {
synchronized (this) {
ps = cache.get(cql);
if (ps == null) {
cache.put(cql, session.prepare(cql));
}
}
}
return ps.bind();
}
}
我上面的保存
方法将从多个线程调用,我认为Bound语句
不是线程安全的。BtwStatementCache
类是线程安全的,如上所示。
Bound语句
不是线程安全的。如果我从多个线程异步编写,我上面的代码会有问题吗?addCallback
参数中使用了Executors.newFixedThreadPool(10)
。这样可以吗?还是会有任何问题?或者我应该使用MoreExecutors.directExecutor
。那么这两者之间有什么区别?最好的方法是什么?下面是我使用datastax java驱动程序连接到cassandra的连接设置:
Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
.withPoolingOptions(poolingOptions)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
.withCredentials(username, password).build();
我认为你正在做的很好。您可以通过在应用程序启动时准备所有语句来进一步优化,这样您就可以缓存所有内容,这样您就不会在“保存”时因准备语句而受到任何性能影响,并且您不会锁定工作流中的任何内容。
绑定语句
不是线程安全的,但准备语句
是的,并且每次调用 get 语句
时,您都会返回一个新的绑定
语句。实际上,预准备语句
的 .bind()
函数实际上是新绑定语句 (ps).bind()
的快捷方式。并且您没有从多个线程访问相同的绑定语句
。所以你的代码很好。
相反,对于线程池,您实际上是在每个addCallback
函数上创建一个新的线程池。这是浪费资源。我不使用这种回调方法,我更喜欢自己管理纯FutureResultSet
,但我在datastax留档上看到了使用MoreExecutors.sameThreadExecutor()
而不是MoreExecutors.directExecutor()
的示例。
我需要使用Datastax Java驱动程序将Batches写入Cassandra,这是我第一次尝试将batch与Datastax Java驱动程序一起使用,因此我感到有些困惑- 下面是我的代码,我试图在其中创建一个语句对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM。 下面是我的类 - 现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的ca
问题内容: 在进行大量数据加载时,基于日志数据增加计数器,但是遇到超时异常。我正在使用Datastax 2.0-rc2 Java驱动程序。 这是服务器无法跟上问题的问题(即服务器端配置问题),还是客户端无聊的等待服务器响应的问题?无论哪种方式,我都可以进行简单的配置更改来解决此问题吗? 节点之一在大致发生时报告此情况: 问题答案: 虽然我不了解此问题的根本原因,但我可以通过增加conf / cas
在进行大容量数据加载、根据日志数据递增计数器时,我遇到了超时异常。我使用DataStax2.0-RC2 java驱动程序。 这是服务器无法跟上的问题(即服务器端配置问题),还是客户端在等待服务器响应时感到厌烦的问题?不管怎样,我能做一个简单的配置更改来解决这个问题吗? 其中一个节点大致在它发生的时间报告这一点:
我们正在尝试使用DataStax驱动程序将CSV文件中的数据插入Cassandra。有哪些方法可以做到这一点? 我们目前使用运行cqlsh从CSV文件加载。
问题内容: 我打算使用Datastax Java驱动程序来编写Cassandra。.我主要对Datastax Java驱动程序及其功能感兴趣,但是我无法获得任何教程来解释如何将这些功能合并到下面的使用Datastax的代码中Java驱动程序 在下面的代码中,我正在使用Datastax Java驱动程序创建与Cassandra节点的连接。 有人可以帮助我如何在上述代码中添加批处理写入或异步功能..谢
我正在使用php的Datastax Cassandra驱动程序,希望能够检查是否查询失败,在数据库中没有找到结果。现在,如果查询失败,日志报告如下 如果查询成功,它将返回预期的数据。下面是函数 根据发布的建议,我最后做了以下几点 谢谢你的建议