当前位置: 首页 > 知识库问答 >
问题:

将Spark streaming PySpark dataframe写入Cassandra会覆盖表而不是追加

仲俊豪
2023-03-14

我正在运行一个由Kafka、Spark和Cassandra组成的1节点集群。全部本地在同一台机器上。

从一个简单的Python脚本中,我每5秒将一些虚拟数据流到一个Kafka主题中。然后使用Spark结构化流,我将这个数据流(一次一行)读入PySpark DataFrame中,并使用startingoffset=lates。最后,我尝试将此行追加到一个已经存在的Cassandra表中。

我一直在关注(如何向Cassandra编写流数据集?)和(来自Kafka主题的PySpark结构流的Cassandra Sink)。

有一行数据被成功写入Cassandra表,但我的问题是它每次都被覆盖,而不是追加到表的末尾。我可能做错了什么?

下面是我的代码:

CQL DDL用于在Cassandra中创建KafKasparkkeyspace,后跟RandintStream表:

DESCRIBE keyspaces;

CREATE KEYSPACE kafkaspark
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };
  
USE kafkaspark; 

CREATE TABLE randIntStream (
    key int,
    value int,
    topic text,
    partition int,
    offset bigint,
    timestamp timestamp,
    timestampType int,
    PRIMARY KEY (partition, topic)
);
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()
def writeToCassandra(writeDF, epochId):
    writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream", keyspace="kafkaspark") \
    .mode("append") \
    .save()

最后,从Spark向Cassandra写查询:

query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

共有1个答案

养鸿运
2023-03-14

如果行总是在Cassandra中重写,那么表中的主键可能不正确--您需要确保每一行都有唯一的主键。如果您是从Spark创建Cassandra表,那么默认情况下它只是将第一列作为分区键,而且它可能不是唯一的。

提供架构后更新:

是的,这就是我提到的情况-主键为(partition,topic),但是从该主题读取的特定分区中的每一行都具有相同的主键值,因此它将覆盖以前的版本。您需要使主键唯一-例如,将offsettimestamp列添加到主键(尽管如果在同一毫秒内生成数据,timestamp可能不唯一)。

df4.writeStream \
  .trigger(processingTime="5 seconds") \
  .format("org.apache.spark.sql.cassandra") \
  .options(table="randintstream", keyspace="kafkaspark") \
  .mode("update") \
  .start()
 类似资料:
  • 我有一个方法应该用新内容覆盖当前文件,但是FileWriter()只是附加新内容,而不是覆盖旧内容。 这是我的FileWriter的设置方式 这是保存方法 当我运行此方法时会出现问题,它没有覆盖文件,而是将新内容附加到文件中。 我想要的: 发生了什么:

  • 问题内容: 我正在尝试编写一种方法(如果尚不存在),然后制作一个“ log.txt文件”,然后将其写入该文件。我遇到的问题是每次调用该方法时,它都会覆盖现有的日志。如何更改方法,以使它不会覆盖数据而是仅更新文件? 我的写入文件方法: 问题答案: 只需更改为

  • 问题内容: 下面的代码是我到目前为止所拥有的。当它写入.csv文件时,它将覆盖我之前在文件中写入的内容。如何以不擦除我之前的文本的方式写入文件(我的代码的目的是要有一个人输入他们的名字,让程序记住他们) 问题答案: 您需要在下次添加文件。这可以通过以追加模式打开文件来完成。

  • 问题内容: 我有以下代码: 我想用新内容替换文件中的旧内容。但是,当我执行代码时,将附加文件“ test.xml”,即,我的旧内容被新的“替换”内容所取代。为了删除旧内容而只保留新内容,我该怎么办? 问题答案: 您需要先写入文件的开头,然后再使用(如果要进行就地替换): 另一种方法是读取文件,然后使用再次打开它: 无论是也将改变inode的文件的数量(我测试过两次,一次是与Ubuntu 12.04

  • 我正在使用为我的两个测试生成报告。我正在从事selenium、java、testng、cucumber和maven项目。我的cucumberOptions中有两个TestRunner文件,我使用。当我只运行一个TestRunner时,数据块报告会按预期生成。但是,当我一个接一个地运行两个TestRunner时,第二个测试生成的区段报告将覆盖第一个测试生成的区段报告。如何将第二个测试生成的Exten

  • 问题内容: 更新:Python 3.7+可确保保留插入顺序的字典 我想使用.py文件,例如配置文件。因此,使用这种表示法,我可以使用字符串作为键来创建字典,但是在标准的python字典中定义顺序会丢失。 我的问题:是否可以重写表示法,以便我得到而不是? 我希望仅使用OrderedDict()覆盖dict构造函数即可,但事实并非如此。 例如: 输出: 问题答案: 这几乎可以为您提供所需的语法: 编辑