Lagom Framework参考指南(六)

常乐
2023-12-01

1.Decouple services with a message broker(使用消息代理解耦服务)

1.1 Message Broker Support(消息代理的支持)

当一个服务需要另一个服务所拥有的数据时,有两种主要的策略来获取这些所需的数据:
(1)服务可以向拥有它所需数据的服务请求数据,并等待数据被发送回它。这是一个同步通信模式。
(2)对系统进行架构以使得:当数据被某一服务持有,而另外一个服务却需要这个数据的时候,我们可以将数据发布到一个基础设置组件中,它会以一个我们预先定义的时间区间来存储这些数据。这个额外的组件允许在不同的时间进行发布和消费,从而有效地解耦服务,从而使服务能够异步地进行通信。
    、
仅依赖于微服务之间的同步通信,这是一种架构上的坏味道。一个微服务架构中有许许多多的活动的部分,这也就意味着我们有很大的可能会失败。“同步”这个词的字面意思是“同时发生”,同步通信意味着发送者和接收者必须同时运行。这意味着在面对失败时,同步通信也会失败。如果消息丢失,这可能导致一致性问题,并可能最终演变为一个脆弱的系统,即:其中一个组件的失败会导致整个系统的失败。
解决方案是避免同步通信,而是以异步通信的方式来架构您的系统。如前所述,可以使用基础设施组件来使服务异步地进行通信。此组件通常称为消息代理。可以作为消息代理使用的各种技术,如 Google Cloud Pub/Sub, Kafka, 还有RabbitMQ.
Lagom允许服务以同步和异步的方式轻松地进行通信。这两种通信策略都有它们的用途,但是您应该尽可能使用异步通信架构您的微服务系统。为了帮助您这样做,Lagom提供了一个Message Broker API,它对特定的Message Broker技术进行抽象,并使服务能够异步地共享数据而变得非常简单。目前,Lagom只支持使用Kafka的Message Broker API的实现,但是其他实现在将来可能会出现(我还是喜欢rabbitMQ,希望早点出来吧)。

1.2 Message Broker API(消息代理API)

Lagom的Message Broker API(消息代理API)提供了一个分布式发布-订阅模型,服务可以通过主题共享数据。一个主题只是一个允许服务推送和拉取数据的通道。在Lagom中,主题是强类型的,因此订阅者和生产者都可以提前知道预期的数据将会是什么。
下面我们来简单看看消息代理API的使用

1.2.1 Declaring a topic(声明一个主题)

要将数据发布到主题,服务需要在其服务描述符中声明主题。
import com.lightbend.lagom.javadsl.api.*;
import com.lightbend.lagom.javadsl.api.broker.Topic;

import static com.lightbend.lagom.javadsl.api.Service.*;

public interface HelloService extends Service {
  String GREETINGS_TOPIC = "greetings";
  @Override
  default Descriptor descriptor() {
    return named("helloservice").withCalls(
        pathCall("/api/hello/:id",  this::hello),
        pathCall("/api/hello/:id", this::useGreeting)
      )
      // here we declare the topic(s) this service will publish to
      .withTopics(
        topic(GREETINGS_TOPIC, this::greetingsTopic)
      )
      .withAutoAcl(true);
  }
  // The topic handle
  Topic<GreetingMessage> greetingsTopic();

  ServiceCall<NotUsed, String> hello(String id);
  ServiceCall<GreetingMessage, Done> useGreeting(String id);
}
声明主题的语法类似于已经学习过的定义服务端点的语法。 Descriptor.withTopics方法接受一个主题调用序列,每个主题调用都可以通过Service.topic静态方法。后者取一个主题名称(即主题的唯一标识符),以及一个返回topic实例的方法的引用。
默认情况下,通过主题传递的数据被序列化为JSON。当然,可以使用不同的序列化格式,您可以通过在服务描述符中定义的每个主题传递一个不同的消息序列化器来实现。例如,使用上面的服务定义,下面是如何通过一个定制的序列化器:
    topic("greetings",this::greetingsTopic).withMessageSerializer(<your-custom-serializer>)

1.2.1.1 Partitioning topics(分区主题)

Kafka将在多个分区之间分配特定主题的消息,以便主题可以伸缩。发送到不同分区的消息可能会被处理掉,因此,如果您发布的消息的顺序是重要的,那么您需要确保消息以这样一种顺序被保存。通常,这意味着确保特定实体的每条消息都被发送到相同的分区。
Lagom允许您配置一个分区键策略,该策略从消息中提取分区键。然后,Kafka将使用这个键来帮助决定将每个消息发送到哪个分区。它会通过您所传递给他的PartitionKeyStrategy来决定消息被传到那个分区。
return named("blogpostservice")
        .withTopics(
                topic("blogposts", this::blogPostEvents)
                    .withProperty(KafkaProperties.partitionKeyStrategy(),
                            BlogPostEvent::getPostId)
        );

1.2.2 Implementing a topic(实现一个主题)

Lagom设计用来生成的消息的主要来源是持久实体事件。与其以一种特别的方式发布事件来响应特定的事件,不如从您的持久化实体中获取事件流,并将其调整为发送到message broker(消息代理)的消息流。通过这种方式,您可以确保至少有一次由发布者和消费者处理事件,这样您就可以保证在整个系统中有非常强的一致性。
Lagom的 TopicProducer工具类提供了两种方法来发布持久实体事件流:singleStreamWithOffset用于使用非共享的读侧事件流,而 taggedStreamWithOffset用于使用共享的读侧事件流。这两种方法都需要一个回调函数,它用来获取主题生产者发布的最后一个消息的偏移量,并且允许通过 获得读流的PersistentEntityRegistry.eventStream方法可以从该偏移量恢复事件流。eventStream方法来获得一个读边流。有关读端流的更多细节,请参见Persistent Read-Side's.
在使用singleStreamWithOffset的情况下,Lagom将会确保您的主题生成器只在集群的一个节点上运行;而使用taggedStreamWithOffset的情况下,那么则会将标签均匀地分布在集群中,以分发发布负载。
这里有一个关于发布一个单一的非共享事件流的例子:
public Topic<GreetingMessage> greetingsTopic() {
    return TopicProducer.singleStreamWithOffset(offset -> {
        return persistentEntityRegistry
                .eventStream(HelloEventTag.INSTANCE, offset)
                .map(this::convertEvent);
    });
}
请注意,当服务启动时,传递给主题生成器的读端事件流将“激活”。这意味着您的服务持久化的所有事件最终将被发布到连接的主题。此外,您通常希望将您的领域事件映射到其他类型,以便其他服务不会与另一个服务的领域模型事件紧密耦合。换句话说,领域模型事件是服务的一个实现细节,不应该泄漏。

1.2.2.1 Offset storage(偏移量存储)

Lagom将使用您配置的持久性API提供程序来为事件流存储偏移量。要阅读更多关于偏移存储的内容,请参阅Cassandra offset文档、JDBC数据库偏移文档和JPA数据库偏移文档(建议读者去官网自行查看)。

1.2.3 Subscribe to a topic(订阅一个主题)

要订阅主题,服务只需要调用主题上的Topic.subscribe()。例如,假设一个服务想要收集HelloService发布的所有问候消息(请参阅上面的代码片段)。您应该做的第一件事是注入HelloService。
private final HelloService helloService;

@Inject
public AnotherServiceImpl(HelloService helloService) {
    this.helloService = helloService;
}
然后,订阅“问候”主题,并将你的逻辑连接到每个发布到主题的消息上。
helloService.greetingsTopic()
    .subscribe() // <-- you get back a Subscriber instance
    .atLeastOnce(Flow.fromFunction((GreetingMessage message) -> {
        return doSomethingWithTheMessage(message);
    }));
当调用 Topic.subscribe()方法时候会返回一个Subscriber对象,在上面的代码片段中,我们使用“至少一次”传递语义订阅了“greetings ”主题。这意味着发布到“greetings ”主题的每个消息至少会接收一次,但可能更多。订阅方还提供了一个“atMostOnceSource”,它给您提供了“最多一次”的传递语义。如果拿不准,那么就使用“至少一次”语义。
最后,订阅者通过 Subscriber.withGroupId组合成一个订阅者组。订阅者组允许集群中的许多节点都可以使用消息流,同时确保每个消息仅由集群中的每个节点处理一次。如果没有订阅者组,那么对特定服务的所有节点都将获取流中的所有消息,从而导致它们的处理被复制。默认情况下,Lagom将使用与使用该主题的服务同名的组id。

1.2.4 Polymorphic event streams(多态事件流)

通常,您将希望向某个特定主题发布多个类型的事件。这可以通过创建每个事件实现的接口来实现。为了成功地将这些事件序列化和从JSON反序列化,需要一些额外的注释来指导Jackson描述和消费生成的JSON中的事件的类型。
例如,考虑一种情况,你有一个博客文章创建的事件和一个博客发布的事件。这是你的事件结构的样子:
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type",
        defaultImpl = Void.class)
@JsonSubTypes({
        @JsonSubTypes.Type(BlogPostEvent.BlogPostCreated.class),
        @JsonSubTypes.Type(BlogPostEvent.BlogPostPublished.class)
})
public interface BlogPostEvent {

    String getPostId();

    @JsonTypeName("created")
    final class BlogPostCreated implements BlogPostEvent {
        private final String postId;
        private final String title;

        @JsonCreator
        public BlogPostCreated(String postId, String title) {
            this.postId = postId;
            this.title = title;
        }

        public String getPostId() {
            return postId;
        }

        public String getTitle() {
            return title;
        }
    }

    @JsonTypeName("published")
    final class BlogPostPublished implements BlogPostEvent {
        private final String postId;

        @JsonCreator
        public BlogPostPublished(String postId) {
            this.postId = postId;
        }

        public String getPostId() {
            return postId;
        }
    }
}
@JsonTypeInfo注解描述了事件的类型是如何序列化的。在本例中,它表示每个事件类型将由其名称标识,该名称将进入称为type的属性。每个事件子类的@JsonTypeName表示事件的名称应该是什么。@JsonTypeName注解用于告诉Jackson事件的可能的子类型是什么,这样它就知道在反序列化时应该注意的地方。
下面是BlogPostCreated 事件产生的JSON:
{
  "type": "created",
  "postId": "1234",
  "title": "Some title"
}
但是BlogPostPublished 发布事件的JSON看起来像这样:
{
  "type": "published",
  "postId": "1234",
}
最后呢,注意@JsonSubTypes注解的defaultImpl = Void.class。这告诉Jackson,如果它遇到一个事件类型,它不承认它的名称,将其反序列化为null。这是可选的,但对于确保服务中向前兼容性非常重要,如果服务添加了它发布的新事件子类,通常您希望您的使用该事件流的现有服务忽略掉它。这样设置将允许他们这样做,否则,您将不得不升级所有使用该事件流的服务,在您升级生成事件的生产者之前显式地忽略它。

1.3 Lagom的Kafka Client

Lagom提供了使用Kafka消息代理API的实现。在剩下的部分中,您将学习如何在构建中添加依赖,以及如何配置和调优主题的发布者和订阅者。

1.3.1 依赖

要使用这个特性,可以在项目的构建中添加以下内容。
Maven

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-kafka-broker_${scala.binary.version}</artifactId>
    <version>${lagom.version}</version>
</dependency>

Sbt

libraryDependencies += lagomJavadslKafkaBroker

当导入了Lagom的代理模块时,牢牢记住该模块需要一个Lagom Persistence 的实现模块,所以要保证还要依赖Lagom Persistence Cassandra 或者Lagom Persistence JDBC。

1.3.2 配置

Lagom的Kafka客户端的实现是使用akka-stream-kafka来构建的。akka-stream-kafka包包装了官方的Apache Java Kafka client,并公开一个(Akka)基于流的API来发布/消费Kafka的消息。因此,实际上有三个库在运行,每个库都公开自己的配置。让我们来探索每一层所暴露的配置,从最顶上的那个开始,也就是 Lagom Kafka Client。

 类似资料: