我正在使用spark(批处理,而不是流)从kafka topic中读取数据来创建spark dataframe。我想使用spark将这个数据帧加载到cassandra。Dataframe是字符串格式,如下所示。
root |-value:string(nullable = true)
+--------------------+
|value |
+--------------------+
|"1,Visa,6574" |
|"3,Visa,6574" |
|"4,MasterCard,6574" |
|"5,MasterCard,6574" |
|"8,Maestro,8372" |
+--------------------+
我尝试使用','分隔符拆分数据帧记录,并形成新的数据帧,我可以将其数据到cassandra。
创建了如下的火花DF。
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "earliest") \
.load()
df2=df.selectExpr("CAST(value AS STRING)")
df2.printSchema()
我尝试使用','拆分数据。
split_col=split(df2['value'],',')
df3=df2.withColumn('Name1',split_col.getItem(0))
df3=df2.withColumn('Name2',split_col.getItem(1))
df3=df2.withColumn('Name3',split_col.getItem(2))
上面的代码没有给出预期的结果,我觉得
根| -- 值: 字符串 (可以为空 = 真) | -- 名称3: 字符串 (可以为空 = 真)
+-------------------+-----+
|value |Name3|
+-------------------+-----+
|"1,Visa,6574" |6574"|
|"3,Visa,6574" |6574"|
|"4,MasterCard,6574"|6574"|
|"5,MasterCard,6574"|6574"|
|"8,Maestro,8372" |8372"|
+-------------------+-----+
我想得到如下的put:
+-------------------+----------+------+
|Name1 |Name2 |Name3 |
+-------------------+----------+------+
| 1 |Visa |6574 |
| 3 |Visa |6574 |
| 4 |MasterCard|6574 |
| 5 |MasterCard|6574 |
| 8 |Maestro |8372 |
+-------------------+----------+------+
请帮忙!!
您的解决方案完全可以。唯一的问题是在执行拆分并用于下一步之后,df2
和df3
的赋值。完成第一次拆分后,您分配给df3
,但对于后续拆分,您只使用df2
。因此,只有第三个split语句被spark评估。
解决方案明智,要么在最后一次拆分之前不要分配给新变量
df3 = df2.withColumn('Name1', f.split('value', ',').getItem(0)).\
withColumn('Name2', f.split('value', ',').getItem(1)).\
withColumn('Name3', f.split('value', ',').getItem(2))
df3.show()
+-----------------+-----+----------+-----+
| value|Name1| Name2|Name3|
+-----------------+-----+----------+-----+
| 1,Visa,6574| 1| Visa| 6574|
| 3,Visa,6574| 3| Visa| 6574|
|4,MasterCard,6574| 4|MasterCard| 6574|
|5,MasterCard,6574| 5|MasterCard| 6574|
| 8,Maestro,8372| 8| Maestro| 8372|
+-----------------+-----+----------+-----+
或者在下一个split中使用赋值变量(除非必要,否则不鼓励使用这种方式)
df3 = df2.withColumn('Name1', f.split('value', ',').getItem(0))
df3 = df3.withColumn('Name2', f.split('value', ',').getItem(1))
df3 = df3.withColumn('Name3', f.split('value', ',').getItem(2))
df3.show()
+-----------------+-----+----------+-----+
| value|Name1| Name2|Name3|
+-----------------+-----+----------+-----+
| 1,Visa,6574| 1| Visa| 6574|
| 3,Visa,6574| 3| Visa| 6574|
|4,MasterCard,6574| 4|MasterCard| 6574|
|5,MasterCard,6574| 5|MasterCard| 6574|
| 8,Maestro,8372| 8| Maestro| 8372|
+-----------------+-----+----------+-----+
问题内容: 我目前正在使用MYSQL中的函数,我有另一个表中的逗号分隔字符串(1,22,344,55),如何在MYSQL中将其拆分为数组(不是temp_table)。另外,MYSQL中有类似的函数可以执行foreach()吗? 问题答案: MySQL不包含拆分定界字符串的函数。但是,创建自己的函数非常容易。 用法 从这里:http : //blog.fedecarg.com/2009/02/22/
如何使用Spark-Scala连接日期和时间列(两个字符串)
我正在Spark 3.0.0上执行Spark结构流的示例,为此,我使用了twitter数据。我在Kafka中推送了twitter数据,单个记录如下所示 2020-07-21 10:48:19|1265200268284588034|RT@narendramodi:与@IBM首席执行官@ArvindKrishna先生进行了广泛的互动。我们讨论了几个与技术相关的主题,…|印度海得拉巴 在这里,每个字段
问题 你想拆分一个字符串。 解决方案 使用 JavaScript 字符串的 split() 方法: "foo bar baz".split " " # => [ 'foo', 'bar', 'baz' ] 讨论 String 的这个 split() 方法是标准的 JavaScript 方法。可以用来基于任何分隔符——包括正则表达式来拆分字符串。这个方法还可以接受第二个参数,用于指定返回的子字符串数
问题内容: 我正在尝试找到一种将String拆分为String数组的方法,并且每当遇到白色香料时就需要对其进行拆分,例如 “嗨,我是保罗” 进入” “嗨”“我”“保罗” 如何使用RegularExpression在split()方法中表示空格? 问题答案: 您需要一个正则表达式,例如,这意味着: 每当遇到至少一个空格时就进行拆分 。完整的Java代码是:
显示字符串: 先看看下面这几行语句在RGSS中的效果: p "这将会显示双引号" p '这也会显示双引号' print "这不会显示双引号" print '这也不会显示双引号' 把上面的四行语句复制下来,然后在我们刚刚建立好的Test脚本中粘贴,粘贴前最好把Test脚本的内容清除掉,我们只需要测试我们现在的代码。好,运行游戏,看看效果吧。 首先,看得出来,用来输出显示的方法又多了一种:print