当前位置: 首页 > 面试题库 >

渠道中的合并项目

长孙嘉容
2023-03-14
问题内容

我有一个接收任务并将其放入频道的html" target="_blank">功能。每个任务都有ID,一些属性以及放置结果的通道。看起来像这样

task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)

另一个goroutine从通道中获取任务,对其进行处理,然后将结果放入任务的通道中

task := <-queue
task.Result <- doExpensiveComputation(task)

此代码可以正常工作。但是现在我要合并任务queue。任务处理是一项非常昂贵的操作,因此我希望一次处理具有相同ID的队列中的所有任务。我看到了两种方法。

首先,不要将具有相同ID的任务放入队列,因此,当现有任务到达时,它将等待其复制完成。这是伪代码

if newTask in queue {
  existing := queue.getById(newTask.ID)
  existing.waitForComplete()
  sendResponse(existing.ProcessingResult)
} else {
  queue.enqueue(newTask)
}

因此,我可以使用go通道和map来实现它以实现随机访问+一些同步手段,例如互斥体。我不喜欢这种方式的是,我必须在代码周围携带地图和通道,并使它们的内容保持同步。

第二种方法是将所有任务放入队列,但是在结果到达时从队列中提取任务和所有具有相同ID的任务,然后将结果发送到所有任务。这是伪代码

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
  theSameTask.Result <- result
}

我有一个想法,如何以与上述相同的方式使用chan + map +互斥体来实现此目的。

这是一个问题:是否存在一些内置/现有的数据结构可用于解决此类问题?还有其他(更好)的方法吗?


问题答案:

如果我正确地理解了问题,我想到的最简单的解决方案是在任务发送者(放入queue)和工作人员(从取出queue)之间添加中间层。这(可能是例行程序)将负责存储当前任务(按ID)并将结果广播到每个匹配的任务。

伪代码:

go func() {
    active := make(map[TaskID][]Task)

    for {
        select {
        case task := <-queue:
            tasks := active[task.ID]
            // No tasks with such ID, start heavy work
            if len(tasks) == 0 {
                worker <- task
            }
            // Save task for the result
            active[task.ID] = append(active[task.ID], task)
        case r := <-response:
            // Broadcast to all tasks
            for _, task := range active[r.ID] {
                task.Result <- r.Result
            }
        }
    }
}()

不需要互斥体,也可能不需要携带任何东西,工作人员将只需要将所有结果放入中间层,然后可以正确地路由响应。如果冲突ID可能在一段时间内到达,您甚至可以在此处轻松添加缓存。

编辑:
我有一个梦想,上面的代码导致了死锁。如果您一次发送大量请求并阻塞了worker通道,那么这将是一个严重的问题–这个中间层例程正worker <- task等待工作人员完成,但是所有工作人员都可能无法发送到响应通道(因为我们的例程可以收集)。可播放的证明。

可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以通过这种方式设计系统,使缓冲区永远不会填满)。有几种方法可以解决此问题;例如,您可以运行一个单独的例程来收集响应,但是随后您需要active使用互斥量保护地图。可行
您还可以worker <- task选择一个选项,该选项将尝试将任务发送给工作人员,接收新任务(如果没有要发送的内容)或收集响应。可以利用以下事实:零通道从未准备好进行通信(被选择忽略),因此您可以在一个选择中选择接收和发送任务。例:

go func() {
    var next Task // received task which needs to be passed to a worker
    in := queue // incoming channel (new tasks) -- active
    var out chan Task // outgoing channel (to workers) -- inactive
    for {
        select {
        case t := <-in:
            next = t // store task, so we can pass to worker
            in, out = nil, worker // deactivate incoming channel, activate outgoing
        case out <- next:
            in, out = queue, nil // deactivate outgoing channel, activate incoming
        case r := <-response:
            collect <- r
        }
    }
}()



 类似资料:
  • 渠道分析 渠道来源细分

  • 国内市场上有许许多多的应用市场,常见的有:百度、360、腾讯应用宝、豌豆荚等。 其他手机厂家如小米、华为、魅族、三星等都有自己的应用市场,总共有上百家! 问题 发版前,Android工程师打包了上百个渠道版本,如何检验渠道号与apk名称是否一致? 希望自动去获取apk的友盟渠道号,并自动进行校验。 怎么做 Android Apk的渠道号一般存放在AndroidManifest.xml文件中。 批量

  • 1. 简介 “转化归因-渠道归因”报告能够帮助您洞察消费者在与您的产品接触过程中的每个广告触点对最终转化带来的价值。您需要结合投放目标,选择合适的归因模型进行分析。 渠道归因报告能够帮助您洞察这些问题: · 本轮广告投放,哪个渠道的拉新效果最好 · 辅助转化的渠道都有哪些 2. 使用简介 在使用“渠道转化归因”相关报告前,您需要将业务上有价值的事件(如下单、注册、留资等)标记为转化。您可以在“管理

  • 功能介绍 获取渠道转化漏斗的漏斗图和转化图数据。 接口 https://openapi.baidu.com/rest/2.0/mtj/svc/app/getDataByKey 请求参数 此处仅列本接口特有参数,公共参数请参考报告级API说明 获取表格数据 参数名 参数类型 是否必须 描述 method string 是 conversion/channelconversion/a;convers

  • 功能介绍 获取本APP的渠道分析报告相关数据 接口 https://openapi.baidu.com/rest/2.0/mtj/svc/app/getDataByKey 请求参数 此处仅列本接口特有参数,公共参数请参考报告级API说明 获取表格数据 参数名 参数类型 是否必须 描述 method string 是 channel/a;channel/f metrics string 是 指标列

  • 功能介绍 获取本APP的全部统计渠道列表 接口 https://openapi.baidu.com/rest/2.0/mtj/svc/config/getChannelList?access_token=[ACCESS_TOKEN]&aid=[APP_ID] 请求参数 参数名 参数类型 是否必须 描述 access_token string 是 用户准入token aid uint 是 应用ID