当前位置: 首页 > 工具软件 > Apache Mesos > 使用案例 >

apache mesos_Apache Mesos:编写自己的分布式框架

邴景山
2023-12-01

apache mesos

在上一篇文章中 ,我们了解了mesos是什么,它如何有用,并开始使用它。 在本文中,我们将看到如何在mesos上编写自己的框架。 (在mesos中,框架是在其上运行的任何应用程序。)本文介绍了一个名为“ mesos-pinspider”的框架,该框架获取用户的pinterest页面的用户配置文件信息和用户面板信息。

Mesos框架

通常,Mesos框架具有三个基本组件。

  • 将任务提交给框架的驱动程序
  • 向主服务器注册要提供资源的调度程序 ,执行任务并在执行程序上运行它们
  • 在从属节点上启动以执行框架任务的执行程序进程

Pinspider框架示例

您可以在github上检查代码。 让我们将其细分为PinDriver,PinScheduler和Pin UserProfileExecutor。

司机

该框架的驱动程序组件是PinDriver。

  • 创建执行人信息

    使用Builder模式描述有关执行程序的信息,而mesos使用Google协议缓冲区进行数据交换。 在这里,我们需要设置executorID,该命令基本上是一个shell命令,通过以下命令执行:'/ bin / sh -c value'。 在执行命令之前,将获取指定的所有URI。 名称由setName()设置。 来源由
    setSource(),框架用来跟踪执行程序源的标识符样式字符串。 当不同的执行者ID可能在语义上相关时,这很有用。

    Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
  • 创建框架信息

    描述框架信息。 用户字段用于确定执行程序/任务应以其启动的Unix用户。 如果用户字段设置为空字符串,Mesos将自动将其设置为当前用户。 主机在删除框架之前等待调度程序进行故障转移的时间由以下方式指定:
    setFailoverTimeout()。 框架的名称由setName()设置

    Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework");
  • 实例化调度程序

    您需要使用需要提交的数量实例化调度程序才能使执行程序运行。

    Scheduler scheduler = args.length == 1 ?
    	new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) :
    	new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);

    注意:请注意,使用了两个ExecutorInfo。 一个用于获取用户个人资料信息,另一个用于演示用户板信息。 此说明仅涉及一个executorinfo – userProfileExecutorInfo

  • 启动mesos调度程序驱动程序。

    MesosSchedulerDriver是SchedulerDriver的实现,SchedulerDriver是将调度程序连接到mesos的抽象接口。 这是通过管理调度程序的生命周期(开始,停止和等待任务完成)以及与Mesos交互(启动任务,终止任务等)来完成的。

    MesosSchedulerDriver schedulerDriver =
    	new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]);
    
    int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop();
    
    System.exit(status);

执行器执行

框架的执行器组件是PinUserProfileExecutor。

执行程序是由框架的执行程序实现的回调接口。 在我们的实现中,让我们专注于launchTask()

@Override public void launchTask(final ExecutorDriver executorDriver
final Protos.TaskInfo taskInfo) { 

}
  • 通过使用构建器模式设置ID和状态来设置任务状态。
    Protos.TaskStatus taskStatus =
    	Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId())
    .setState(Protos.TaskState.TASK_RUNNING).build();
  • 将状态更新发送到框架调度程序,根据需要进行重试,直到收到确认或执行程序终止为止,在这种情况下,将发送TASK_LOST状态更新。
    executorDriver.sendStatusUpdate(taskStatus);
  • 从任务中获取数据并运行逻辑。
    try {
    	message = ("userprofile :" + getUserProfileInfo(url)).getBytes();
    } catch (IOException e) {
    	LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());
    }
  • 向框架发送消息。
    executorDriver.sendFrameworkMessage(message);
  • 将任务的状态标记为已完成,然后将状态更新发送到框架调度程序。
    taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId())
    	.setState(Protos.TaskState.TASK_FINISHED).build();
    executorDriver.sendStatusUpdate(taskStatus);
  • main()方法创建MesosExecutorDriver实例并运行
    mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1

