当前位置: 首页 > 知识库问答 >
问题:

何时使用选择器。添加接收,选择器。选择

秦琦
2023-03-14

希望能澄清一下我什么时候应该使用选择器。添加接收选择器。选择。这可能不是节奏问题,但也许我错过了一些关于Golang的知识。

对于选择器。选择我认为基本思想是等待通道的下一个输出。不完全确定什么是选择器。AddRecieve可以。

例如,在cadence示例中,< code>local_activity链接并粘贴在下面:

func signalHandlingWorkflow(ctx workflow.Context) error {
    logger := workflow.GetLogger(ctx)
    ch := workflow.GetSignalChannel(ctx, SignalName)
    for {
        var signal string
        if more := ch.Receive(ctx, &signal); !more {
            logger.Info("Signal channel closed")
            return cadence.NewCustomError("signal_channel_closed")
        }

        logger.Info("Signal received.", zap.String("signal", signal))

        if signal == "exit" {
            break
        }

        cwo := workflow.ChildWorkflowOptions{
            ExecutionStartToCloseTimeout: time.Minute,
            // TaskStartToCloseTimeout must be larger than all local activity execution time, because DecisionTask won't
            // return until all local activities completed.
            TaskStartToCloseTimeout: time.Second * 30,
        }
        childCtx := workflow.WithChildOptions(ctx, cwo)

        var processResult string
        err := workflow.ExecuteChildWorkflow(childCtx, processingWorkflow, signal).Get(childCtx, &processResult)
        if err != nil {
            return err
        }
        logger.Sugar().Infof("Processed signal: %v, result: %v", signal, processResult)
    }

    return nil
}

我们不使用任何选择器。AddReceive

但是,在这里的例子中,它也使用信号通道:根据外部输入改变优步节奏睡眠时间

我还会将代码粘贴到这里

func SampleTimerWorkflow(ctx workflow.Context, timerDelay time.Duration) error 
{
    logger := workflow.GetLogger(ctx)
    resetCh := workflow.GetSignalChannel(ctx, "reset")

    timerFired := false
    delay := timerDelay
    for ;!timerFired; {
        selector := workflow.NewSelector(ctx)

        logger.Sugar().Infof("Setting up a timer to fire after: %v", delay)
        timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)
        timerFuture := workflow.NewTimer(timerCancelCtx, delay)
        selector.AddFuture(timerFuture, func(f workflow.Future) {
            logger.Info("Timer Fired.")
            timerFired = true
        })

        selector.AddReceive(resetCh, func(c workflow.Channel, more bool) {
            logger.Info("Reset signal received.")
            logger.Info("Cancel outstanding timer.")
            cancelTimerHandler()

            var t int
            c.Receive(ctx, &t)
            logger.Sugar().Infof("Reset delay: %v seconds", t)
            delay = time.Second * time.Duration(t)
        })

        logger.Info("Waiting for timer to fire.")
        selector.Select(ctx)
    }

    workflow.GetLogger(ctx).Info("Workflow completed.")
    return nil
}

你可以看到有selector.AddReceive,我不完全确定它的目的是什么,或者我应该什么时候使用它。

我试图向我的工作流发送一个信号,允许我延长到期时间。这意味着,它会延迟对< code>ExpirationActivity的调用

按照这个例子(结合我当前的代码),只要我发送重置信号,timerFired似乎就会立即设置为true。

我当前的代码如下(我去掉了一些不相关的if语句),之前,我只使用了< code >选择器的一个实例。Select,但是我的代码在某个地方没有正常运行。

