当前位置: 首页 > 知识库问答 >
问题:

从akka stream到fs2的旅程——如何使用http4s在fs2中定义akka stream http flow-like阶段

符修杰
2023-03-14

我正在努力加深对fs2的了解,并想尝试fs2Kafka的一个用例,在这个用例中我将取代akka stream。想法很简单,从Kafka读取数据,通过http请求将数据发布到接收器,然后在成功后提交回Kafka。到目前为止,我还不能真正理解http部分。在akka stream/akka http中,您有一个现成的流https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

与akka stream完美融合。

我试着看看我是否可以用http4s和fs2做类似的事情。

有没有人有任何参考资料、代码示例、博客之类的东西来展示如何进行这种集成。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

即使这样,我也不确定整件事

共有1个答案

赫连秦迟
2023-03-14

典型级生态系统的特点是,一切都只是一个库,你不需要例子来说明它们中有多少相互作用,你只需要了解每个库是如何工作的以及组成的基本规则。

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}

请注意,这些都是我在脑海中写的,只需快速浏览一下文档,您就需要更改很多内容(尤其是类型,比如Data

 类似资料:
  • 我已经创建了一个Micronaut(在Kotlin中)来将映射转换为cron字符串,用于,但是我无法让Micronaut进行转换。 作为一个示例,我希望支持以下配置: 我还创建了一个@PropertiesConfiguration类,将这个属性定义为一个字符串(我想这是告诉Micronaut该属性实际上是什么类型所必需的),并将它注入到一个bean中,以确保使用了所有内容(只是为了确保,它没有什么

  • http4s 是一个小型的 Scala 接口,用于处理 HTTP 服务。相当于 Ruby 的 Rack、Python 的 WSGI、Haskell 的 WAI 和 Java 的 Servlet。 示例代码: // Make your model safe and streaming by using a scalaz-stream Processdef getData(req: Request):

  • 我正在尝试创建一个定制的Jenkins管道,它将多个Jenkins作业生成的工件绑定在一起。每个作业都在不同的存储库上运行(基于它们自己的文件)。这些Jenkins文件的阶段完成了创建档案、编译代码等任务。 有人可能会说,我可以使用单个作业中的工件,然后将它们绑定在一起,但问题是这个高级管道将在特定的 ,因此它无法凭空创建存档。 由于我是詹金斯管道公司的新手,您对如何解决这个问题有什么建议吗? 最

  • 问题内容: 我如何在hibernate中使用。我想在文件中使用SQL 。我有2个查询,我要合并为1个。 查询如下所示: 我也试过了。我使用了不带like子句的方法,它起作用。但是不起作用。 包装的异常: 这就是我传递参数的方式: 试过|| 并得到以下异常: 问题答案: 也许user_id不是char / varchar?您必须先将带有str()的user_id转换为字符数据! 例:

  • 问题内容: 如何在like子句中转义通配符? 例如: 有任何想法吗? 问题答案: 在Hibernate 3中,可以使用escape参数指定转义字符: 我认为这应该可行,尽管我从未在实践中尝试过。