我必须为每个客户端每秒存储大约250个数值,即每小时大约90万个数字。它可能不会是全天的记录(可能每天5-10个小时),但我会根据客户端ID和读取日期对数据进行分区。最大行长约为22-23M,这仍然是可管理的。无论如何,我的方案看起来像这样:
CREATE TABLE measurement (
clientid text,
date text,
event_time timestamp,
value int,
PRIMARY KEY ((clientid,date), event_time)
);
密钥空间的复制因子为2,仅用于测试,告密者为GossipingPropertyFileSnitch
和NetworkTopologyStrategy
。我知道复制因子3更符合生产标准。
接下来,我在公司服务器上创建了一个小型集群,三台裸机虚拟化机器,具有2个CPU x 2核、16GB RAM和大量空间。我和他们在千兆局域网中。基于节点醇,集群可以运行。
下面是我用来测试我的设置的代码:
Cluster cluster = Cluster.builder()
.addContactPoint("192.168.1.100")
.addContactPoint("192.168.1.102")
.build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true);
try {
ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts
String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also
//generating the entries
for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
time = time.plus(4); //4ms between each entry
BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
batch.add(bound);
//The batch statement must have 65535 statements at most
if (batch.size() >= 65534) {
queryQueue.put(batch);
batch = new BatchStatement();
}
}
queryQueue.put(batch); //the last batch, perhaps shorter than 65535
//storing the data
System.out.println("Starting storing");
while (!queryQueue.isEmpty()) {
pool.execute(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("Started: " + threadId);
BatchStatement statement = queryQueue.take();
long start2 = System.currentTimeMillis();
session.execute(statement);
System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
} catch (Exception ex) {
System.out.println(ex.toString());
}
});
}
pool.shutdown();
pool.awaitTermination(120,TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println(ex.toString());
} finally {
session.close();
cluster.close();
}
我通过阅读这里和其他博客和网站上的帖子来想出代码。据我所知,客户端使用多个线程非常重要,这就是我这样做的原因。我还尝试使用异步操作。
底线结果是,无论我使用哪种方法,一个批处理在5-6秒内执行,尽管可能需要长达10个。如果我只输入一个批处理(因此,只有~65k列),或者如果我使用一个愚蠢的单线程应用程序,则需要相同的时间。老实说,我期望更多一点。特别是因为我在具有本地实例的笔记本电脑上或多或少地获得了相似的性能。
第二个,也许更重要的问题,是我以一种不可预知的方式面临的例外。这两个:
com . datas tax . driver . core . exceptions . writetimeoutexception:一致性1的写查询期间Cassandra超时(需要1个副本,但只有0个副本确认了写操作)
和
com.datastax.driver.core.exceptions.NoHostAvailableException:所有主机尝试查询失败(尝试:/192.168.1.102:9042(com.datastax.driver.core.TransportException:[/192.168.1.102:9042]连接已关闭),/192.168.1.100:9042(com.datastax.driver.core.TransportException:[/192.168.1.100:9042]连接已关闭),/192.168.1.101:9042(com.datastax.driver.core.TransportException:[/192.168.1.101:9042]连接已关闭))
归根结底,我做错了什么吗?我应该重新组织加载数据的方式,还是更改方案。我试图减少行长(所以我有12个小时的行),但这并没有太大的区别。
=====1====更新:
我很粗鲁,忘记粘贴我在回答问题后使用的代码示例。它工作得相当好,但是我正在继续使用KairosDB和Astyanax进行二进制传输的研究。看起来我可以比CQL获得更好的性能,尽管KairosDB在过载时可能会遇到一些问题(但我正在研究它),并且Astyanax对于我的口味来说有点冗长。然而,这是代码,我可能在某个地方弄错了。
信号量插槽数在超过5000时对性能没有影响,它几乎是恒定的。
String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000);
System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");
//generating the entries
long start = System.currentTimeMillis();
for (int i = 0; i < 900000; i++) {
BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
semaphore.acquire();
ResultSetFuture resultSetFuture = session.executeAsync(statement);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
semaphore.release();
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("Error: " + throwable.toString());
semaphore.release();
}
});
time = time.plus(4); //4ms between each entry
}
使用未记录的批处理有什么结果?您确定要使用批处理语句吗?https://medium.com/@fondev/cassandra-batch-loading-non-the-batch-keyword-40f00e35e23e
我有一个由4个节点组成的Cassandra(2.2.1)集群,由Java客户端应用程序使用。复制因子为3,读写的一致性级别为LOCAL_QUORUM。每个节点大约有5 GB的数据。请求量约为每秒2-4k。几乎没有删除操作,因此创建了少量的墓碑。 一段时间前,我注意到读写性能很差,而且随着时间的推移,性能越来越差——集群变得非常慢。读取(通常)和写入超时已变得非常频繁。硬件不应该引起问题,部署集群的
问题内容: 我正在向SQLite3数据库中进行大批量插入,并且试图了解我应该期望的性能与实际看到的性能之间的关系。 我的桌子看起来像这样: 和我的插入看起来像这样: 元组列表在哪里。 目前,在一台2008年的Macbook上运行,在数据库中大约有1200万行的情况下,插入行花了我大约16分钟的时间。 这听起来合理吗,还是发生了什么大事? 问题答案: 据我了解,性能不佳的主要原因是浪费时间来执行许多
谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?
我有两个集群-1。Cloudera Hadoop-Spark作业在这里运行2。云-卡桑德拉星团,多DC 在编写从spark作业到cassandra集群的dataframe时,我在编写之前在spark中进行了重新分区(repartioncount=10)。见下文: 在我的多租户spark集群中,对于一个有20M记录的spark批加载,以及以下配置,我看到了很多任务失败、资源抢占和动态失败。 PS:我
我使用。 我每秒接收15万个请求,并将其插入到具有不同分区键的8个表中。 我的问题是哪种方式更好: 批量插入这些表 一个接一个地插入。 我问这个问题是因为,考虑到我的请求大小(150k),批处理听起来是更好的选择,但是因为所有的表都有不同的分区键,所以批处理显得很昂贵。
问题内容: 我的数据包含约30 000条记录。而且我需要将此数据插入到MySQL表中。我将这些数据按包进行分组(按1000分组),并创建多个插入,如下所示: 如何优化此插入的性能?每次可以插入1000条以上的记录吗?每行包含大小约为1KB的数据。谢谢。 问题答案: 您需要检查mysql服务器配置,尤其是检查缓冲区大小等。 您可以从表中删除索引(如果有的话),以使其更快。一旦数据输入,就创建索引。