Question:

Is a Kafka Streams join with the same sink and source topics supported?

勾炜
2023-03-14

I have a complex Kafka Streams application with two fully stateful flows in the same stream.

The main goal is to provide a workflow system.

The detailed logic is:

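  • An Execution is a list of TaskRuns
  • The Execution looks at the current state of all its TaskRuns and finds the next one to execute
  • If one is found, the Execution alters its TaskRun list, adds the next task, and publishes the result back to Kafka; it also sends the task to be done (WorkerTask) to another queue
  • The WorkerTask is processed outside of Kafka Streams, with a plain Kafka consumer and producer, and the result is published back to another queue (WorkerTaskResult)
  • The WorkerTaskResult alters the matching TaskRun in the current Execution and changes its state (mostly RUNNING / SUCCESS / FAILED); the Execution is then published back to the Execution queue (with Kafka Streams)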
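
The steps above can be sketched as a small state model. This is a minimal illustration in plain Python, not the application's actual code: the `TaskRun` and `Execution` fields mirror the log output further down, while `apply_result`, `next_task`, and the assumption that tasks run strictly one after another in a fixed `flow` list are hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class TaskRun:
    id: str
    task_id: str
    state: str  # CREATED, RUNNING, SUCCESS or FAILED


@dataclass(frozen=True)
class Execution:
    id: str
    task_run_list: Tuple[TaskRun, ...] = ()


def apply_result(execution: Execution, result: TaskRun) -> Execution:
    """Replace the TaskRun that has the same id as the worker's result."""
    updated = tuple(result if run.id == result.id else run
                    for run in execution.task_run_list)
    return replace(execution, task_run_list=updated)


def next_task(execution: Execution, flow: Sequence[str]) -> Optional[str]:
    """Return the next task id to start, or None if nothing can start yet.

    Assumption: tasks in `flow` run sequentially, and a failure stops the flow.
    """
    states = [run.state for run in execution.task_run_list]
    if any(state in ("CREATED", "RUNNING", "FAILED") for state in states):
        return None
    started = {run.task_id for run in execution.task_run_list}
    return next((task for task in flow if task not in started), None)


execution = Execution("e1", (TaskRun("r1", "t1", "RUNNING"),))
execution = apply_result(execution, TaskRun("r1", "t1", "SUCCESS"))
print(next_task(execution, ["t1", "t2"]))  # t2
```

Both functions return new values instead of mutating the Execution, which matches the idea that each change is published as a new record.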

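As you can see, the Execution (with its TaskRun list) is the state of the application.

The stream works well when all messages are sequential (no concurrency, so only one change to the TaskRun list at a time). When the workflow becomes parallel (concurrent WorkerTaskResult records can be joined), my Execution state appears to be overwritten, which produces a kind of rollback.

Example log output: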

2020-04-20 08:05:44,830 INFO  reamThread-1 afkaExecutor Stream in with 3264792750: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created 
  ] 
)
2020-04-20 08:05:44,881 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO  reamThread-1 afkaExecutor Stream out  with 1805535461 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
  ] 
)
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor Stream out  with 578845055 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
  ] 
)
2020-04-20 08:05:45,153 INFO  reamThread-1 afkaExecutor Stream in with 1805535461: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
  ] 
)
2020-04-20 08:05:45,157 INFO  reamThread-1 afkaExecutor Stream out  with 1889889916 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,209 WARN  reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO  reamThread-1 afkaExecutor Stream in with 1889889916: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor Stream out  with 3651399223 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
  ] 
)
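
The rollback visible in the log can be reproduced without Kafka. The following plain-Python sketch assumes (as the log suggests) that every record written to the execution topic carries the whole task-run list and that the table keeps the last value written per key. `table`, `publish`, and the dictionary layout are hypothetical stand-ins, not Kafka Streams API.

```python
# Last-write-wins table keyed by execution id (stand-in for the KTable).
table = {}


def publish(execution_id, task_states):
    """Overwrite the stored snapshot, as a full-value update would."""
    table[execution_id] = dict(task_states)


publish("e1", {"t1": "RUNNING"})

# Path 2 reads a snapshot while t1 is still RUNNING ...
stale_snapshot = dict(table["e1"])

# ... meanwhile path 1 joins the worker result and publishes SUCCESS.
publish("e1", {**table["e1"], "t1": "SUCCESS"})

# Path 2 now adds t2 to its stale snapshot and publishes the whole list.
publish("e1", {**stale_snapshot, "t2": "CREATED"})

print(table["e1"])  # {'t1': 'RUNNING', 't2': 'CREATED'}
```

The SUCCESS state of t1 is lost because the second writer started from a snapshot taken before the first write.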

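I have also tried several different approaches, such as the following:

  • put Execution and WorkerTaskResult on the same topic, to make sure only one message is processed at a time
  • and keep the last Execution in a StateStore myself (in order to join WorkerTaskResult and Execution)
  • but this looks like I am essentially redoing what a KTable does, and it did not work any better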


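My questions are:

  • Is this pattern supported by Kafka Streams?
  • What is a good way to design this stream so that it is concurrency-safe?

Any clue is much appreciated; I have been stuck on this for days. Thanks.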

Builder
  -> Stream 1
     - from KStream<WorkerTaskResult>
     - join KTable<Execution>
     - to Execution topic
  -> Stream 2
     - from KTable<Execution> (same as previous)
     - multiple output
       - to WorkerTaskResult topic (if found an end)
       - to Execution & to WorkerTask topic (if found a next task)
       - to Execution topic (if detect an Execution end)

  • Create new TaskRuns
  • Change the state of the current TaskRun
    • join with WorkerTaskResult
    • evaluate the whole Execution and find failed tasks (based on dependencies)
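    What is really unclear to me in this current version is what the "Detected out-of-order KTable update" warning means in the real world. Does it mean that a KTable must have a single producer per partition and per key in order to keep the topic ordered?

    Edit 2:
    In the meantime, I found a new way of thinking about the stream application that seems to work. The unit tests pass and out-of-order updates are no longer detected. Here is the simplified new flow: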

    Builder 
      - from KTable<Execution> 
      - leftJoin KTable<WorkerTaskResult> 
      - Branch 
        - If joined > to Execution topic
        - If not joined > continue the flow
          - Multiple output (same as previous)
            - to WorkerTaskResult topic (if found an end) 
            - to Execution & to WorkerTask topic (if found a next task)
            - to Execution topic (if detect an Execution end) 
    
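    • WorkerTaskResult is now a KTable, so I only keep the last version of the result
    • I have a single-path flow (no longer 2 paths) that outputs to Execution (I think this is the most important part of fixing the out-of-order issue)
    • The whole system seems to have only one output per input (a new value on Execution produces one new value on the Execution topic)

    Here is the new topology: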

    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
          --> KTABLE-SOURCE-0000000001
        Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
          --> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
          <-- KSTREAM-SOURCE-0000000000
        Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
          --> KTABLE-SOURCE-0000000005
        Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
          --> KTABLE-JOINOTHER-0000000008
          <-- KSTREAM-SOURCE-0000000004
        Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
          --> KTABLE-MERGE-0000000006
          <-- KTABLE-SOURCE-0000000005
        Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
          --> KTABLE-MERGE-0000000006
          <-- KTABLE-SOURCE-0000000001
        Processor: KTABLE-MERGE-0000000006 (stores: [])
          --> KTABLE-TOSTREAM-0000000009
          <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
        Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
          --> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
          <-- KTABLE-MERGE-0000000006
        Processor: KSTREAM-FILTER-0000000015 (stores: [])
          --> KSTREAM-MAPVALUES-0000000016
          <-- KTABLE-TOSTREAM-0000000009
        Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
          --> KSTREAM-MAPVALUES-0000000017
          <-- KSTREAM-FILTER-0000000015
        Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
          --> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
          <-- KSTREAM-MAPVALUES-0000000016
        Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
          --> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
          <-- KSTREAM-MAPVALUES-0000000017
        Processor: KSTREAM-FILTER-0000000042 (stores: [])
          --> KSTREAM-MAPVALUES-0000000043
          <-- KSTREAM-FLATMAPVALUES-0000000018
        Processor: KSTREAM-FILTER-0000000030 (stores: [])
          --> KSTREAM-MAPVALUES-0000000031
          <-- KSTREAM-FLATMAPVALUES-0000000018
        Processor: KSTREAM-FILTER-0000000055 (stores: [])
          --> KSTREAM-MAPVALUES-0000000056
          <-- KSTREAM-FLATMAPVALUES-0000000018
        Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
          --> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
          <-- KSTREAM-FILTER-0000000042
        Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
          --> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
          <-- KSTREAM-FILTER-0000000030
        Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
          --> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
          <-- KSTREAM-FILTER-0000000055
        Processor: KSTREAM-FILTER-0000000024 (stores: [])
          --> KSTREAM-MAPVALUES-0000000025
          <-- KSTREAM-MAPVALUES-0000000017
        Processor: KSTREAM-FILTER-0000000032 (stores: [])
          --> KSTREAM-MAPVALUES-0000000033
          <-- KSTREAM-MAPVALUES-0000000031
        Processor: KSTREAM-FILTER-0000000044 (stores: [])
          --> KSTREAM-MAPVALUES-0000000045
          <-- KSTREAM-MAPVALUES-0000000043
        Processor: KSTREAM-FILTER-0000000057 (stores: [])
          --> KSTREAM-MAPVALUES-0000000058
          <-- KSTREAM-MAPVALUES-0000000056
        Processor: KSTREAM-FILTER-0000000010 (stores: [])
          --> KSTREAM-MAPVALUES-0000000011
          <-- KTABLE-TOSTREAM-0000000009
        Processor: KSTREAM-FILTER-0000000019 (stores: [])
          --> KSTREAM-MAPVALUES-0000000020
          <-- KSTREAM-MAPVALUES-0000000017
        Processor: KSTREAM-FILTER-0000000050 (stores: [])
          --> KSTREAM-MAPVALUES-0000000051
          <-- KSTREAM-MAPVALUES-0000000043
        Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
          --> KSTREAM-FILTER-0000000026
          <-- KSTREAM-FILTER-0000000024
        Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
          --> KSTREAM-MAPVALUES-0000000034
          <-- KSTREAM-FILTER-0000000032
        Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
          --> KSTREAM-MAPVALUES-0000000046
          <-- KSTREAM-FILTER-0000000044
        Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
          --> KSTREAM-MAPVALUES-0000000059
          <-- KSTREAM-FILTER-0000000057
        Processor: KSTREAM-FILTER-0000000026 (stores: [])
          --> KSTREAM-FILTER-0000000027
          <-- KSTREAM-MAPVALUES-0000000025
        Processor: KSTREAM-FILTER-0000000038 (stores: [])
          --> KSTREAM-MAPVALUES-0000000039
          <-- KSTREAM-MAPVALUES-0000000031
        Processor: KSTREAM-FILTER-0000000063 (stores: [])
          --> KSTREAM-MAPVALUES-0000000064
          <-- KSTREAM-MAPVALUES-0000000056
        Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
          --> KSTREAM-FILTER-0000000012
          <-- KSTREAM-FILTER-0000000010
        Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
          --> KSTREAM-FILTER-0000000021
          <-- KSTREAM-FILTER-0000000019
        Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
          --> KSTREAM-FILTER-0000000035
          <-- KSTREAM-MAPVALUES-0000000033
        Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
          --> KSTREAM-FILTER-0000000047
          <-- KSTREAM-MAPVALUES-0000000045
        Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
          --> KSTREAM-FILTER-0000000052
          <-- KSTREAM-FILTER-0000000050
        Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
          --> KSTREAM-FILTER-0000000060
          <-- KSTREAM-MAPVALUES-0000000058
        Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
          --> KSTREAM-FILTER-0000000068
          <-- KSTREAM-MAPVALUES-0000000017
        Processor: KSTREAM-FILTER-0000000012 (stores: [])
          --> KSTREAM-PEEK-0000000013
          <-- KSTREAM-MAPVALUES-0000000011
        Processor: KSTREAM-FILTER-0000000021 (stores: [])
          --> KSTREAM-PEEK-0000000022
          <-- KSTREAM-MAPVALUES-0000000020
        Processor: KSTREAM-FILTER-0000000027 (stores: [])
          --> KSTREAM-PEEK-0000000028
          <-- KSTREAM-FILTER-0000000026
        Processor: KSTREAM-FILTER-0000000035 (stores: [])
          --> KSTREAM-PEEK-0000000036
          <-- KSTREAM-MAPVALUES-0000000034
        Processor: KSTREAM-FILTER-0000000047 (stores: [])
          --> KSTREAM-PEEK-0000000048
          <-- KSTREAM-MAPVALUES-0000000046
        Processor: KSTREAM-FILTER-0000000052 (stores: [])
          --> KSTREAM-PEEK-0000000053
          <-- KSTREAM-MAPVALUES-0000000051
        Processor: KSTREAM-FILTER-0000000060 (stores: [])
          --> KSTREAM-PEEK-0000000061
          <-- KSTREAM-MAPVALUES-0000000059
        Processor: KSTREAM-FILTER-0000000068 (stores: [])
          --> KSTREAM-PEEK-0000000069
          <-- KSTREAM-MAPVALUES-0000000067
        Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
          --> KSTREAM-FILTER-0000000040
          <-- KSTREAM-FILTER-0000000038
        Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
          --> KSTREAM-TRANSFORM-0000000065
          <-- KSTREAM-FILTER-0000000063
        Processor: KSTREAM-FILTER-0000000040 (stores: [])
          --> KSTREAM-SINK-0000000041
          <-- KSTREAM-MAPVALUES-0000000039
        Processor: KSTREAM-PEEK-0000000013 (stores: [])
          --> KSTREAM-SINK-0000000014
          <-- KSTREAM-FILTER-0000000012
        Processor: KSTREAM-PEEK-0000000022 (stores: [])
          --> KSTREAM-SINK-0000000023
          <-- KSTREAM-FILTER-0000000021
        Processor: KSTREAM-PEEK-0000000028 (stores: [])
          --> KSTREAM-SINK-0000000029
          <-- KSTREAM-FILTER-0000000027
        Processor: KSTREAM-PEEK-0000000036 (stores: [])
          --> KSTREAM-SINK-0000000037
          <-- KSTREAM-FILTER-0000000035
        Processor: KSTREAM-PEEK-0000000048 (stores: [])
          --> KSTREAM-SINK-0000000049
          <-- KSTREAM-FILTER-0000000047
        Processor: KSTREAM-PEEK-0000000053 (stores: [])
          --> KSTREAM-SINK-0000000054
          <-- KSTREAM-FILTER-0000000052
        Processor: KSTREAM-PEEK-0000000061 (stores: [])
          --> KSTREAM-SINK-0000000062
          <-- KSTREAM-FILTER-0000000060
        Processor: KSTREAM-PEEK-0000000069 (stores: [])
          --> KSTREAM-SINK-0000000070
          <-- KSTREAM-FILTER-0000000068
        Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
          --> KSTREAM-SINK-0000000066
          <-- KSTREAM-MAPVALUES-0000000064
        Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
          --> log-executionStream
          <-- KTABLE-SOURCE-0000000001
        Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000013
        Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000022
        Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000028
        Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000036
        Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
          <-- KSTREAM-FILTER-0000000040
        Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000048
        Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000053
        Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000061
        Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
          <-- KSTREAM-TRANSFORM-0000000065
        Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
          <-- KSTREAM-PEEK-0000000069
        Processor: log-executionStream (stores: [])
          --> none
          <-- KTABLE-TOSTREAM-0000000002
    

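    For now, it is not clear to me whether this solution holds up under any concurrency, or whether another out-of-order update could still occur (meaning the Execution is rolled back to a previous version, which leads to multiple executions of the same task).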

1 answer

霍建章
2023-03-14

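Is this pattern supported by Kafka Streams?

In general, yes. You only need to make sure you do not end up in an "infinite loop": at some point an input record should "terminate" and no longer produce anything to the output topic. In your case, an Execution should eventually stop creating new Tasks (via the feedback loop).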
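
A minimal sketch of such a terminating feedback loop, in plain Python rather than Kafka Streams. The `deque` stands in for the Execution topic, and `flow` and `run_feedback_loop` are hypothetical names. The point is only that a record with nothing left to do publishes nothing, so the loop drains.

```python
from collections import deque


def run_feedback_loop(flow, max_records=1000):
    """Process records until the topic is empty; return how many were read."""
    topic = deque([()])  # one initial execution with no finished tasks
    processed = 0
    while topic:
        if processed >= max_records:
            raise RuntimeError("feedback loop did not terminate")
        finished = topic.popleft()
        processed += 1
        remaining = [task for task in flow if task not in finished]
        if remaining:
            # Still work to do: publish an updated execution back to the topic.
            topic.append(finished + (remaining[0],))
        # Otherwise the record is terminal and nothing is published.
    return processed


print(run_feedback_loop(["t1", "t2"]))  # 3
```

The `max_records` guard is only there to make a non-terminating design fail loudly in a test.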

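What is a good way to design this stream so that it is concurrency-safe?

It always depends on the concrete application. In your case, if I understand the design of your application correctly, you basically have two input topics (Execution and WorkerTaskResult) and two output topics (Execution and WorkerTask). When the input topics are processed, messages from either input can modify shared state (i.e., the state of a task).

In addition, there is an "outside app" that reads from the WorkerTask topic and writes to the WorkerTaskResult topic? So there is actually a second loop in the overall data flow? I assume there are also other upstream applications that push new data into the Execution topic?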

                             +-----------------+
                             |                 |
                             v                 |
upstream producers ---> "Execution" --+        |
                                      |        |
                                      v        |  
                                      KS-App --+
                                      ^        |
                                      |        |
            +--> "WorkerTaskResult" --+        +--> "WorkerTask" --+
            |                                                      |
            +------------------------ outside app <----------------+

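What is unclear to me at the moment:

  • Which state changes are propagated from the KS-App directly back to Execution?
  • Which state changes are propagated from the "outside app" via WorkerTaskResult?

Maybe you can update your question, and I can try to update my answer accordingly.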

Update (based on Edit 1 and Edit 2)

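You can say so. For each input record, the table() operator compares the timestamp of the input record with the timestamp of the current entry in the table. If the input record has a smaller timestamp, a warning is logged (the update is still applied). The reason for the warning is that the table stores only one entry per key and expects to move only forward in time. Out-of-order updates may lead to unexpected results, hence the warning. Using a single producer per partition, or a single producer per key, avoids out-of-order data per key (assuming the producer only sends ordered data).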
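
The behavior described above can be modeled in a few lines. This is a plain-Python sketch of the described semantics, not the Kafka Streams source; `TimestampedTable`, `put`, and the `warnings` list are hypothetical names.

```python
class TimestampedTable:
    """One entry per key; warns on older timestamps but still applies them."""

    def __init__(self):
        self.store = {}     # key -> (value, timestamp)
        self.warnings = []

    def put(self, key, value, timestamp):
        current = self.store.get(key)
        if current is not None and timestamp < current[1]:
            self.warnings.append(f"out-of-order update for {key}")
        # The update is applied even when it is out of order.
        self.store[key] = (value, timestamp)


table = TimestampedTable()
table.put("e1", "t1=SUCCESS", timestamp=200)
table.put("e1", "t1=RUNNING", timestamp=100)  # older record arrives late
print(table.store["e1"], len(table.warnings))  # ('t1=RUNNING', 100) 1
```

The late record wins, so the stored value moves back in time; that is the situation the warning points at.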

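I am not 100% sure at the moment whether I fully understand the new version of your application. In general, though, you want to make sure you avoid data races and linearize the updates to Execution.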
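
One way to read "linearize the updates" is to route every change for an execution through a single update function that always starts from the latest stored state and applies only the delta. The sketch below is an assumption about what that could look like, in plain Python; `ExecutionStore`, `apply_delta`, and the delta format are hypothetical and not part of Kafka Streams. It also assumes that deltas for the same key are processed one at a time.

```python
class ExecutionStore:
    """Single owner of execution state; callers send deltas, not snapshots."""

    def __init__(self):
        self.state = {}  # execution id -> {task id: state}

    def apply_delta(self, execution_id, task_id, task_state):
        # Always read the latest stored value, then change one task only.
        tasks = dict(self.state.get(execution_id, {}))
        tasks[task_id] = task_state
        self.state[execution_id] = tasks
        return tasks


store = ExecutionStore()
store.apply_delta("e1", "t1", "RUNNING")
store.apply_delta("e1", "t1", "SUCCESS")  # worker result
store.apply_delta("e1", "t2", "CREATED")  # next task scheduled
print(store.state["e1"])  # {'t1': 'SUCCESS', 't2': 'CREATED'}
```

Because each call touches a single task and starts from the stored value, scheduling t2 cannot overwrite the SUCCESS already recorded for t1.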
