当前位置: 首页 > 知识库问答 >
问题:

Pyspark内连接队列研究

邓子濯
2023-03-14

我试图建立一个队列研究来跟踪应用html" target="_blank">程序内的用户行为,我想问你是否知道当我使用.join()时如何在pyspark中指定条件:

rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
    ((u'service1',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'2016-02-08',
      u'2016-39',
      u'2016-6',
      u'2016-2',
      '2016-10-19'),
     (u'service2',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'1',
      u'67.0',
      u'2016-293',
      u'2016-42',
      u'2016-10',
      '2016-10-19')))])


rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
    ((u'serice1',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'2016-02-08',
      u'2016-39',
      u'2016-6',
      u'2016-2',
      '2016-10-20'),
     (u'service2',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'10',
      u'3346.0',
      u'2016-294',
      u'2016-42',
      u'2016-10',
      '2016-10-20')))])

这两个rdds表示有关用户的信息,其ID为'6df99638e4584a618f92a9cfdf318cf8',并在2016-10-19和2016-10-20登录到服务1和服务2。我的objectif是连接我的两个RDD,每一个至少包含20,000行。所以它必须是一个内部联接。真正的目的是获取所有已经在2016-10-19'登录并且在2016-10-20登录的用户。所以更具体地说,我的最后一个对象是,在内部联接之后,只有RDD2的内容,这里是rxemple的结果。

预期产出:

    [(u'6df99638e4584a618f92a9cfdf318cf8',
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20'))
) ] 
rdd1.map(lambda (k, v) : k).join(rdd2)

这段代码给了我一个空的RDD。

怎么办?PS:我必须处理RDD,而不是数据副本!所以我不想将我的rdds转换为DataFrames:D任何帮助都很感激。Thx!

共有1个答案

翁烨霖
2023-03-14

因此,您正在寻找rdd1和rdd2的联接,它将只从rdd2中获取键和值:

rdd_output = rdd1.join(rdd2).map(lambda (k,(v1,v2)):(k,v2))

结果是:

print rdd_output.take(1)

[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')
))]
 类似资料:
  • 我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?

  • 假设我在Spark上有两个数据帧 现在,我想通过多个列(任何大于1的数字)连接它们 我拥有的是第一个数据帧的列数组和第二个数据帧中的列数组,这些数组具有相同的大小,我希望通过这些数组中指定的列进行连接。例如: 因为这些数组大小可变,所以我不能使用这种方法: 是否有任何方法可以动态连接多个列?

  • 我无法连接到具有SEDA队列的骆驼路线。在服务器端,我有以下配置: 我正在尝试从这样的独立客户端访问此路由: 但我的制作人无法连接到seda队列。无法按我的路线排队。无法在我的bean属性中添加camelContext。我正在获取“bean类的属性'camelContext'无效”。如果我将正文发送到SEDA队列,则消息将发送到那里,但不会发送到路由的下一个元素

  • 我正在开发我的第一个Azure实现,我已经设置了我的Azure帐户,并且我使用NuGet为我的应用程序安装了正确的DLL和配置。当我将我的WCF客户端设置为指向服务总线队列并运行该方法时,会出现以下异常: 微软。ServiceBus.服务器错误 我的endpoint配置是: 我的行为是: 这个错误消息非常通用,我不确定我应该首先查看哪个位置

  • 我在Weblogic中创建了以下内容:MyJMSServer持久存储:目标:域当前服务器:域 创建的TestModule具有:ConnectionFactory类型:连接工厂JNDI名称:JNDIConnectionFactory子部署:TestSubdeployment目标:MyJMSServer 队列类型:队列JNDI名称:JNDI队列子部署:TestSubdeployment目标:MyJMS

  • 我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?