当前位置: 首页 > 知识库问答 >
问题:

火花流后立即使用火花RDD过滤器

闻人杰
2023-03-14

我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。

问题是,我需要在读取Kafka的流之后立即从地图中过滤值。

我正在使用下面的代码来并行化地图值。

List<MyCompObj> list = CacheManager.getInstance().getMyMap().values().stream().collect(Collectors.toList());
JavaRDD<MyCompObj> myObjRDD = sparkContext.parallelize(list);

但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。

第一个问题是,如何通过事件驱动来运行spark作业?

另一方面,我只是想得到一些关于计划火花工作的意见。安排spark作业在特定时间执行的最佳做法是什么?

共有2个答案

郝杰
2023-03-14

我通过将流式处理和批处理分为两部分来解决我的问题。

我正在使用石英和SparkLauncher触发新作业(示例)

充培
2023-03-14

你考虑过Hazelcast Jet吗?它是Hazelcast构建的流式计算引擎。将数据从Kafka持续泵送到Hazelcast是一种容错的方式,是它的谋生手段,请参阅代码示例

Jet采用嵌入式Hazelcast,为您简化了架构。

 类似资料:
  • 我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?

  • CreateDataFrame接受2个参数,一个rdd和模式。 我的图式是这样的 <代码>val schemas=结构类型(Seq(StructField(“number”,IntegerType,false),StructField(“notation”,StringType,false))) 在一种情况下,我能够从RDD创建数据帧,如下所示: 在以下其他情况下。。我不能 data2不能成为Da

  • 本质上,我想对dStream中的每个元素应用一组函数。目前,我正在为pyspark.streaming.dstream使用“map”函数。根据文档,我的方法似乎是正确的。http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.dstream map(f,preservesPart

  • 我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我有一个要求,火花UDF必须超载,我知道UDF超载是不支持火花。因此,为了克服spark的这一限制,我尝试创建一个接受任何类型的UDF,它在UDF中找到实际的数据类型,并调用相应的方法进行计算并相应地返回值。这样做时,我得到一个错误 以下是示例代码: 有可能使上述要求成为可能吗?如果没有,请建议我一个更好的方法。 注:Spark版本-2.4.0