当前位置: 首页 > 知识库问答 >
问题:

Cassandra 写入使用 Spark 提供非常慢的性能

唐钊
2023-03-14

我有一个包含大约 5 亿条记录的 cassandra 表(在 6 个节点中),现在我正在尝试在 Amazon EMR 中使用 spark-cassandra-connector 插入数据

表结构

  CREATE TABLE dmp.dmp_user_profiles_latest (
        pid text PRIMARY KEY,
        xnid int,
        day_count map<text, int>,
        first_seen map<text, timestamp>,
        last_seen map<text, timestamp>,
        usage_count map<text, int>,
        city text,
        country text,
        lid set<text>,

    )WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 172800
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.1
    AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count);
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country);

以下是我的火花提交选项

--class com.mobi.vserv.driver.Query5kPids1
--conf spark.dynamicAllocation.enabled=true  
--conf spark.yarn.executor.memoryOverhead=1024    
--conf spark.yarn.driver.memoryOverhead=1024 
--executor-memory 1g
--executor-cores 2
--driver-memory 4g

但是在日志中,我看到写入 Cassandra 大约需要 4-5 分钟才能加载 200,000 条记录(而总执行时间为 6 分钟)

我还在Spark conf中添加了以下内容

conf.set("spark.cassandra.output.batch.size.rows", "auto");
conf.set("spark.cassandra.output.concurrent.writes", "500");
conf.set("spark.cassandra.output.batch.size.bytes", "100000");
conf.set("spark.cassandra.output.throughput_mb_per_sec","1");

但仍然没有性能改进,增加Amazon EMR中的内核数量也无济于事。

请注意,在我的Cassandra表中,我们没有使用任何分区/集群列,所以这可能是性能如此慢的原因。

请注意网络速度为30 MB PS,主键是字母数字值,例如-a9be3eb4-751f-48ee-b593-b3f89e18622d

Cassandra.yaml

cluster_name: 'dmp Cluster'
num_tokens: 100
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
     - /data/cassandra/data
disk_failure_policy: stop
commit_failure_policy: stop

key_cache_size_in_mb:

key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
 - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
 - seeds: "10.142.76.97,10.182.19.301"

concurrent_reads: 256
concurrent_writes: 128
concurrent_counter_writes: 32

memtable_allocation_type: heap_buffers
memtable_flush_writers: 8
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: 10.142.76.97
start_rpc: true
rpc_address: 10.23.244.172
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
batch_size_warn_threshold_in_kb: 5
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 64
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 500000

range_request_timeout_in_ms: 1000000

write_request_timeout_in_ms: 200000

counter_write_request_timeout_in_ms: 500000

cas_contention_timeout_in_ms: 100000

endpoint_snitch: Ec2Snitch

dynamic_snitch_update_interval_in_ms: 100

dynamic_snitch_reset_interval_in_ms: 600000

dynamic_snitch_badness_threshold: 0.1

request_scheduler: org.apache.cassandra.scheduler.NoScheduler

server_encryption_options:
    internode_encryption: none
    keystore: conf/.keystore
    keystore_password: cassandra
    truststore: conf/.truststore
    truststore_password: cassandra

client_encryption_options:
    enabled: false
    keystore: conf/.keystore
    keystore_password: cassandra

internode_compression: all

inter_dc_tcp_nodelay: false

共有1个答案

养俊驰
2023-03-14

正如评论中所说,看起来你的问题来自于你的日计数指数。

正如在本页中看到的,如果您必须一直更新它们,index将不会有效,当您将不同的值插入day_count(可能是每次)时,它就会有效。

您需要重新设计数据库,但由于这是您的生产环境,您不能只删除索引(如果存在keyspace.index_name如果需要此索引),而是可以使用day_count作为主键创建辅助数据库,或使用day_count作为排序索引。

 类似资料:
  • 我有一个带有复合分区键的 cassandra 表(time_bucket 时间戳,节点 int)。time_bucket值是插入数据的时间,秒转换为 00,节点值范围为 0 到 100 spark作业每分钟运行一次,从表中提取数据。该表包含近2500万条记录,每分钟都有记录被添加。 如果我的 Spark 作业每次运行时都选择所有记录,则作业将在 2 分钟内完成。但是如果我使用: s < code

  • 我可以以大约每秒10,000次插入的速度将插入直接流式传输到BigQuery,但是当我试图使用Dataflow插入时,'tobqrow'步骤(如下所示)非常慢。每10分钟只有50排,这是4名工人。知道为什么吗?以下是相关代码:

  • 谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?

  • 问题内容: 我面临一个非常奇怪的问题:使用Redis时,我的写入速度非常糟糕(在理想情况下,写入速度应该接近RAM上的写入速度)。 这是我的基准: 是生成随机字符串的类(arg是字符串长度) 以下是几个结果: [写入] nb:100000 |时间:4.408319378 |速度:0.713905907055318 MB / s [写入] nb:100000 |时间:4.4139469070553

  • 我正在使用presto查询Cassandra记录,它需要大约8分钟来响应结果。需要提高响应时间。 Presto配置如下: 片段2成本:CPU 1.98M,输入:17833912行(1.49GB),输出:13089502行(1.31GB) ScanFilterProject[table=cassandra:cassandra:rasapp:raslog,originalConstraint=((“B

  • 我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改