当前位置: 首页 > 知识库问答 >
问题:

理解Kafka民意测验(),flush()

陆卓
2023-03-14

我刚接触Kafka,正在为我的新应用程序尝试一些小用例。用例基本上是Kafka制作人-

当消费时(步骤2),下面是步骤的顺序...1.消费者。轮询(1.0)1. a.产生多个主题(多个水槽代理正在监听)1.b。产生。轮询()2。每25个msgs刷新()3。提交()每个msgs(asynchCommit=false)

问题1:这个动作顺序对吗!?!

问题2:这会导致数据丢失吗?因为刷新是每25毫秒一次,提交是每一毫秒一次?!?

问题3:生产者和消费者之间的区别?

问题4:当消息已提交但未刷新时会发生什么!?!

如果有人能帮助我理解生产者/消费者之间的偏移示例,我将非常感激。

提前谢谢!!

共有1个答案

翟渝
2023-03-14

让我们先简单地了解一下Kafka:

什么是Kafka制作人:

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-producer.sh --broker-list 100.102.1.40:9092,100.102.1.41:9092 --topic company_wallet_db_v3-V3_0_0-transactions
>{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
[2019-07-21 11:53:37,907] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {company_wallet_db_v3-V3_0_0-transactions=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

您可以忽略警告。它显示为Kafka找不到主题并自动创建主题。

让我们看看kafka是如何存储这个信息的:

制作人在代理服务器中的/kafka logs(对于apache kafka)或/kafka cf data(对于合流版本)创建一个目录

drwxr-xr-x   2 root root  4096 Jul 21 08:53 company_wallet_db_v3-V3_0_0-transactions-0

cd到此目录中,然后列出文件。您将看到存储实际数据的. log文件:

-rw-r--r--   1 root root 10485756 Jul 21 08:53 00000000000000000000.timeindex
-rw-r--r--   1 root root 10485760 Jul 21 08:53 00000000000000000000.index
-rw-r--r--   1 root root        8 Jul 21 08:53 leader-epoch-checkpoint
drwxr-xr-x   2 root root     4096 Jul 21 08:53 .
-rw-r--r--   1 root root      762 Jul 21 08:53 00000000000000000000.log

如果打开日志文件,您将看到:

^@^@^@^@^@^@^@^@^@^@^Bî^@^@^@^@^B<96>T<88>ò^@^@^@^@^@^@^@^@^Al^S<85><98>k^@^@^Al^S<85><98>kÿÿÿÿÿÿÿÿÿÿÿÿÿÿ^@^@^@^Aö
^@^@^@^Aè
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}^@

让我们了解消费者将如何投票和读取记录:

什么是Kafka民调:

Kafka为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的消费者已经消费了偏移量为0到4的记录,接下来将收到偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位置给出下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一倍。每次消费者在call to poll(长)中收到消息时,它都会自动前进。

因此,轮询将一个持续时间作为输入,读取该持续时间的00000000000000000000.log文件,并将其返回给消费者。

何时删除邮件:

Kafka负责信息的刷新。有两种方法:

  1. 基于时间:默认为7天。可以使用log.retention.ms=1680000
  2. 更改
  3. 基于大小:可以设置为log.retention.bytes=10487500

现在让我们看看消费者:

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-consumer.sh --bootstrap-server 100.102.1.40:9092 --topic company_wallet_db_v3-V3_0_0-transactions --from-beginning
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
^CProcessed a total of 1 messages

上面的命令指示使用者从偏移=0读取。Kafka为这个控制台使用者分配一个group_id,并维护这个group_id读取的最后一个偏移量。因此,它可以将更新的消息推送到这个消费者组

什么是Kafkapromise:

提交是告知Kafka消费者已成功处理的消息的一种方式。这可以被认为是在组id:current_offset 1之间更新查找。您可以使用consumer对象的commitSync()或commitSync()方法来管理它。

参考:https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

 类似资料:
  • 我在这里查看了Confluent Kafka库中的消费者实现,感觉它们在功能上是相同的,只是在返回的内容方面有所不同。 Poll()调用consumer()来查看是否有消息准备好要拾取,如果是,则调用OnMessage事件。versour,consumer,将消息保存在它的一个参数中,并返回一个布尔值。我觉得不同之处在于实现上,功能上是相同的https://github.com/confluent

  • 问题内容: 我有一个简单的字符设备驱动程序,可让您从自定义硬件设备中读取。它使用DMA将数据从设备内存复制到内核空间(然后由用户决定)。 该调用非常简单。它开始DMA写操作,然后在等待队列中等待。DMA完成后,中断处理程序将设置一个标志并唤醒等待队列。需要注意的重要一点是, 即使在设备要提供数据之前 ,我也可以随时启动DMA 。DMA引擎将坐下等待,直到有要复制的数据为止。这很好。我可以在用户空间

  • 我正在尝试使用PollRich获得JPA实体 但是在那之后,尽管表包含数百行,但我只得到一行。如何获取所有行?我想要像往常一样的polEnrich行为,它给我所有的表行。

  • 我想告诉Kafka我的消费者何时成功处理了一条记录,因此我通过设置提交到false。我有两条关于我订阅的主题的消息,偏移量分别为0和1,并创建了一个消费者,因此每次调用最多返回一条记录(通过将设置为1)。 我现在调用并收到第一条消息,但我不承认它;我不调用

  • 在本章中,我们将研究如何在Drupal中创建Poll module 。 此模块可帮助您为您的网站创建民意调查。 您可以提出问题,提供任意数量的答案,您的访问者也可以投票。 以下是创建Poll Module的步骤。 Step 1 - 单击菜单栏中的“ Module ”。 Step 2 - 启用“ Poll module ,然后单击“ Save Configuration 。 Step 3 - 单击“

  • 本文向大家介绍Kafka的流处理是什么意思?相关面试题,主要包含被问及Kafka的流处理是什么意思?时的应答技巧和注意事项,需要的朋友参考一下 答:连续、实时、并发和以逐记录方式处理数据的类型,我们称之为Kafka流处理。