Question:

Spark-Cassandra-Connector: per-partition limit

金子轩
2023-03-14

Is there a way, using the datastax/spark-cassandra-connector, to select the most recent row for each partition key, equivalent to the PER PARTITION LIMIT option available in Cassandra 3.6 and later?
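For reference, the PER PARTITION LIMIT clause in question is accepted by Cassandra 3.6 and later when the statement is sent directly through the Java driver (the cassandra-driver-core dependency listed further down). A minimal sketch, with a placeholder contact point and no authentication:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class PerPartitionLimitCqlExample {
    public static void main(String[] args) {
        // Placeholder contact point; PER PARTITION LIMIT requires Cassandra 3.6 or later.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("bug")) {
            // Newest row per partition key, evaluated server-side by Cassandra.
            for (Row row : session.execute(
                    "SELECT item_uuid, time_series_date, item_uri "
                        + "FROM per_partition_limit_test PER PARTITION LIMIT 1")) {
                System.out.println(row.getUUID("item_uuid") + " " + row.getTimestamp("time_series_date"));
            }
        }
    }
}

The test code attempting the same thing through the connector follows.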

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import com.google.common.collect.ImmutableMap;

import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import scala.Tuple3;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static java.lang.Double.*;
import static java.lang.Integer.*;

@Slf4j
public class Main extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Main(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        SPARK_SESSION = SparkSession
            .builder()
            .master(SPARK_MASTER)
            .appName(APP_NAME)
            .config("spark.cassandra.connection.host", CASSANDRA_HOST_IPS)
            .config("spark.cassandra.auth.username", CASSANDRA_USER_NAME)
            .config("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
            .config("pushdown", "true")
            .getOrCreate();

        try (JavaSparkContext sc = new JavaSparkContext(SPARK_SESSION.sparkContext())) {
            insertPerPartitionLimitTestList();
            getJavaRddPerPartitionLimitTest(sc);
            getTypedJavaRddPerPartitionLimitTest(sc);
            getJavaPairRddPerPartitionLimitTest(sc);
            getCassandraJavaRddPerPartitionLimitTest(sc);
            getTypedCassandraJavaRddPerPartitionLimitTest(sc);
            getCassandraTableScanJavaRddPerPartitionLimitTest(sc);
            getTypedCassandraTableScanJavaRddPerPartitionLimitTest(sc);
            getCassandraJavaRddToJavaRddPerPartitionLimitTest(sc);
            getSparkDatasetPerPartitionLimitTest(sc);
            getSparkSqlDatasetPerPartitionLimitTest();
            log.info("Done");
            return 0;  // success exit code
        } catch (Throwable t) {
            log.error("Spark transform failed.", t);
            return 1;  // failure exit code
        }
    }

    public final Map<String, String> cassandraConfig(String keyspace, String table) {
        return ImmutableMap.<String, String>builder()
            .put("spark.cassandra.connection.host", CASSANDRA_HOST_IPS)
            .put("spark.cassandra.auth.username", CASSANDRA_USER_NAME)
            .put("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
            .put("pushdown", "true")
            .put("keyspace", keyspace)
            .put("table", table)
            .build();
    }

    /**
     * Generate test data to INSERT INTO the Cassandra bug.per_partition_limit_test table.
     *
     * @param listSize The number of rows of test data to generate.
     * @return {@link List} of {@link PerPartitionLimitTest} containing test data.
     */
    public List<PerPartitionLimitTest> buildPerPartitionLimitTestList(Integer listSize){
        final Timestamp timeSeriesDate = Timestamp.from(LocalDateTime.now().atZone(ZoneId.of("UTC")).toInstant());
        final List<PerPartitionLimitTest> perPartitionLimitTests = new ArrayList<>(listSize);
        // Populate List of objects with test data.
        for(int i = 0; i < listSize; i++){
            final String itemUuid = UUID.randomUUID().toString();
            perPartitionLimitTests.add(
                PerPartitionLimitTest.of(
                    itemUuid,
                    timeSeriesDate,
                    String.format("/items/%s", itemUuid)
                )
            );
        }
        return perPartitionLimitTests;
    }

    /**
     * Generate test data and insert the resulting Dataset into the Cassandra table.
     */
    public void insertPerPartitionLimitTestList(){
        final Map<String, String> cassandraConfig = cassandraConfig("bug", "per_partition_limit_test");
        createDatasetFromList(
            PerPartitionLimitTest.class,
            buildPerPartitionLimitTestList(20)
        )
            .select("itemUuid", "timeSeriesDate", "itemUri")
            .toDF("item_uuid",
                "time_series_date",
                "item_uri")
            .write()
            .format("org.apache.spark.sql.cassandra")
            .mode(SaveMode.Append)
            .options(cassandraConfig)
            .save();
    }

    private PerPartitionLimitTestRowReaderFactory perPartitionLimitTestRowReaderFactory = new PerPartitionLimitTestRowReaderFactory();

    public String getPerPartitionLimitTestItemUuidMin(JavaSparkContext sc){
        return String.valueOf(
            getPerPartitionLimitTestDataset(
                PerPartitionLimitTest.class,
                "org.apache.spark.sql.cassandra",
                cassandraConfig("bug", "per_partition_limit_test")
            )
                .first()
                .getItemUuid());
    }

    public void getJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        final String itemUuidMin = String.valueOf(
            getPerPartitionLimitTestDataset(
                PerPartitionLimitTest.class,
                "org.apache.spark.sql.cassandra",
                cassandraConfig("bug", "per_partition_limit_test")
            )
            .first()
            .getItemUuid());

        JavaRDD<CassandraRow> javaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test")
            .where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin));
        log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
    }

    public void getTypedJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        JavaRDD<PerPartitionLimitTest> javaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where("PER PARTITION LIMIT 1");
        log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
    }

    public void getJavaPairRddPerPartitionLimitTest(JavaSparkContext sc){
        JavaPairRDD<String, PerPartitionLimitTest> javaPairRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where("PER PARTITION LIMIT 1")
            .keyBy((Function<PerPartitionLimitTest, String>) PerPartitionLimitTest::getItemUuid);
        log.info(String.format("javaPairRDD.count() = %s", javaPairRDD.count()));
    }

    public void getTypedCassandraJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        CassandraJavaRDD<PerPartitionLimitTest> cassandraJavaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where("PER PARTITION LIMIT 1");
        log.info(String.format("cassandraJavaRDD.count() = %s", cassandraJavaRDD.count()));
    }

    public void getCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        CassandraTableScanJavaRDD<CassandraRow> cassandraTableScanJavaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test")
            .where("PER PARTITION LIMIT 1");
        log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count()));
    }

    public void getTypedCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        CassandraTableScanJavaRDD<PerPartitionLimitTest> cassandraTableScanJavaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where("PER PARTITION LIMIT 1");
        log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count()));
    }

    public void getCassandraJavaRddToJavaRddPerPartitionLimitTest(JavaSparkContext sc){
        CassandraJavaRDD<CassandraRow> cassandraJavaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test");
        JavaRDD<PerPartitionLimitTest> javaRDD = cassandraJavaRDD
            .where("PER PARTITION LIMIT 1")
            .map((Function<CassandraRow, PerPartitionLimitTest>) cassandraRow -> PerPartitionLimitTest.of(
                cassandraRow.getUUID("item_uuid").toString(),
                new Timestamp(cassandraRow.getDateTime("time_series_date").getMillis()),
                cassandraRow.getString("item_uri")
            ));
        log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
    }

    /**
     * SELECT data from an input data source into a typed {@link Dataset}.
     *
     * @param clazz {@link Class} The class of type T that Spark should use when converting from the internal Spark SQL representation.  This
     *                                  tells Spark the type of object each row in this Dataset should be encoded as.
     * @param format Specifies the input data source format.
     * @param config {@link Map} of {@link String} containing options defining the input data source connection.
     * @param <T> type of class.
     * @return Typed {@link Dataset} containing table data selected from the input data source.
     */
    public <T> Dataset<T> getPerPartitionLimitTestDataset(Class<T> clazz, String format, Map<String, String> config) {
        final Encoder<T> encoder = Encoders.bean(clazz);
        return SPARK_SESSION
            .read()
            .format(format)
            .options(config)
            .load()
            .select("item_uuid", "time_series_date", "item_uri")
            .toDF("itemUuid", "timeSeriesDate", "itemUri")
            .as(encoder);
    }

    public void getSparkDatasetPerPartitionLimitTest(JavaSparkContext sc){
        final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
            getPerPartitionLimitTestDataset(
                PerPartitionLimitTest.class,
                "org.apache.spark.sql.cassandra",
                cassandraConfig("bug", "per_partition_limit_test")
            )
            .where("PER PARTITION LIMIT 1");
        log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count()));
    }

    public void getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan(JavaSparkContext sc){
        final String itemUuidMin = getPerPartitionLimitTestItemUuidMin(sc);
        final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
            getPerPartitionLimitTestDataset(
                PerPartitionLimitTest.class,
                "org.apache.spark.sql.cassandra",
                cassandraConfig("bug", "per_partition_limit_test")
            )
            .where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin));
        log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count()));
    }

    public void getSparkSqlDatasetPerPartitionLimitTest(){
        final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
            getPerPartitionLimitTestDataset(PerPartitionLimitTest.class, "org.apache.spark.sql.cassandra", cassandraConfig("bug", "per_partition_limit_test"));
        // Register the DataFrame as a SQL temporary view
        perPartitionLimitTestDataset.createOrReplaceTempView("perPartitionLimitTests");
        final Encoder<PerPartitionLimitTest> perPartitionLimitTestEncoder = Encoders.bean(PerPartitionLimitTest.class);
        // Modify data using Spark SQL
        final Dataset<PerPartitionLimitTest> perPartitionLimitTestSqlDS = SPARK_SESSION.sql(
            "SELECT item_uuid, "
                + "time_series_date, "
                + "'item_uri "
                + "FROM perPartitionLimitTests "
                + "PER PARTITION LIMIT 1")
            .as(perPartitionLimitTestEncoder);
        log.info(String.format("perPartitionLimitTestSqlDS.count() = %s", perPartitionLimitTestSqlDS.count()));
    }
}
import java.io.Serializable;
import java.sql.Timestamp;

import com.datastax.driver.core.Row;
import com.datastax.spark.connector.CassandraRowMetadata;
import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.cql.TableDef;
import com.datastax.spark.connector.rdd.reader.RowReader;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory;

import scala.collection.IndexedSeq;

public class PerPartitionLimitTestRowReader extends GenericRowReader<PerPartitionLimitTest> {
    private static final long serialVersionUID = 1L;
    private static RowReader<PerPartitionLimitTest> reader = new PerPartitionLimitTestRowReader();

    public static class PerPartitionLimitTestRowReaderFactory implements RowReaderFactory<PerPartitionLimitTest>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public RowReader<PerPartitionLimitTest> rowReader(TableDef arg0, IndexedSeq<ColumnRef> arg1) {
            return reader;
        }

        @Override
        public Class<PerPartitionLimitTest> targetClass() {
            return PerPartitionLimitTest.class;
        }
    }

    @Override
    public PerPartitionLimitTest read(Row row, CassandraRowMetadata rowMetaData) {
        PerPartitionLimitTest perPartitionLimitTest = new PerPartitionLimitTest();
        perPartitionLimitTest.setItemUuid(row.getUUID("item_uuid").toString());
        perPartitionLimitTest.setTimeSeriesDate(new Timestamp(row.getTimestamp("time_series_date").getTime()));
        perPartitionLimitTest.setItemUri(row.getString("item_uri"));
        return perPartitionLimitTest;
    }
}

import java.io.Serializable;

import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.rdd.reader.RowReader;

import scala.Option;
import scala.collection.Seq;

public abstract class GenericRowReader<T> implements RowReader<T>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Option<Seq<ColumnRef>> neededColumns() {
        return Option.empty();
    }

}

PerPartitionLimitTest domain entity:

import java.io.Serializable;
import java.sql.Timestamp;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@Data
@NoArgsConstructor
@Table(keyspace = "bug", name = "per_partition_limit_test")
@RequiredArgsConstructor(staticName = "of")
@XmlType(name = "PerPartitionLimitTest")
@XmlRootElement(name = "perPartitionLimitTest")
public class PerPartitionLimitTest implements Serializable {

    /**
     * Type 4 uuid that uniquely identifies the item.
     */
    @Valid
    @NotNull @NonNull
    @Column(name = "item_uuid")
    private String itemUuid;

    /**
     * The timestamp when the data was inserted into Cassandra.
     */
    @NotNull @NonNull
    @Column(name = "time_series_date")//, codec = TimestampTypeCodec.class)
    private Timestamp timeSeriesDate;

    /**
     * URI that points to an item.
     */
    @Column(name = "item_uri")
    @NotNull @NonNull
    private String itemUri;

}

Cassandra table:

USE bug;

DROP TABLE IF EXISTS bug.per_partition_limit_test;

CREATE TABLE bug.per_partition_limit_test (
  item_uuid uuid,
  time_series_date timestamp,
  item_uri text static,
  PRIMARY KEY ((item_uuid), time_series_date)
) WITH CLUSTERING ORDER BY (time_series_date DESC)
AND comment = 'Table Properties:
default_time_to_live - set to 518400 seconds which is 6 days, data will be automatically dropped after 6 days
Compaction
class - set to TimeWindowCompactionStrategy which is used for time series data stored in tables that use the default TTL for all data
compaction_window_unit - set to DAYS which is time unit used to define the bucket size
compaction_window_size - set to 6 which is how many units per bucket'
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '6', 'compaction_window_unit': 'DAYS'}
AND default_time_to_live = 518400
AND gc_grace_seconds = 519400;

Maven dependencies:

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.10</artifactId>
        <version>2.0.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.0.2</version>
        <scope>compile</scope>
    </dependency>
Error log:

[Stage 0:>                                                         (0 + 8) / 18]ERROR [2017-01-27 04:24:38,061] (Executor task launch worker-1) org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:132)
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    ... 3 common frames omitted
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279)
    ... 16 common frames omitted
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1   ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)


[Stage 0:>                                                         (0 + 8) / 18]ERROR [2017-01-27 04:26:02,044] (Executor task launch worker-3) org.apache.spark.executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3)
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:132)
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
    at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    ... 3 common frames omitted
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279)
    ... 16 common frames omitted
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1   ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)

ERROR [2017-01-27 01:41:50,369] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'PARTITION' expecting <EOF>(line 1, pos 67)

== SQL ==
TOKEN(item_uuid) > TOKEN(13432d97-3849-4158-8405-804447d1b0c3) PER PARTITION LIMIT 1
-------------------------------------------------------------------^^^



ERROR [2017-01-27 04:27:31,265] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input ''' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, SCIENTIFIC_DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 36)

== SQL ==
SELECT item_uuid, time_series_date, 'item_uri FROM perPartitionLimitTests PER PARTITION LIMIT 1
------------------------------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at Main.getSparkSqlDatasetPerPartitionLimitTest(Main.java:397)
    at Main.run(Main.java:177)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)

1 Answer

曹伟泽
2023-03-14

No. There is a JIRA ticket to add this feature: https://datastax-oss.atlassian.net/browse/sparkc-446

I've put up a pull request for it, if you'd like to test it.
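
Until that feature lands in the connector, one way to get the same result is to emulate PER PARTITION LIMIT 1 on the Spark side with a window function, at the cost of reading every row of each partition into Spark instead of limiting server-side. A sketch under that assumption, reusing the SPARK_SESSION field, the cassandraConfig helper and the log from the question's Main class (imports shown above the method for brevity):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public void getLatestRowPerPartitionKeyWithWindowFunction() {
    // Full table scan; the per-partition limit is applied in Spark, not pushed down to Cassandra.
    Dataset<Row> allRows = SPARK_SESSION
        .read()
        .format("org.apache.spark.sql.cassandra")
        .options(cassandraConfig("bug", "per_partition_limit_test"))
        .load();

    // Rank rows within each Cassandra partition key, newest first, and keep only rank 1.
    WindowSpec newestFirst = Window
        .partitionBy(col("item_uuid"))
        .orderBy(col("time_series_date").desc());

    Dataset<Row> latestPerPartitionKey = allRows
        .withColumn("rn", row_number().over(newestFirst))
        .where(col("rn").equalTo(1))
        .drop("rn");

    log.info(String.format("latestPerPartitionKey.count() = %s", latestPerPartitionKey.count()));
}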
