当前位置: 首页 > 知识库问答 >
问题:

PySpark Cassandra:获取分区键的不同值

丁光华
2023-03-14

我正在尝试获取 pyspark 中 cassandra 表的分区键的不同值。但是,pyspark 似乎不理解我,并完全迭代所有数据(很多),而不是查询索引。

这是我使用的代码,在我看来非常简单:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark! This town not big enough for the two of us.") \
    .getOrCreate()

ct = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="avt_sensor_data", keyspace="ipe_smart_meter")\
    .load()

all_sensors = ct.select("machine_name", "sensor_name")\
    .distinct() \
    .collect()

列“机器名”和“传感器名”一起构成了分区键(完整的模式见下文)。在我看来,这应该非常快,事实上,如果我在cql中html" target="_blank">执行这个查询,只需要几秒钟:

select distinct machine_name,sensor_name from ipe_smart_meter.avt_sensor_data;

但是,火花作业大约需要 10 个小时才能完成。从 Spark 告诉我的计划来看,它似乎真的想迭代所有数据:

== Physical Plan ==
*HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
+- Exchange hashpartitioning(machine_name#0, sensor_name#1, 200)
   +- *HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@2ee2f21d [machine_name#0,sensor_name#1] ReadSchema: struct<machine_name:string,sensor_name:string>

我不是专家,但这对我来说不像是“使用卡桑德拉指数”。

我做错了什么?有没有办法告诉 spark 委派从 cassandra 获取不同值的任务?任何帮助将不胜感激!

如果有帮助,以下是底层cassandra表的架构描述:

CREATE KEYSPACE ipe_smart_meter WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}  AND durable_writes = true;

CREATE TABLE ipe_smart_meter.avt_sensor_data (
    machine_name text,
    sensor_name text,
    ts timestamp,
    id bigint,
    value double,
    PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = '[PRODUCTION] Table for raw data from AVT smart meters.'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

共有1个答案

樊浩初
2023-03-14

似乎只有在选择、过滤或排序时,cassandra服务器端下推谓词才起作用。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

因此,在您的的不同()的情况下,火花获取所有行,然后,执行不同()

你说你的cql选择不同…已经超级快了。我猜分区键的数量相对较少(machine_name和sensor_name的组合),而且有这么多“ts”。

所以,最简单的解决方案就是使用cql(例如,cassandra驱动程序)。

由于cassandra是一个查询优先的数据库,因此只需再创建一个表,该表仅包含您的不同查询所需的分区键。

CREATE TABLE ipe_smart_meter.avt_sensor_name_machine_name (
    machine_name text,
    sensor_name text,
    PRIMARY KEY ((machine_name, sensor_name))
);

然后,每次在原始表中插入行时,将machine_name和sensor_name插入到新表中。由于它只有分区键,因此对于您的查询来说,这是一个自然的独立表。只需获取所有行。也许超级快。不需要不同的过程。

我认为解决方案2是最好的。但是如果您不想对一条记录执行两次插入,还有一个解决方案是更改您的表并创建一个物化视图表。

CREATE TABLE ipe_smart_meter.ipe_smart_meter.avt_sensor_data (
    machine_name text,
    sensor_name text,
    ts timestamp,
    id bigint,
    value double,
    dist_hint_num smallint,
    PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
;

CREATE MATERIALIZED VIEW IF NOT EXISTS ipe_smart_meter.avt_sensor_data_mv AS
  SELECT
    machine_name
    ,sensor_name
    ,ts
    ,dist_hint_num
  FROM ipe_smart_meter.avt_sensor_data
  WHERE
    machine_name IS NOT NULL
    AND sensor_name IS NOT NULL
    AND ts IS NOT NULL
    AND dist_hint_num IS NOT NULL
  PRIMARY KEY ((dist_hint_num), machine_name, sensor_name, ts)
  WITH
  AND CLUSTERING ORDER BY (machine_name ASC, sensor_name DESC, ts DESC)
;

dist_hint_num列用于限制查询循环访问和分发记录的分区总数。

例如从0到15。随机整数< code>random.randint(0,15)或基于哈希的整数< code > hash _ func(machine _ name sensor _ name)% 16 都可以。然后,当您如下查询时。cassandra只从16个分区获取所有记录,可能比你现在的情况效率更高。

但是,无论如何,必须读取所有记录,然后distinct()(发生混洗)。不节省空间。我认为这不是一个好的解决方案。

functools.reduce(
    lambda df, dist_hint_num: df.union(
        other=spark_session.read.format(
            'org.apache.spark.sql.cassandra',
        ).options(
            keyspace='ipe_smart_meter',
            table='avt_sensor_data_mv',
        ).load().filter(
            col('dist_hint_num') == expr(
                f'CAST({dist_hint_num} AS SMALLINT)'
            )
        ).select(
            col('machine_name'),
            col('sensor_name'),
        ),
    ),
    range(0, 16),
    spark_session.createDataFrame(
        data=(),
        schema=StructType(
            fields=(
                StructField(
                    name='machine_name',
                    dataType=StringType(),
                    nullable=False,
                ),
                StructField(
                    name='sensor_name',
                    dataType=StringType(),
                    nullable=False,
                ),
            ),
        ),
    ),
).distinct().persist().alias(
    'df_all_machine_sensor',
)
 类似资料:
  • 如何使用Amazon DynamoDB模块获取仅匹配分区键(表有排序键)的所有项目。我正在使用GetItemRequest查询没有排序键。 当我仅使用分区键和GetItemRequest进行查询时,我会遇到以下错误。 <代码>原因:软件。亚马逊。awssdk。服务。发电机B。模型DynamoDbException:提供的关键字元素与架构不匹配(服务:DynamoDb,状态代码:400,请求ID:6

  • 我有一个HashMap(字符串,对象)。键是多个唯一ID的组合。我有一个输入,一个字符串是键的一部分(1个唯一ID)。我需要使用键的这一部分在HashMap中获取值,而无需在HashMap中迭代数千个值。 我们可以使用HashMap.get()中的任何正则表达式实现它吗? 我的钥匙是xxx。yyy年。zzz,其中xxx的组合。zzz在整个地图上都是独一无二的。我有xxx和zzz作为输入。此外,我还

  • 问题内容: 我想使用Java代码获取不同时区的本地时间。基于传递给函数的时区,我需要该时区的本地时间。如何实现呢? 问题答案:

  • 问题内容: 我在尝试为此表定义SQL查询时遇到了麻烦: 有一张患者表格,其访问时记录的体重读数包括以下几列: 患者编号 体重读数 访问ID(每次访问一个) 换句话说,如果在两个记录中两个访问ID相同,则在相同的访问日期读取了两个权重。 我有这个查询来“让所有至少有两个体重读数高于150的患者”: 这是我的问题:如果我想修改此查询以便查询以下内容,该怎么办: “让所有患者在不同的访问中至少有两个体重

  • 如果我有一个主题,它有5个分区,然后我有一个服务消耗这5个分区。然后在consumer,我轮询并返回一个ConsumerRecords数组。 每个单独的ConsumerRecord是否可以来自这5个分区中的任何一个?