我正在运行一个由Kafka、Spark和Cassandra组成的1节点集群。全部本地在同一台机器上。
从一个简单的Python脚本中,我每5秒将一些虚拟数据流到一个Kafka主题中。然后使用Spark结构化流,我将这个数据流(一次一行)读入PySpark DataFrame中,并使用startingoffset
=lates
。最后,我尝试将此行追加到一个已经存在的Cassandra表中。
我一直在关注(如何向Cassandra编写流数据集?)和(来自Kafka主题的PySpark结构流的Cassandra Sink)。
有一行数据被成功写入Cassandra表,但我的问题是它每次都被覆盖,而不是追加到表的末尾。我可能做错了什么?
下面是我的代码:
CQL DDL用于在Cassandra中创建KafKaspark
keyspace,后跟RandintStream
表:
DESCRIBE keyspaces;
CREATE KEYSPACE kafkaspark
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE kafkaspark;
CREATE TABLE randIntStream (
key int,
value int,
topic text,
partition int,
offset bigint,
timestamp timestamp,
timestampType int,
PRIMARY KEY (partition, topic)
);
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("append") \
.save()
最后,从Spark向Cassandra写查询:
query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
如果行总是在Cassandra中重写,那么表中的主键可能不正确--您需要确保每一行都有唯一的主键。如果您是从Spark创建Cassandra表,那么默认情况下它只是将第一列作为分区键,而且它可能不是唯一的。
提供架构后更新:
是的,这就是我提到的情况-主键为(partition,topic)
,但是从该主题读取的特定分区中的每一行都具有相同的主键值,因此它将覆盖以前的版本。您需要使主键唯一-例如,将offset
或timestamp
列添加到主键(尽管如果在同一毫秒内生成数据,timestamp
可能不唯一)。
df4.writeStream \
.trigger(processingTime="5 seconds") \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("update") \
.start()
我有一个方法应该用新内容覆盖当前文件,但是FileWriter()只是附加新内容,而不是覆盖旧内容。 这是我的FileWriter的设置方式 这是保存方法 当我运行此方法时会出现问题,它没有覆盖文件,而是将新内容附加到文件中。 我想要的: 发生了什么:
问题内容: 我正在尝试编写一种方法(如果尚不存在),然后制作一个“ log.txt文件”,然后将其写入该文件。我遇到的问题是每次调用该方法时,它都会覆盖现有的日志。如何更改方法,以使它不会覆盖数据而是仅更新文件? 我的写入文件方法: 问题答案: 只需更改为
问题内容: 下面的代码是我到目前为止所拥有的。当它写入.csv文件时,它将覆盖我之前在文件中写入的内容。如何以不擦除我之前的文本的方式写入文件(我的代码的目的是要有一个人输入他们的名字,让程序记住他们) 问题答案: 您需要在下次添加文件。这可以通过以追加模式打开文件来完成。
问题内容: 我有以下代码: 我想用新内容替换文件中的旧内容。但是,当我执行代码时,将附加文件“ test.xml”,即,我的旧内容被新的“替换”内容所取代。为了删除旧内容而只保留新内容,我该怎么办? 问题答案: 您需要先写入文件的开头,然后再使用(如果要进行就地替换): 另一种方法是读取文件,然后使用再次打开它: 无论是也将改变inode的文件的数量(我测试过两次,一次是与Ubuntu 12.04
我正在使用为我的两个测试生成报告。我正在从事selenium、java、testng、cucumber和maven项目。我的cucumberOptions中有两个TestRunner文件,我使用。当我只运行一个TestRunner时,数据块报告会按预期生成。但是,当我一个接一个地运行两个TestRunner时,第二个测试生成的区段报告将覆盖第一个测试生成的区段报告。如何将第二个测试生成的Exten
问题内容: 更新:Python 3.7+可确保保留插入顺序的字典 我想使用.py文件,例如配置文件。因此,使用这种表示法,我可以使用字符串作为键来创建字典,但是在标准的python字典中定义顺序会丢失。 我的问题:是否可以重写表示法,以便我得到而不是? 我希望仅使用OrderedDict()覆盖dict构造函数即可,但事实并非如此。 例如: 输出: 问题答案: 这几乎可以为您提供所需的语法: 编辑