func Workflow(ctx workflow.Context) (string, error) {
    // local state per bonus workflow
    bonusAcceptanceState := pending
    logger := workflow.GetLogger(ctx).Sugar()
    logger.Info("Bonus workflow started")
    timerCreated := false
    timerFired := false
    delay := timerDelay

    // To query state in Cadence GUI
    err := workflow.SetQueryHandler(ctx, "bonusAcceptanceState", func(input []byte) (string, error) {
        return bonusAcceptanceState, nil
    })
    if err != nil {
        logger.Info("SetQueryHandler failed: " + err.Error())
        return "", err
    }
    info := workflow.GetInfo(ctx)
    executionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
    // decisionTimeout := time.Duration(info.TaskStartToCloseTimeoutSeconds) * time.Second
    decisionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
    maxRetryTime := executionTimeout // retry for the entire time

    retryPolicy := &cadence.RetryPolicy{
        InitialInterval:          time.Second,
        BackoffCoefficient:       2,
        MaximumInterval:          executionTimeout,
        ExpirationInterval:       maxRetryTime,
        MaximumAttempts:          0, // unlimited, bound by maxRetryTime
        NonRetriableErrorReasons: []string{},
    }
    ao := workflow.ActivityOptions{
        TaskList:               taskList,
        ScheduleToStartTimeout: executionTimeout, // time until a task has to be picked up by a worker
        ScheduleToCloseTimeout: executionTimeout, // total execution timeout
        StartToCloseTimeout:    decisionTimeout,  // time that a worker can take to process a task
        RetryPolicy:            retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    selector := workflow.NewSelector(ctx)
    timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)

    var signal *singalType

    for {
        signalChan := workflow.GetSignalChannel(ctx, signalName)
        // resetCh := workflow.GetSignalChannel(ctx, "reset")

        selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
            c.Receive(ctx, &signal)
        })

        selector.Select(ctx)

        if signal.Type == "exit" {
            return "", nil
        }

        // We can check the age and return an appropriate response
        if signal.Type == "ACCEPT" {
            if bonusAcceptanceState == pending {
                logger.Info("Bonus Accepted")
                bonusAcceptanceState = accepted

                var status string
                future := workflow.ExecuteActivity(ctx, AcceptActivity)
                if err := future.Get(ctx, &status); err != nil {
                    logger.Errorw("Activity failed", "error", err)
                }
                // Start expiration timer
                if !timerCreated {
                    timerCreated = true
                    timerFuture := workflow.NewTimer(timerCancelCtx, delay)
                    selector.AddFuture(timerFuture, func(f workflow.Future) {
                        logger.Info("Timer Fired.")
                        timerFired = true
                    })
                }

            }
        }
        
        
        if signal.Type == "ROLLOVER_1X" && bonusAcceptanceState == accepted {
            var status string
            future := workflow.ExecuteActivity(ctx, Rollover1x)
            if err := future.Get(ctx, &status); err != nil {
                logger.Errorw("Activity failed", "error", err)
            }
            selector.Select(ctx)
        }
        if signal.Type == "ROLLOVER_COMPLETE" && bonusAcceptanceState == accepted {
            var status string
            future := workflow.ExecuteActivity(ctx, RolloverComplete)
            if err := future.Get(ctx, &status); err != nil {
                logger.Errorw("Activity failed", "error", err)
                return "", err
            }
            // Workflow is terminated on return result
            return status, nil
        }
        for; !timerFired && bonusAcceptanceState == accepted && signal.Type == "RESET" {
            cancelTimerHandler()

            i, err := strconv.Atoi(signal.Value)
            if err != nil {
                logger.Infow("error in converting")
            }

            logger.Infof("Reset delay: %v seconds", i)
            delay = time.Minute * time.Duration(i)
            timerFuture := workflow.NewTimer(timerCancelCtx, delay)
            selector.AddFuture(timerFuture, func(f workflow.Future) {
                logger.Info("Timer Fired.")
                timerFired = true
            })
            selector.Select(ctx)
        }
        if timerFired {
            var status string
            future := workflow.ExecuteActivity(ctx, ExpirationActivity)
            if err := future.Get(ctx, &status); err != nil {
                logger.Errorw("Activity failed", "error", err)
            }
            return status, nil
        }
    }

}

共有2个答案

顾磊
2023-03-14

检查未来的退货结果

selector.AddFuture(timerFuture, func(f workflow.Future) {
    err := f.Get(ctx, nil)
    if err == nil {
        logger.Info("Timer Fired.")
        timerFired = true
    }
})

ref:https://github . com/Uber-go/cadence-client/blob/0256258 b 905 b 677 F2 f 38 fcacfbda 43398d 236309/workflow/deterministic _ wrappers . go # L128-L129

