我可以在datastax cassandra driver(3.0)中看到一个奇怪的行为。我创建了一个新集群,然后使用同一个集群对象启动了一组线程。如果我将线程数保持在1或2,我会看到平均提取时间为5ms,但如果我将线程数增加到60,提取时间将增加到200ms(每个线程)。奇怪的是,如果我让60个线程的应用程序运行,并在同一台机器上启动另一个只有1个线程的进程,那么单线程应用程序的提取时间又是5毫秒。所以这似乎与客户有关。我多次重复相同的测试,以避免缓存冷启动问题。以下是群集对象的配置方式:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
.setConnectionsPerHost(HostDistance.LOCAL, parallelism, parallelism+20)
.setConnectionsPerHost(HostDistance.REMOTE, parallelism, parallelism+20)
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
this.cluster = Cluster.builder()
.addContactPoints(nodes)
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
.withCompression(Compression.LZ4)
.withPoolingOptions(poolingOptions)
.withProtocolVersion(ProtocolVersion.V4)
.build();
有没有人经历过同样的问题?这似乎是一个客户端配置问题。也许Netty还缺少一些配置?
更新1应用程序正在使用以下查询提取数据块:
select * from table where id=? and ts>=? and ts<?
所以我有60个线程并行地提取这些数据。id是分区键。每个查询都由线程执行,如下所示:
//Prepare statement
PreparedStatement stmt = ... get the prepared statment cached
BoundStatement bstmt = stmt.bind(...)
//Execute query
long te1 = System.nanoTime();
ResultSet rs = this.session.execute(bstmt);
long te2 = System.nanoTime();
//Fetch...
Iterator<Row> iterator = rs.iterator();
while (!rs.isExhausted() && iterator.hasNext()) { .... }
会话是一个跨所有线程共享的会话。我所测量的是会议的平均时间。execute()方法调用。
谢谢
更新2:这里是模式定义
CREATE TABLE d_t (
id bigint,
xid bigint,
ts timestamp,
avg double,
ce double,
cg double,
p double,
w double,
c double,
sum double,
last double,
max double,
min double,
p75 double,
p90 double,
p95 double,
squad double,
sumq double,
wavg double,
weight double,
PRIMARY KEY ((id), xid, ts)
) WITH CLUSTERING ORDER BY (xid DESC, ts DESC)
and compaction = {'class': 'SizeTieredCompactionStrategy'}
and gc_grace_seconds=86400
and caching = { 'keys' : 'ALL', 'rows_per_partition':'36000' }
and min_index_interval = 2
and max_index_interval = 20;
更新3也尝试了
.setMaxRequestsPerConnection(HostDistance.LOCAL, 1)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 1)
没有任何改变
最终我认为这将取决于你的代码在做什么。你能举个例子吗?
关于延迟的增加,你是如何衡量的?根据你的陈述:
奇怪的是,如果我让60个线程的应用程序运行,并在同一台机器上启动另一个只有1个线程的进程,那么单线程应用程序的提取时间又是5毫秒。
60个并发请求并不是太多,一般来说,不需要使用datastax java驱动程序对每个请求执行一个线程。您可以通过单个应用程序线程实现高吞吐量,因为驱动程序使用的netty事件循环组将完成大部分工作。
C*使用的本机协议允许每个连接有多个请求。正如您在这里配置的,每个连接最多可以有32768个并发请求。实际上,您根本不需要接触这个配置,因为默认(每个连接1000个请求)是明智的,因为在实践中,C*一次不会处理cassandra.yaml(128个默认)超过native_transport_max_threads
的请求,并对其余的进行排队。
正因为如此,您不需要为每个主机建立许多连接。默认的每个主机1个核心连接应该足以满足60个并发请求。增加每个主机的连接数对您没有多大帮助,在html" target="_blank">分析中,我发现超过8个高吞吐量的主机连接(数千个并发请求)的收益递减,超过16个主机连接的吞吐量越来越差,尽管您的里程可能因环境而异。
综上所述,我建议不要在默认设置之外配置poolgoptions
,除了在试图获得更高吞吐量的情况下将core和max设置为8之外(
我正在使用Datastax Cassandra驱动程序,并设置了RetryPolicy,以便在主机不可用时重试。然而,我注意到它尽可能快地重试。我想将其更改为在重试之间有越来越大的延迟,而不是在集群陷入困境时对其进行重击。这对于过载的请求错误尤其重要,因为我确实希望在这些情况下重试,但会有很大的延迟。 哪里是实施延迟的正确地点,什么是正确的机制?我应该抛出一个
用于cassandra的Datastax Java驱动程序(cassandra-driver-core 2.0.2)支持PreparedStatements以及QueryBuilder API。使用其中一种比另一种有什么特别的优势吗?缺点? 文档:http://www.datastax.com/documentation/developer/java-driver/2.0/common/drive
我一直在尝试写一些java应用程序。这个应用程序想要运行的是处理一个文本文件。 但是,输入文本文件很大(超过200MB),我尝试将200MB拆分为四个拆分文件(每个50MB) 所以,每一个都只需要0.5秒,但是用这种线性运行,每一个也需要2秒。(worker1+0.5s,worker2+0.5s,worker3+0.5s,worker4)如果我可以同时运行4个线程,我预计这个应用程序只需要0.5秒
我希望使用Spark从大约1500个远程Oracle表中提取数据,并且希望有一个多线程应用程序,该应用程序每个线程提取一个表,或者每个线程提取10个表,并启动一个Spark作业以从各自的表中读取数据。 从spark的官方站点https://spark.apache.org/docs/latest/job-scheduling.html可以看出这是可行的... ...Spark运行的群集管理器提供了
我正在尝试通过TestNG进行多线程测试为测试实例化WebDrivers<代码>@AfterMethod在测试后关闭WebDrivers
TL;DR:同时使用Hive和MySql JDBC有问题吗? 我正在开发一个应用程序,它使用MySql JDBC驱动程序执行多个SQL查询,然后它还使用Hive JDBC发送另一个Hive查询。 现在发生的情况是,MySql查询正常工作,当代码尝试执行配置单元查询时,它会抛出以下异常: 现在,在抛出这个异常之后,查询将正确执行。 我的猜测是,由于我同时加载了MySql和Hive驱动程序,MySql