当前位置: 首页 > 知识库问答 >
问题:

Apache Ignite+Spark Dataframes:客户端与服务器的疑点

陶锋
2023-03-14
def main(args: Array[String]) {
    val ignite = setupIgnite

    closeAfter(ignite) { _ ⇒

      implicit val spark: SparkSession = SparkSession.builder
        .appName("Ignite Benchmark")
        .getOrCreate()

      val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
      val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
      val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
      val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
      val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")

      writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
      writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
      writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
      writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
      writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)

    }
  }
    class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
  ^-- Enable eviction or expiration policies
        at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
        at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
        at java.lang.Thread.run(Thread.java:748)

这是一个奇怪的行为,因为我所有的ignite节点(运行在YARN上)都为默认区域定义了20GB(我更改了它并验证了它)。这表明错误必须来自在Spark上启动的ignite服务器(我认为是驱动程序上的一个服务器,每个工作者一个服务器),因为我没有更改Spark应用程序的ignite-config.xml中的默认区域大小(如错误所示,默认为12GB)。然而,这有道理吗?Spark是否应该抛出这个错误,因为它的唯一目标是从/向ignite读取和写入数据?Spark是否参与缓存任何数据,这是否意味着我应该在应用程序的ignite-config.xml中设置客户端模式,尽管官方示例没有使用客户端模式?

最好的问候,卡洛斯

共有1个答案

金阳曜
2023-03-14

首先,Spark-Ignite连接器已经在客户端模式下连接。

我将假设您有足够的内存,但您可以按照容量规划指南中的示例来确定。

但是,我认为问题是您对示例应用程序的跟踪太过了(!)。该示例--以便自包含--包括一个服务器和一个Spark客户机。如果您已经有一个Ignite集群,则不需要在您的Spark客户端中启动服务器。

    try (SparkSession spark = SparkSession
        .builder()
        .appName("AppName")
        .master(sparkMaster)
        .config("spark.executor.extraClassPath", igniteClassPath())
        .getOrCreate()) {

        // Get source DataFrame
        DataSet<Row> results = ....

        results.write()
            .outputMode("append")
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
            .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
            .write();
    }
 类似资料:
  • 前面的章节介绍了所有 Redis 的重要功能组件: 数据结构、数据类型、事务、Lua 环境、事件处理、数据库、持久化, 等等, 但是我们还没有对 Redis 服务器本身做任何介绍。 不过, 服务器本身并没有多少需要介绍的新东西, 因为服务器除了维持服务器状态之外, 最重要的就是将前面介绍过的各个功能模块组合起来, 而这些功能模块在前面的章节里已经介绍过了, 所以本章将焦点放在服务器的初始化过程,

  • 我想在一些计算机之间建立点对点连接,这样用户就可以在没有外部服务器的情况下聊天和交换文件。我最初的想法如下: 我在服务器上制作了一个中央服务器插座,所有应用程序都可以连接到该插座。此ServerSocket跟踪已连接的套接字(客户端),并将新连接的客户端的IP和端口提供给所有其他客户端。每个客户端都会创建一个新的ServerSocket,所有客户端都可以连接到它。 换句话说:每个客户端都有一个Se

  • 现在我的ServerSocket是这样的 一切正常,客户端连接,发送一些数据,服务器读取它,但我的问题是如何从我的服务器发送消息到所有的客户端?我目前保存客户端上的ArrayList,所以我可以只是循环,ArrayList打开一个作家和刷新它,但我想发送的数据,我得到我的ClientConnection类 我应该如何处理这个代码?在服务器类上完成的所有操作?或

  • 问题内容: 这是一个设计问题。我有需要进入HTML表的数据,稍后将由用户操纵。基本上,用户将能够选择表格行中的项目。 我有两个选择-在两种情况下,我都使用AJAX来获取数据: 在服务器端使用PHP创建HTML代码,并将其作为HTML发送到客户端。然后,用户使用Javascript(本质上是jQuery)来操纵表格。 使用JSON将原始数据发送到客户端,然后使用jQuery创建HTML,然后由用户对

  • 我有一个包含10个微服务的微服务架构,每个微服务提供一个客户端。在由微服务团队管理/控制的客户机内部,我们只接收参数并将它们传递给一个通用http调用程序,该调用程序接收endpoint和N个params,然后进行调用。所有微服务都使用http和web api(我猜技术并不重要)。 对于我来说,作为微服务团队提供一个客户是没有意义的,应该是消费者的责任,如果他们想创建一些抽象或者直接调用它是他们的

  • 我试图了解服务器-客户端网络如何为实时多人游戏工作。 假设我正在构建一个实时多人游戏,比如FPS。 如果玩家A向玩家B开枪,后端< code >服务器需要告诉玩家B他们被击中了。 我知道如何让玩家 A 告诉后端服务器他开了一枪,只是向服务器发送请求,但是如何让后端告诉玩家 他们被枪杀了? 玩家B是否必须每0.1秒检查一次后端以查看是否发生了什么事情,或者是否有更有效的方法?