当前位置: 首页 > 工具软件 > Apache Geode > 使用案例 >

Apache Geode/GemFire 数据分区和路由机制浅析

谢泽语
2023-12-01

本篇文章主要讲解Apache Geode/GemFire 是如何进行数据分区的。


GemFire和大多数分布式系统一样都采用 Hash 的方式对数据进行分区,将 Entry 数据分布到 PartitionedRegion 当中,大家都知道 Entry 数据主要保存在 ConcurrentHashMap 中,ConcurrentHashMap存放在 Bucket 中,在 PR 服务器启动后会为 PartitionedRegion创建相应的Bucket 来保存这个ConcurrentHashMap。因此它们三者的映射关系如下所示:


Entry—> ConcurrentHashMap—> Bucket—> Region



如何进行数据分区?


当前端应用对Entry 进行操作后,Entry 会按照如下的步骤分区到PR 服务器上。


1.Entry 在插入到分布式集群的某台节点服务器过程中,会放到 PR 创建的 Bucket 中。


2.在进行 Put 操作时, 会产生一个EntryOperation 事件,在这个 Event 事件中可以找到PartionedRegion,和要进行 Put 操作的 Key。这个路由的对象就是一般的 POJO 操作类。


3.在获得路由对象上,从条目操作中获得 Key 键,在通过 Key 键来得到相关的对象Object。


4.接下来再通过 Key来获得bucketId, 再通过bucketId获得 PR,给定一个key/routing对象, 运行hashCode()生成一个 long 值, 然后用这个值与 bucket size 取模得到bucketId值。

   * 为了更好地进行哈希key分布, 使用MD5、SHA或其他的 ID 生成方法.


详见PartionedRegionHelper 的getHashKey方法.


private static int getHashKey(EntryOperation event, PartitionedRegion pr,

      Operation operation, Object key, Object value, Object callbackArgument) {

    // avoid creating EntryOperation if there is no resolver

    if (event != null) {

      pr = (PartitionedRegion)event.getRegion();

      key = event.getKey();

      callbackArgument = event.getCallbackArgument();

    }


    PartitionResolver resolver = getResolver(pr, key, callbackArgument);

    Object resolveKey = null;

    if (pr.isFixedPartitionedRegion()) {

      String partition = null ;

      if (resolver instanceof FixedPartitionResolver) {

        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        partition = ((FixedPartitionResolver)resolver).getPartitionName(

            event, partitionMap.keySet());

        if (partition == null) {

          Object[] prms = new Object[] { pr.getName(), resolver };

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));

        }

        Integer[] bucketArray = partitionMap.get(partition);

        if (bucketArray == null) {

          Object[] prms = new Object[] { pr.getName(), partition };

          throw new PartitionNotAvailableException(

              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));

        }

        int numBukets = bucketArray[1];

        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);

      }

      else if (resolver == null) {

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));

      }

      else if (!(resolver instanceof FixedPartitionResolver)) {

        Object[] prms = new Object[] { pr.getName(), resolver };

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));

      }

      return assignFixedBucketId(pr, partition, resolveKey);

    }

    else {

      // Calculate resolveKey.

      if (resolver == null) {

        // no custom partitioning at all

        resolveKey = key;

        if (resolveKey == null) {

          throw new IllegalStateException("attempting to hash null");

        }

      }

      else {

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        // 通过 Entry 操作, 获得一个路由对象, 得到resolveKey, 在通过 resolveKey进行Hash计算

        resolveKey = resolver.getRoutingObject(event);

        if (resolveKey == null) {

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());

        }

      }

      // Finally, calculate the hash.

      return getHashKey(pr, resolveKey);

    }

  }


如何进行数据感知路由?

 

GemFire 开发了一个Function Service 模块能够让客户端和服务器节点一起来处理提交的任务。如果数据跨多个节点分区,GemFire能够透明地路由数据执行行为到待处理数据的节点,这样避免了数据跨网络移动,这被称为“数据感知功能路由”。带有数据感知路由功能的应用根本不需要管理数据。

 

GemFire路由数据的执行行为而不是数据本身,GemFire直接路由数据执行行为到需要做并行处理,或汇聚结果的节点。这个特性使得GemFire从根本上降低了执行复杂任务的时间。分布式并行处理活动被抽象出来,与应用调用端无关。

 

 

应用即能够在单点执行,也能在一个小集群并行执行,甚至能够跨整个分布式集群并行执行。

GemFire的并行模型非常类似于Google的Map-Reduce模型。数据感知路由最适合于执行迭代查询或汇聚数据条目的操作。通过数据并处和并行计算,系统的吞吐量显著提升。最重要的是,计算的延迟与并行计算的节点数成反比。

 

单节点的Function执行类似于关系型数据库的Stored Procedure执行。

 

并行计算之后,结果通过Function中的结果收集器,把处理完的结果会调用ResultCollector统一收集回来。相当于Map-Reduce模型中的输出收集器。



 类似资料: