当前位置: 首页 > 工具软件 > Machinery > 使用案例 >

聊聊machinery的TaskState

张成济
2023-12-01

本文主要研究一下machinery的TaskState

TaskState

// Task lifecycle states. These string values are what the TaskState
// constructors in this package assign to TaskState.State.
const (
	// StatePending is the initial state of a task.
	StatePending = "PENDING"
	// StateReceived is set when the task is received by a worker.
	StateReceived = "RECEIVED"
	// StateStarted is set when the worker starts processing the task.
	StateStarted = "STARTED"
	// StateRetry is set when a failed task has been scheduled for retry.
	StateRetry = "RETRY"
	// StateSuccess is set when the task is processed successfully.
	StateSuccess = "SUCCESS"
	// StateFailure is set when processing of the task fails.
	StateFailure = "FAILURE"
)

// TaskState represents the state of a single task. Each field carries a bson
// tag naming the key it is stored under.
//
//   - TaskUUID: the task identifier, copied from Signature.UUID by the
//     constructors; stored under the bson key "_id".
//   - TaskName: the task name; only NewPendingTaskState fills it in.
//   - State: one of the State* constants (PENDING, RECEIVED, ...).
//   - Results: the task results; only NewSuccessTaskState fills it in.
//   - Error: the error text; only NewFailureTaskState fills it in.
//   - CreatedAt: creation time; NewPendingTaskState sets it to the current
//     UTC time.
//   - TTL: omitted from bson when zero. NOTE(review): presumably a
//     time-to-live; its unit is not visible here — confirm against the
//     backend that consumes it.
type TaskState struct {
	TaskUUID  string        `bson:"_id"`
	TaskName  string        `bson:"task_name"`
	State     string        `bson:"state"`
	Results   []*TaskResult `bson:"results"`
	Error     string        `bson:"error"`
	CreatedAt time.Time     `bson:"created_at"`
	TTL       int64         `bson:"ttl,omitempty"`
}

// TaskResult represents a single result value of a task, held in
// TaskState.Results. Type is a string naming the type of Value, and Value is
// the result itself stored as an untyped interface value.
// NOTE(review): the set of valid Type strings is not visible here — confirm.
type TaskResult struct {
	Type  string      `bson:"type"`
	Value interface{} `bson:"value"`
}

// NewPendingTaskState builds a TaskState in the PENDING state for the given
// signature. It copies the signature's UUID and Name and stamps CreatedAt
// with the current time in UTC. The signature must be non-nil.
func NewPendingTaskState(signature *Signature) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.TaskName = signature.Name
	state.State = StatePending
	state.CreatedAt = time.Now().UTC()
	return state
}

// NewReceivedTaskState builds a TaskState in the RECEIVED state for the given
// signature. Only TaskUUID and State are populated; every other field keeps
// its zero value. The signature must be non-nil.
func NewReceivedTaskState(signature *Signature) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.State = StateReceived
	return state
}

// NewStartedTaskState builds a TaskState in the STARTED state for the given
// signature. Only TaskUUID and State are populated; every other field keeps
// its zero value. The signature must be non-nil.
func NewStartedTaskState(signature *Signature) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.State = StateStarted
	return state
}

// NewSuccessTaskState builds a TaskState in the SUCCESS state for the given
// signature and attaches the supplied results. The results slice is stored
// as-is (not copied). The signature must be non-nil.
func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.State = StateSuccess
	state.Results = results
	return state
}

// NewFailureTaskState builds a TaskState in the FAILURE state for the given
// signature and records err as the error text. The signature must be
// non-nil.
func NewFailureTaskState(signature *Signature, err string) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.State = StateFailure
	state.Error = err
	return state
}

// NewRetryTaskState builds a TaskState in the RETRY state for the given
// signature. Only TaskUUID and State are populated; every other field keeps
// its zero value. The signature must be non-nil.
func NewRetryTaskState(signature *Signature) *TaskState {
	state := new(TaskState)
	state.TaskUUID = signature.UUID
	state.State = StateRetry
	return state
}

