当前位置: 首页 > 知识库问答 >
问题:

使用python UDF和CqlStorage处理Cassandra - pass列表对象

谷梁弘深
2023-03-14

我正在处理一个数据流,包括Pig中的一些聚合步骤和将步骤存储到Cassandra中。我已经能够传递相对简单的数据类型,例如integer、long或dates,但无法找到如何使用CqlStorage将某种列表、集合或元组从Pig传递到Cassandra。

我使用 Pig 0.9.2,所以我不能使用 FLATTEN 方法。

如何从Pig 0.9.2中填充带有复杂数据类型(如集合或列表)的Cassandra表?

> < li>

我根据描述创建了相应的Cassandra表:

CREATE TABLE mycassandracf (
my_id int,
date timestamp,
my_count bigint,
grouped_ids list<bigint>,
PRIMARY KEY (my_id, date)); 

以及携带准备好的声明的STORE指令:

STORE CassandraAggregate
INTO 'cql://test/mycassandracf?output_query=UPDATE+test.mycassandracf+set+my_count+%3D+%3F%2C+grouped_ids+%3D+%3F'
USING CqlStorage;

从“GROUP BY”关系中,我以cql友好的格式(例如在元组中)“生成”一个关系,我想将其存储到Cassandra中。

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), $1.grouped_id);

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,{(60128490006325819),(62726281032786005)}))
(((my_id,30165),(date,1357084800000)),(1,{(60128411174143024)}))
(((my_id,30376),(date,1357084800000)),(4,{(60128411146211875),(63645100121476995),(60128411146211875),(63645100121476995)}))

不出所料,在此关系上使用STORE指令会引发异常:

java.lang.ClassCastException: org.apache.pig.data.DefaultDataBag 不能被投射到 org.apache.pig.data.data.DataByteArray

因此,我添加了一个用python编写的UDF,以在grouped_id袋上应用一些扁平化:

@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    return tuple([long(item) for tup in bag for item in tup])

我使用tuple是因为使用python集和python列表最终会导致转换错误。

将其添加到我的管道中,我有:

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(60128411146211875,63645100121476995,6012841114621187563645100121476995)))

对最后一个关系使用 STORE 指令会引发错误堆栈的异常:

java.io.IOException: java.io.IOException: org.apache.thrift.transport.TTransportException
at     org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:428)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:408)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:262)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:260)
Caused by: java.io.IOException: org.apache.thrift.transport.TTransportException
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:248)
Caused by: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.cassandra.thrift.Cassandra$Client.recv_execute_prepared_cql3_query(Cassandra.java:1724)
at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1709)
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:232)

我用简单的数据类型测试了完全相同的工作流程,并且工作正常。我真正想要的是用复杂的类型(例如Pig的集合或列表)填充卡桑德拉表的方法。

非常感谢

共有1个答案

罗波鸿
2023-03-14

经过进一步调查,我在这里找到了解决方案:

https://issues.apache.org/jira/browse/CASSANDRA-5867

基本上,CqlStorage支持复杂类型。为此,类型应该由元组中的元组来表示,将数据类型作为字符串作为第一个元素。对于list来说,这是一种方法:

# python
@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    return ('list',) + tuple([long(item) for tup in bag for item in tup])

因此,在咕噜声中:

# pig
CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(list, 60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411146211875,63645100121476995,6012841114621187563645100121476995)))

然后使用经典的编码准备语句将其存储到cassandra中。

希望这将有所帮助。

 类似资料:
  • 假设数组如下所示: 数组中最多可以有100.000个值。 另一方面,如果我这样做: 我得到serialization异常,因为spark正在尝试序列化spark上下文,而spark上下文是不可序列化的。 如何使这个工作,但仍然利用并行性。 这是我得到的咒语:

  • 我将数据类型定义为: 现在,就目前而言,我有一个持久模型定义为: 我可以使用Esqueleto很容易地创建一个查询来填充委托视图。它会是这样的: 现在,考虑填充的问题。原则上,我们通过在上面的查询中运行子查询来获得足够的数据来填充。好的,很公平。现在如何在SQL中使用“group by Haskell-list”,如?如何折叠行,以便最终获得人员列表列表? 我的印象是无法处理这种情况(即它没有可以

  • 编辑:我已经更改了模式,以便做出一些澄清。 每天都会为当天创建一个新表。所以一个表只包含一天的日志。 我的查询条件如下。 查询特定用户在特定日期(日期而不是时间)的所有日志。 因此原因、项目、价格和计数根本不会用作查询的提示或条件。

  • 我正在尝试进行大量的外部服务调用,每个调用都遵循异常处理和有条件的进一步处理。我认为使用内部的. on完成来扩展这个不错的(Scala中带有期货的异步IO)示例会很容易,但似乎我对范围和/或期货有些不理解。有人能给我指出正确的方向吗? 在我的电脑上(Scala 2.10.4 ),这打印出来: 我要(顺序不重要):

  • 编辑:对于的最后一行,还可以放入:

  • 我试图使用Apache Spark来处理我的大型(230K条目)cassandra数据集,但我经常遇到不同类型的错误。然而,我可以成功地运行应用程序时,运行在一个数据集约200个条目。我有一个由3个节点和1个主节点和2个工作节点组成的spark设置,这两个工作节点还安装了一个cassandra集群,该集群的数据索引复制系数为2。我的两个spark workers在web界面上显示2.4和2.8GB