韦辰钊
2023-03-14
  • 您将只使用选择器。AddReceive当您需要让选择器监听频道时,如您的第二个代码段。如果您只需要直接处理来自通道的信号而不需要选择器,那么就不需要使用它
  • 选择器。选择是让代码等待某些事件发生。因为你不想用繁忙的循环等待

本质上,这与Golang select语句的概念完全相同。Golang select允许您等待计时器和频道。但Golang没有选择器。选择()只是因为它已经融入语言本身,但Cadence是一个库。

因此,与golang相同,您不必使用< code>select语句来使用计时器或通道。只有当您必须编写一些代码来监听多个事件源时,才需要它。

例如,如果你有两个通道,你想写一些共同的逻辑来处理这两个通道,例如,增加一个计数器。此计数器不属于任何频道。这是一个普通的柜台。那么使用一个< code >选择器看起来会更好。

chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0

selector.AddReceive(chA)
selector.AddReceive(chB)
For {
  selector.Select()
  counter += 1
}

带有选择器的工作流代码看起来与 Golang 中的工作流代码非常相似:

    counter := 0
    for {
        select {
        case _ := <- chA:
            counter += 1
        case _ := <- chB:
            counter += 1
        }
    }

否则,您可能需要使用两个goroutines来监听每个频道,并进行计数。golang代码是这样的:

counter := 0
go func(){
   for{
      _ := <- chA
      counter += 1
   }
}()

go func(){
   for{
      _ := <- chB
      counter += 1
   }
}()

这可能是比赛条件的问题。除非计数器实现了线程安全

在Cadence工作流代码中,它是这样的:

chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0

Workflow.Go(ctx){
   for{
     chA.Receive(ctx,nil)
     counter +=1
   }
}

Workflow.Go(ctx){
   for{
     chB.Receive(ctx,nil)
     counter +=1
   }
}

然而,Cadence中没有这样的竞争条件,因为Cadence的协同程序(由Workflow.Go()启动)实际上不是并发的。上面的两个工作流代码应该都能完美地工作。

但是Cadence仍然提供了这个选择器与Golang相同,主要是因为第一个更自然地编写代码。

 类似资料:
  • 本文向大家介绍Jsoup 使用CSS选择器选择元素,包括了Jsoup 使用CSS选择器选择元素的使用技巧和注意事项,需要的朋友参考一下 示例 您可以在此处找到支持的选择器的详细概述。

  • 连接池维持一份连接清单,它决定节点在什么时候从活节点转变为死节点(或死节点转变为活节点)。然而连接池选择连接对象时是没有逻辑的,这份工作属于 Selector 类。 选择器(selector)的工作是从连接数组中返回一个连接。和连接池一样,也有几种选择器可供选择。 RoundRobinSelector(默认) 选择器通过轮询调度的方式来返回连接。例如在第一个请求中选择节点1,在第二请求中选择节点

  • 选择器 CasperJS大量使用选择器来处理DOM,并且可以透明地使用CSS3或XPath表达式。 接下来的例子都基于下面的HTML代码: <!doctype html> <html> <head> <meta charset="utf-8"> <title>My page</title> </head> <body> <h1 class="page-title">Hell

  • 选择器 See the Pen FEND_Selectors by Li Xinyang (@li-xinyang) on CodePen. 选择器可被看做表达式,通过它可以选择相应的元素并应用不同的样式。 简单选择器 元素选择器 组合选择器 简单选择器 简单选择器可组合使用。 标签选择器 <div> <p>Sample Paragraph</p> <p>Sample Paragraph<

  • 选择器是jQuery的核心。一个选择器写出来类似$('#dom-id')。 为什么jQuery要发明选择器?回顾一下DOM操作中我们经常使用的代码: // 按ID查找: var a = document.getElementById('dom-id'); // 按tag查找: var divs = document.getElementsByTagName('div'); // 查找<p cl

  • 问题内容: 我希望此javascript在id =“ mainSelect”的选择中创建12到100个选项,因为我不想手动创建所有选项标签。你能给我一些指导吗?谢谢 问题答案: 您可以通过一个简单的循环来实现: JS Perf比较了我和imeVidas的答案,是因为我认为他的表情比我的看起来更容易理解/直观,而且我想知道这将如何转化为实现。根据Chromium14/Ubuntu11.04的说法,其