我试图建立一个队列研究来跟踪应用html" target="_blank">程序内的用户行为,我想问你是否知道当我使用.join()时如何在pyspark中指定条件:
rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
((u'service1',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'2016-02-08',
u'2016-39',
u'2016-6',
u'2016-2',
'2016-10-19'),
(u'service2',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'1',
u'67.0',
u'2016-293',
u'2016-42',
u'2016-10',
'2016-10-19')))])
rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
((u'serice1',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'2016-02-08',
u'2016-39',
u'2016-6',
u'2016-2',
'2016-10-20'),
(u'service2',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'10',
u'3346.0',
u'2016-294',
u'2016-42',
u'2016-10',
'2016-10-20')))])
这两个rdds表示有关用户的信息,其ID为'6df99638e4584a618f92a9cfdf318cf8',并在2016-10-19和2016-10-20登录到服务1和服务2。我的objectif是连接我的两个RDD,每一个至少包含20,000行。所以它必须是一个内部联接。真正的目的是获取所有已经在2016-10-19'登录并且在2016-10-20登录的用户。所以更具体地说,我的最后一个对象是,在内部联接之后,只有RDD2的内容,这里是rxemple的结果。
预期产出:
[(u'6df99638e4584a618f92a9cfdf318cf8',
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'),
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20'))
) ]
rdd1.map(lambda (k, v) : k).join(rdd2)
这段代码给了我一个空的RDD。
怎么办?PS:我必须处理RDD,而不是数据副本!所以我不想将我的rdds转换为DataFrames:D任何帮助都很感激。Thx!
因此,您正在寻找rdd1和rdd2的联接,它将只从rdd2中获取键和值:
rdd_output = rdd1.join(rdd2).map(lambda (k,(v1,v2)):(k,v2))
结果是:
print rdd_output.take(1)
[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'),
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')
))]
我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?
假设我在Spark上有两个数据帧 现在,我想通过多个列(任何大于1的数字)连接它们 我拥有的是第一个数据帧的列数组和第二个数据帧中的列数组,这些数组具有相同的大小,我希望通过这些数组中指定的列进行连接。例如: 因为这些数组大小可变,所以我不能使用这种方法: 是否有任何方法可以动态连接多个列?
我无法连接到具有SEDA队列的骆驼路线。在服务器端,我有以下配置: 我正在尝试从这样的独立客户端访问此路由: 但我的制作人无法连接到seda队列。无法按我的路线排队。无法在我的bean属性中添加camelContext。我正在获取“bean类的属性'camelContext'无效”。如果我将正文发送到SEDA队列,则消息将发送到那里,但不会发送到路由的下一个元素
我正在开发我的第一个Azure实现,我已经设置了我的Azure帐户,并且我使用NuGet为我的应用程序安装了正确的DLL和配置。当我将我的WCF客户端设置为指向服务总线队列并运行该方法时,会出现以下异常: 微软。ServiceBus.服务器错误 我的endpoint配置是: 我的行为是: 这个错误消息非常通用,我不确定我应该首先查看哪个位置
我在Weblogic中创建了以下内容:MyJMSServer持久存储:目标:域当前服务器:域 创建的TestModule具有:ConnectionFactory类型:连接工厂JNDI名称:JNDIConnectionFactory子部署:TestSubdeployment目标:MyJMSServer 队列类型:队列JNDI名称:JNDI队列子部署:TestSubdeployment目标:MyJMS
我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?