// IsCompleted reports whether the task has finished processing, that is,
// whether its state is SUCCESS or FAILURE.
func (taskState *TaskState) IsCompleted() bool {
	if taskState.IsSuccess() {
		return true
	}
	return taskState.IsFailure()
}

// IsSuccess reports whether the task's state is SUCCESS.
func (taskState *TaskState) IsSuccess() bool {
	succeeded := taskState.State == StateSuccess
	return succeeded
}

// IsFailure reports whether the task's state is FAILURE.
func (taskState *TaskState) IsFailure() bool {
	failed := taskState.State == StateFailure
	return failed
}

tasks包定义了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE六种状态常量;TaskState结构体定义了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL属性;包中提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState函数,用于根据Signature创建不同state的TaskState,其中只有NewPendingTaskState会设置TaskName和CreatedAt;另外TaskState还提供了IsCompleted、IsSuccess、IsFailure方法。

Signature

// Signature represents a single task invocation.
//
// NewSignature fills in UUID, Name and Args only; all other fields are left
// at their zero values for the caller to set.
type Signature struct {
	UUID           string
	Name           string
	RoutingKey     string
	ETA            *time.Time
	GroupUUID      string
	GroupTaskCount int
	Args           []Arg
	Headers        Headers
	Priority       uint8
	Immutable      bool
	RetryCount     int
	RetryTimeout   int
	OnSuccess      []*Signature
	OnError        []*Signature
	ChordCallback  *Signature
	// BrokerMessageGroupId is the message group ID for the broker, e.g. SQS.
	BrokerMessageGroupId string
	// SQSReceiptHandle is the receipt handle of the SQS message.
	SQSReceiptHandle string
	// StopTaskDeletionOnError is used with SQS when failed messages should be
	// sent to the DLQ and machinery should not delete them from the source queue.
	StopTaskDeletionOnError bool
	// IgnoreWhenTaskNotRegistered auto-removes the request when there is no
	// handler available. When this is true, a task with no handler is ignored
	// and not placed back in the queue.
	IgnoreWhenTaskNotRegistered bool
}

// Arg represents a single argument passed to the invocation of a task.
// Name and Type are strings and Value is an untyped interface value; each
// field carries a bson tag naming the key it is stored under.
// NOTE(review): the set of valid Type strings is not visible here — confirm.
type Arg struct {
	Name  string      `bson:"name"`
	Type  string      `bson:"type"`
	Value interface{} `bson:"value"`
}

// Headers represents the headers which should be used to direct the task.
// It is a map from string keys to arbitrary values.
type Headers map[string]interface{}

// NewSignature creates a new task signature with the given name and args.
// The signature's UUID is a freshly generated UUID string prefixed with
// "task_". The returned error is always nil.
func NewSignature(name string, args []Arg) (*Signature, error) {
	sig := &Signature{Name: name, Args: args}
	sig.UUID = fmt.Sprintf("task_%v", uuid.New().String())
	return sig, nil
}

Signature代表对task的一次调用,它定义了UUID、Name、RoutingKey、ETA、GroupUUID、GroupTaskCount、Args、Headers、Priority、Immutable、RetryCount、RetryTimeout、OnSuccess、OnError、ChordCallback、BrokerMessageGroupId、SQSReceiptHandle、StopTaskDeletionOnError、IgnoreWhenTaskNotRegistered属性;NewSignature函数根据name和args创建Signature,并生成以task_为前缀的UUID。

Backend

// Backend represents a Redis result backend.
//
// It embeds common.Backend and common.RedisConnector and keeps the
// connection settings (host, password, db, socketPath) alongside a
// redis.Pool.
// NOTE(review): presumably redisOnce guards one-time setup of pool/redsync —
// the code that uses it is not visible here; confirm.
type Backend struct {
	common.Backend
	host     string
	password string
	db       int
	pool     *redis.Pool
	// If set, path to a socket file overrides hostname
	socketPath string
	redsync    *redsync.Redsync
	redisOnce  sync.Once
	common.RedisConnector
}

