当前位置: 首页 > 知识库问答 >
问题:

Ignite嵌入式TCP SPI发现和内存管理

颜新
2023-03-14

用例描述:我想使用ignite embedded来维护内存中的缓存,以加快我的spark工作。

1)在ignite embedded模式下TCP SPI发现是如何工作的?文档指出,在ignite embedded模式下,ignite节点的生命周期由spark管理,节点从spark作业本身内部启动和关闭。既然ignite节点绑定到YARN容器,那么是否仍然需要通过SPI配置?或者服务发现是自动/动态地进行的?

2)建立在第一个问题上:我们如何启动一个火花任务,比如说,4个火花执行器,但只有2个点火节点?

3)我提供了一个我开发的示例代码,但我的工作由于超出内存而被取消。我已经按照原始文档中的规定浏览了容量规划页面。我的数据约为300 MB,在最坏的情况下,我预计它将消耗约1.5 GB内存,没有复制,并且索引在一个整数字段上。

我的集群配置:1个主-24 GB内存,2个核心CPU和2个从-8GB内存,2个核心CPU

import java.io.Serialisable
import org.apache.spark._
import org.apache.ignite.spark._
import org.apache.ignite.configuration._
import org.apache.ignite.spi.discovry.tcp.ipfinder.vm.TCPDiscoveryVmIpFinder
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import java.util.Arrays
import org.apache.spark.sql._
import org.apache.ignite.cache.query.annotations.QuerySqlField
import scala.annotation.meta.field
import org.apache.ignite._


val ic= new IgniteContext(sc, () => {
    new discoveryspi= new TcpDiscoverySpi()
    val finder= new TCPDiscoveryVmIpFinder()
    finder.setAddresses(Arrays.asList("127.0.0.1:47500")) //and more ip address in my cluster
    discoveryspi.setIpFinder(finder)

    val dataStorage= new DataStorageConfiguration()
    dataStorage.getDefaultDataRegionConfiguration().setMaxSize(16L*1024*1024*1024)  //16GB

    val cfg= new IgniteConfiguration()
    cfg.setDiscoverySpi(discoveryspi)
    cfg.setDataStorageConfiguration(dataStorage)
    cfg}, false)

case class User(
    @(QuerySqlField @field)(index=true) id: Int,
    @(QuerySqlField @field) gender: String,
    @(QuerySqlField @field) marks: Int
) extends Serialisable

val cacheCfg= new CacheConfiguration[Int, User]("sharedRDD")
cacheCfg.setIndexedTypes(classOf[Int], classOf[User])
cacheCfg.setCacheMode(CahceMode.PARTITIONED)

val df2= spark.sql("select * from db_name.User")  //read data from hive table
val df2_rdd= df2.rdd
val data_rdd= df2_rdd.map(x=> User(x.getInt(0), x.getString(1), x.getInt(2)))

val tcache: IgniteRDD[Int, User]= ic.fromCache(cacheCfg)
tcache.savePairs(data_rdd.map(x=> (x.id, x)))

val result= tcache.sql("select * from User u1 left join User u2 on(u1.id=u2.id)") //test query for self join

这个程序工作良好,直到我做自我加入。像“select*from User limit 5”这样的简单查询工作得非常好。

错误日志

警告tcpdiscoveryspi:无法连接到任何地址frim IP查找器(将每2秒重试一次加入拓扑)

警告TCPCommunicationSPI:连接超时

警告yarnScheduleRbackend$yarnScheduleRendpoint:容器因超出内存限制而被YARN杀死。考虑提升spark.yarn.executor.memoryoverhead

我已经将spark.yarn.executor.memoryoverhead参数增加到2 GB,executor memeory增加到6GB。然而,考虑到我的数据大小只有300 MB,我仍然无法找出我在这里遗漏了什么

共有1个答案

彭阳朔
2023-03-14
  1. 发现Spi像通常的Apache Ignite节点一样以嵌入式模式工作。因此,无论如何,您都需要正确配置发现Spi(尤其是Ip查找器)。有关节点发现的更多详细信息,您可以在那里找到,并选择适合您的情况的更多信息。
  2. 如果不需要在spark作业中实例Apache Ignite,就不要创建IgniteContext对象。
  3. 我认为您的JVM占用了大量内存。您需要检查JVM设置。
 类似资料:
  • 有什么想法吗?

  • 我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?

  • 细节 Linux 中,设备用/dev/目录下的文件表示。例如 /dev/hda1 第一块硬盘的第一主分区 /dev/hdb5 第二块硬盘的第一逻辑分区 /dev/sda4 第一块 SATA 硬盘的第四主分区,或者扩展分区 /dev/null 黑洞设备 关于磁盘设备,详见“分区概念”一节 mount 设备文件 [挂载路径] 挂载文件系统 -t 指定文件系统的类型 通常不必指定,mount 自

  • 本文向大家介绍Android开发实现的内存管理工具类,包括了Android开发实现的内存管理工具类的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Android开发实现的内存管理工具类。分享给大家供大家参考,具体如下: 更多关于Android相关内容感兴趣的读者可查看本站专题:《Android开发之内存与缓存技巧总结》、《Android资源操作技巧汇总》、《Android视图View技巧总

  • 问题内容: 编辑:改写问题: 我想将ActiveMQ用作服务器和客户端应用程序之间的信使服务。 我正在尝试在服务器内设置嵌入式代理(即不是单独的进程),以处理产生的消息供我的客户使用。该队列被保留。 经纪人初始化如下: 修补之后,我最终得到了服务器部分: 客户端非常相似,看起来像这样: main方法只是在线程中启动其中的每一个,以开始生成/接收消息。 …但是我在每个线程的开头都遇到以下问题: 看来

  • 对于一个基于图论的框架来说,节点和边是最小的部件。实际应用中,这些部件构成了各种有向图。比如一个有环图,它的数据流动就是一个环形,部件之间的持有关系如果不能很好的处理,那么可能就会存在内存问题。EasyReact 的内存管理逻辑非常简单,也非常精巧。可以让框架使用者无需关注太多的细节即可轻松的使用,而不必担心本框架涉及的内存方面的问题。 中间节点 节点包含了 fork、map、filter、ski