调度程序实施

该框架的Scheduler组件是Pin Scheduler。

调度程序是由框架的调度程序实现的回调接口。 在我们的实现中,让我们专注于resourceOffers(),statusUpdate()和frameworkMessage()

  • 构造函数:使用执行程序信息和启动任务数进行构造。
    public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {
    	this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
    } 
    
    public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor,  int totalTasks, String url) { 
    
    	this.pinUserProfileExecutor = pinUserProfileExecutor;
    	this.pinUserBoardExecutor = pinUserBoardExecutor;
    	this.totalTasks = totalTasks; this.crawlQueue =
    		Collections.synchronizedList(new ArrayList<String>());
    	this.crawlQueue.add(url);
    }
  • 资源报价
    • 资源商品可以是CPU,内存等资源。从商品列表中,获取资源的标量值。 在设置任务信息时,我们需要提供任务资源的需求。
      for (Protos.Offer offer : list) {
      	List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();
      	double offerCpus = 0; double offerMem = 0;
      	for (Protos.Resource resource : offer.getResourcesList()) {
      		if (resource.getName().equals("cpus")) {
      			offerCpus += resource.getScalar().getValue();
      		}
      		else if (resource.getName().equals("mem")) {
      			offerMem += resource.getScalar().getValue();
      		}
      	}
      	LOGGER.info("Received Offer : " + offer.getId().getValue() +
      		" with cpus = " + offerCpus + " and mem =" + offerMem);
    • 创建任务ID。
      Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
    • 通过设置任务ID,添加资源,设置数据和设置执行程序来创建任务信息。
      Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue())
      	.setTaskId(taskID).setSlaveId(offer.getSlaveId())
      	.addResources(Protos.Resource.newBuilder().setName("cpus")
      	.setType(Protos.Value.Type.SCALAR)
      	.setScalar(Protos.Value.Scalar
      	.newBuilder().setValue(CPUS_PER_TASK)))
      	.addResources(Protos.Resource.newBuilder().setName("mem")
      	.setType(Protos.Value.Type.SCALAR)
      	.setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK)))
      	.setData(ByteString.copyFromUtf8(crawlQueue.get(0)))
      	.setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
    • 通过SchedulerDriver启动任务。
      ...
      taskInfoList.add(pinUserProfileTaskInfo);
      taskInfoList.add(pinUserBoardTaskInfo);
      }
      schedulerDriver.launchTasks(offer.getId(), taskInfoList);
  • 状态更新

    当任务的状态已更改(即,从属丢失且任务丢失),任务完成且执行者发送状态更新时,调用此方法。

    @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
    ...
    }
    • 如果任务完成,请停止SchedulerDriver
      if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {
      
      	finishedTasks++;
      
      	LOGGER.info("Finished tasks : " + finishedTasks);
      
      	if (finishedTasks == totalTasks) {
      
      	schedulerDriver.stop();
      
      	}
      
      }
    • 如果任务被杀死,丢失或失败,则中止SchedulerDriver
      if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
      || taskStatus.getState() == Protos.TaskState.TASK_KILLED
      || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
      	LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() +
      		" is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName() +
      		"with reason : " + taskStatus.getReason().getValueDescriptor().getName()
      	+ " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : " + taskStatus.getMessage());
      	schedulerDriver.abort();
      }
  • 框架讯息

    当执行程序发送消息时,将调用此函数。

    • 处理您的讯息
      @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
      Protos.SlaveID slaveID, byte[] bytes) {
      	String data = new String(bytes);
      	System.out.println(data);
      	LOGGER.info("User Profile Information : " + data);
      }

此处提供了完整的代码,并提供运行和输出示例的说明。

翻译自: https://www.javacodegeeks.com/2015/01/apache-mesos-writing-your-own-distributed-frameworks.html

apache mesos

 类似资料: