当前位置: 首页 > 知识库问答 >
问题:

创建具有相同执行元的actorpublisher和actorsubscriber

孙成益
2023-03-14

现在我需要处理错误,并通过一个错误处理程序将其推送到不同的kafka队列中。我正在尝试将EsHandler同时用作发布服务器和订阅服务器。我不确定如何包括EsHandler作为中间人而不是Sink。

这是我的代码:

val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())

val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }

val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))

Source(publisher).via(flow).to(sink).run()


class EsHandler extends ActorSubscriber with ActorPublisher[Model] {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      context.actorOf(Props(classOf[EsStorage], self)) ! msg

    case OnError(err: Exception) =>
      context.stop(self)

    case OnComplete =>
      context.stop(self)

    case Response(msg) =>
      if (msg.isError()) onNext(msg.getContent())
  }
}

class ErrorHandler extends ActorSubscriber {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      println(msg)
  }
}

共有1个答案

齐迪
2023-03-14

我们强烈建议不要实现您自己的处理器(这是reactive streams规范给“subscriber&&Publisher”起的名字。正确使用它是相当困难的,这就是为什么没有Publisher作为helper特性直接公开的原因。

相反,大多数情况下,您希望使用提供给您的/接收器(或发布服务器/订阅服务器),并在这些步骤之间运行操作,如映射/筛选等步骤。

事实上,您可以使用现有的Kafka源和汇实现,它称为reactive-kafka,并由Reactive Streams TCK验证,因此您可以相信它是有效的实现。

 类似资料:
  • 问题内容: 在Python 的列表中,以下代码给出此输出: 是否存在使用JavaScript中的数组执行此操作的简便方法? 我编写了以下函数来做到这一点,但是有没有更短或更短的东西呢? 问题答案: 您可以这样做: 它在每次迭代中将数组加倍,因此可以创建很少迭代的真正大数组。 注意:您还可以通过使用代替来改善您的功能,因为每次迭代都会创建一个新的数组。像这样(作为一个如何使用数组的示例显示):

  • 问题内容: 我的类中有很多div,并且我想使用jquery遍历它们以检查每个div是否满足特定条件。如果为真,则应执行一个操作。 有人知道我会怎么做吗? 问题答案: 使用每个:’ ‘是数组中的位置,是您要迭代的DOM对象(也可以通过jQuery包装器进行访问)。 检查api参考以获取更多信息。

  • 在我的身体里我有3

  • 我正在创建一个简单的社交图,用户可以在其中创建一个帖子,标记它,并对它进行评论。我用py2neo做模型。该模型具有和作为节点。用户在上、或。在我的例子中,单个用户可以在单个上创建多个或(就像其他任何社交网络一样)。根据我的模型,这需要多个或关系,但具有不同的属性。模型是这样建立的: 我运行以下操作来构建图形: 我希望有两个关系,如下所示: 但我看到事实并非如此: 那么,我的问题是双重的。(1)可以

  • 我有下面的xml结构: 有不同的 Field(X) 包装器元素,它们包含相同的用户元素。XML 中可以发送 n 个字段。因此,我不能为每个单独的Jaxb。我需要访问用户,但在取消编组步骤后忽略 Field 元素。不幸的是,我无权更改xml结构。我无法找到解决方案。任何指针都会有所帮助。

  • 问题内容: 我有一张桌子,像这样: 我想选择具有相同基因座和染色体的所有行。例如,第3行和第4行。一次可能有2个以上,并且它们可能不是按顺序排列的。 我尝试了这个: 但是,即使重复,它总是返回第3行,从不返回第4行。我想我缺少明显而简单的东西,但我茫然。 有人可以帮忙吗? 问题答案: 您需要了解,当您在查询中包含内容时,您是在告诉SQL合并行。您将为每个唯一值获得一行。在随后过滤这些组。通常,您可