我已经为MongoDB和Cassandra构建了一个导入器。基本上,导入程序的所有操作都是相同的,除了最后一部分中,数据的形成与所需的cassandra表架构和所需的mongodb文档结构相匹配。与MongoDB相比,Cassandra的写入性能确实很差,我想我做错了。
基本上,我的抽象导入程序类加载数据,读出所有数据,并将其传递给扩展的MongoDBImporter或CassandraImporter类,以将数据发送到数据库。一次针对一个数据库-
不能同时向C *和MongoDB插入“双”。导入程序在同一台计算机上针对相同数量的节点(6)运行。
问题:
57分钟后,MongoDB导入完成。我摄取了10.000.000个文档,并且期望Cassandra的行数大致相同。我的Cassandra导入程序自2.5小时以来一直在运行,并且仅在插入的5.000.000行中运行。我将在这里等待进口商完成并编辑实际的完成时间。
如何使用Cassandra导入:
我准备两个语句 一旦 摄取数据之前。这两个语句都是 UPDATE查询,
因为有时我必须将数据追加到现有列表中。开始导入之前,我的表已完全清除。准备好的语句会一遍又一遍地使用。
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
对于 每一 行,我创建一个BoundStatement并将该语句传递给我的“自定义”批处理方法:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
使用MongoDB,我一次可以插入1000个文档(即最大数量)。对于Cassandra,com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
在某个时候,导入程序仅崩溃了我的10条语句。我正在使用此代码来构建批次。顺便说一句,我之前以1000、500、300、200、100、50、20批处理开始,但显然它们也不起作用。然后将其设置为10,然后再次引发异常。现在我不知道为什么会中断。
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
我的C *模式如下所示
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>,
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestmap
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
有时我需要向现有行中添加其他新事件。这就是为什么我需要一个UDT列表的原因。我的UDT包含三个映射,因为事件创建者会产生不同的数据(字符串/双精度/布尔型的键/值对)。我知道以下事实:UDT已冻结,并且我无法触摸已摄取事件的地图。对我来说很好,我只需要添加有时具有相同时间戳的新事件。我对日志的创建者(一些传感器名称),记录的日期(即“
22-09-2016”)和时间戳的小时数进行分区(以在分配更多数据的同时将相关数据保持在一起)一个分区)。
我在Pom中使用Cassandra 3.0.8和Datastax Java
Driver,版本3.1.0。根据什么是卡桑德拉的批次限制?,我不应该通过调整增加批量大小batch_size_fail_threshold_in_kb
在我的cassandra.yaml
。那么…我的导入有什么问题或出了什么问题?
更新
因此,我已经调整了代码以运行异步查询并将当前正在运行的插入存储在列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前在插入中发生错误时,该方法将等待500ms直到插入低于阈值。现在,当没有插入失败时,我的代码会自动提高阈值。
但是,在流处理330万行之后,处理了280.000个插入,但是没有发生错误。
当前正在处理的插入数似乎太高。6个cassandra节点在2岁的商用硬件上运行。
这是否是大量并发插入(6个节点为280.000)的问题?我应该添加像这样的变量MAX_CONCURRENT_INSERT_LIMIT
吗?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
在使用了C 一段时间之后,我相信您应该只将批处理用于保持多个表同步。如果您不需要该 功能 ,则完全不要使用批处理,因为这 会* 导致性能下降。
将数据加载到C *的正确方法是异步写入,如果集群无法跟上接收速度,则可以使用可选的反压。您应该用以下方法替换“自定义”批处理方法:
要执行异步写入,请使用.executeAsync
方法,该方法将返回一个ResultSetFuture
对象。
为了控制下,有多少个运行中查询只是ResultSetFuture
将从.executeAsync
方法中检索到的对象收集在一个列表中,并且如果列表得到了(此处的计算值),则说1k个元素,然后等待所有这些元素完成后再发出更多写操作。或者,您可以等待第一个完成后再发出更多写操作,以保持列表完整。
最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:
从1到4,背压 强度 增加。选择最适合您的情况的一种。
问题更新后编辑
您的插入逻辑对我来说似乎有点混乱:
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)
是错误的,因为仅当发出的查询数>时您才会进入睡眠状态concurrentInsertLimit
,并且由于2.您的线程将仅停留在该位置。concurrentInsertErrorOccured
我通常会保留(失败的)查询列表,以便稍后重试。这使我可以对查询进行有力的控制,并且当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多X次,然后出现严重失败…)。
该列表应该非常动态,例如,当查询失败时,您可以在其中添加项目,而在执行重试时,则可以删除项目。现在,您可以了解群集的限制,并concurrentInsertLimit
根据例如最近一秒内失败查询的平均数量进行调整,或者使用更简单的方法“
如果重试列表中有项目则暂停 ”等。
注释后编辑2
由于您不需要任何重试逻辑,因此我将以这种方式更改代码:
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
runningInsertList.remove(future);
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
//Sleep while the currently processing number of inserts is too high
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
if (!concurrentInsertErrorOccured) {
// Increase your ingestion rate if no query failed so far
concurrentInsertLimit += 10;
} else {
// Decrease your ingestion rate because at least one query failed
concurrentInsertErrorOccured = false;
concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
}
return;
}
您还可以通过用List<ResultSetFuture>
计数器代替来优化过程。
希望能有所帮助。
我已经为MongoDB和Cassandra建立了一个进口商。基本上,导入程序的所有操作都是相同的,除了最后一部分,在最后一部分中形成数据以匹配所需的cassandra表模式和所需的mongodb文档结构。与MongoDB相比,Cassandra的写性能真的很差,我认为我做错了什么。 基本上,我的抽象导入器类加载数据,读取所有数据,并将其传递给扩展的MongoDBImporter或Cassandra
问题内容: 在早期版本的MongoDB Java驱动程序中,要运行查询并对结果进行无序批量增补,我们要做的就是: 但是在版本3中,随着Bson Document支持和MongoCollection.bulkWrite()方法的引入,该怎么做? 我尝试了这个: 但是,我需要upsert功能。 谢谢。 问题答案: 您仍然可以使用所有功能,只是BulkWrites现在具有不同的语法: 因此,您可以使用(
我需要使用Datastax Java驱动程序将Batches写入Cassandra,这是我第一次尝试将batch与Datastax Java驱动程序一起使用,因此我感到有些困惑- 下面是我的代码,我试图在其中创建一个语句对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM。 下面是我的类 - 现在我的问题是-我使用Batch插入带有DatastaxJava驱动程序的ca
驱动程序版本为: 我的问题是,当我使用api find和一些来自java的过滤器时,操作需要15秒。 我检查了mongo服务器日志文件,发现跟踪是一个命令,而不是一个查询: 2015-09-01T12:11:47.496+0200I命令[conn503]命令b.$CMD命令:计数{count:“logs”,查询:{timestamp:{$GTE:新日期(1433109600000)},aplica
我有一个包含大约 5 亿条记录的 cassandra 表(在 6 个节点中),现在我正在尝试在 Amazon EMR 中使用 spark-cassandra-connector 插入数据 表结构 以下是我的火花提交选项 但是在日志中,我看到写入 Cassandra 大约需要 4-5 分钟才能加载 200,000 条记录(而总执行时间为 6 分钟) 我还在Spark conf中添加了以下内容 但仍然
问题内容: 我打算使用Datastax Java驱动程序来编写Cassandra。.我主要对Datastax Java驱动程序及其功能感兴趣,但是我无法获得任何教程来解释如何将这些功能合并到下面的使用Datastax的代码中Java驱动程序 在下面的代码中,我正在使用Datastax Java驱动程序创建与Cassandra节点的连接。 有人可以帮助我如何在上述代码中添加批处理写入或异步功能..谢