// SetStatePending updates task state to PENDING. It builds a pending
// TaskState from the signature and hands it to b.updateState, returning that
// call's error. The connection from b.open is closed on return.
func (b *Backend) SetStatePending(signature *tasks.Signature) error {
	conn := b.open()
	defer conn.Close()

	return b.updateState(conn, tasks.NewPendingTaskState(signature))
}

// SetStateReceived updates task state to RECEIVED. It builds a received
// TaskState from the signature, runs it through b.mergeNewTaskState, and then
// hands it to b.updateState, returning that call's error. The connection
// from b.open is closed on return.
func (b *Backend) SetStateReceived(signature *tasks.Signature) error {
	conn := b.open()
	defer conn.Close()

	received := tasks.NewReceivedTaskState(signature)
	// Merge must run before the update so the update sees the merged state.
	b.mergeNewTaskState(conn, received)

	return b.updateState(conn, received)
}

// SetStateStarted updates task state to STARTED. It builds a started
// TaskState from the signature, runs it through b.mergeNewTaskState, and then
// hands it to b.updateState, returning that call's error. The connection
// from b.open is closed on return.
func (b *Backend) SetStateStarted(signature *tasks.Signature) error {
	conn := b.open()
	defer conn.Close()

	started := tasks.NewStartedTaskState(signature)
	// Merge must run before the update so the update sees the merged state.
	b.mergeNewTaskState(conn, started)

	return b.updateState(conn, started)
}

// SetStateRetry updates task state to RETRY. It builds a retry TaskState
// from the signature, runs it through b.mergeNewTaskState, and then hands it
// to b.updateState, returning that call's error. The connection from b.open
// is closed on return.
func (b *Backend) SetStateRetry(signature *tasks.Signature) error {
	conn := b.open()
	defer conn.Close()

	retrying := tasks.NewRetryTaskState(signature)
	// Merge must run before the update so the update sees the merged state.
	b.mergeNewTaskState(conn, retrying)

	return b.updateState(conn, retrying)
}

// SetStateSuccess updates task state to SUCCESS. It builds a success
// TaskState carrying results, runs it through b.mergeNewTaskState, and then
// hands it to b.updateState, returning that call's error. The connection
// from b.open is closed on return.
func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
	conn := b.open()
	defer conn.Close()

	succeeded := tasks.NewSuccessTaskState(signature, results)
	// Merge must run before the update so the update sees the merged state.
	b.mergeNewTaskState(conn, succeeded)

	return b.updateState(conn, succeeded)
}

// SetStateFailure updates task state to FAILURE. It builds a failure
// TaskState carrying the error text err, runs it through
// b.mergeNewTaskState, and then hands it to b.updateState, returning that
// call's error. The connection from b.open is closed on return.
func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error {
	conn := b.open()
	defer conn.Close()

	failed := tasks.NewFailureTaskState(signature, err)
	// Merge must run before the update so the update sees the merged state.
	b.mergeNewTaskState(conn, failed)

	return b.updateState(conn, failed)
}

// GetState returns the latest task state for taskUUID, as produced by
// b.getState, together with any error from that call. The connection from
// b.open is closed on return.
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
	conn := b.open()
	defer conn.Close()

	state, err := b.getState(conn, taskUUID)
	return state, err
}

此Backend是基于Redis实现的,它提供了SetStatePending、SetStateReceived、SetStateStarted、SetStateRetry、SetStateSuccess、SetStateFailure、GetState方法;除SetStatePending外,其余SetStateXxx方法在调用updateState之前都会先调用mergeNewTaskState。

小结

machinery的tasks包定义了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE六种状态常量;TaskState结构体定义了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL属性;包中提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState函数,用于根据Signature创建不同state的TaskState,其中只有NewPendingTaskState会设置TaskName和CreatedAt;另外TaskState还提供了IsCompleted、IsSuccess、IsFailure方法。

doc

 类似资料: