当前位置: 首页 > 知识库问答 >
问题:

设计Kafka消费者和生产者以实现可扩展性

庄萧迟
2023-03-14

我想设计一个解决方案,用于向多个提供商发送不同类型的电子邮件。总体概述。

我有几个上游提供商Sendgrid、Zoho、Mailgun等。它们将用于发送电子邮件等。例如:

  • 注册新用户的电子邮件
  • 删除用户的电子邮件
  • 空间配额限制的电子邮件

(一般大约有6种类型的电子邮件)

每种类型的电子邮件都应该生成到生产者中,转换为序列化Java对象,并发送到与上游提供商集成的适当的Kafka消费者。

问题是如何设计Kafka以获得最大的性能和可扩展性?

>

  • 迄今为止,我所能想到的第一个解决方案是为每种类型的电子邮件和每个网关设置主题(6x4=24个主题)。在将来,我希望添加更多类型的消息和网关。可能会达到600个主题。这将产生大量用于维护的Java源代码和许多需要管理的主题。另一个不利因素是Kafka的日志将非常庞大。

    第二个解决方案是为每个消费者使用一个主题(集成网关)。但在这种情况下,如何根据要发送的消息类型发送每种类型的不同序列化Java对象?

    是否有更好的方法来设计此设置,以便我可以更轻松地扩展它,并使其对未来的集成非常健壮?

    您可以在这里看到我如何在消费者和生产者之间发送消息:org。阿帕奇。Kafka。常见的KafkaException:SaleRequestFactory类不是org的实例。阿帕奇。Kafka。常见的序列化。序列化程序

    编辑:

    1. 订单很重要,因为通信是异步的。生产者将等待返回的状态消息
    2. 将每个网关的数据保存在不同的主题上并不重要
    3. 你想要什么样的隔离?我希望将消息/主题彼此完全隔离,以防止将来需要添加更多网关或消息类型时出错

    将每个网关的数据保存在不同的主题上对您来说重要吗?-不,我只想隔离数据。

    如果您将使用每个网关的单个主题,您是否关心它在客户端造成的开销?-读取不必要的消息、编写更多逻辑、混合序列化器等

    我不知道这里。我的主要目的是使系统易于扩展新功能。

  • 共有2个答案

    邹涵畅
    2023-03-14

    我认为对于您提到的操作开销来说,每个事件类型一个主题确实太多了。

    选项2我认为将是正确的方式-每个集成一个主题-网关,具有专用的消费者。优点是:

    • 您在主题级别隔离工作负载(集成网关A上的许多消息不会影响网关B的消费者)
    • 您可以根据主题工作负载扩展消费者

    生产者将根据网关的要求序列化消息,并在特定主题上发布消息。消费者只需阅读并推送信息。

    鲁英卫
    2023-03-14

    不幸的是,这里没有简单的答案。
    您需要问自己几个问题,并从几个权衡中进行选择-

    首先,秩序重要吗?您只是想从A点转发到B点的电子邮件吗?,或者,您是否希望(我想您会)为同一实体保留合理的事件顺序(例如,在发送更改密码的同一新用户的邮件之前,需要先收到关于用户创建的邮件。)

    如果顺序很重要,最好将同一主题与分区键一起使用,因为Kafka保证只在分区级别对消息排序。

    您想要什么样的隔离?将每个网关的数据保存在不同的主题上对您来说很重要吗?
    如果您使用每个网关的单个主题,您是否关心它在客户端造成的开销?-读取不必要的消息,编写更多逻辑、混合序列化程序等

    你能估计你会扩展哪些维度吗?-如果你愿意使用第一个解决方案,每个网关的主题

    ConFluent几乎没有关于这些主题的好文章可以帮助你-

    你应该在同一个Kafka主题中加入几种事件类型吗?

    如何选择Kafka集群中的主题/分区数量?

     类似资料:
    • 我没有使用Spring Kafka模块来生成和使用消息。相反,我在生产者和消费者实现中使用Apache客户端库。由于我没有使用Spring Kafka,因此Spring Slueth自动配置不适用于生成跟踪。我已经提到https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/integration

    • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

    • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

    • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

    • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

    • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的