当前位置: 首页 > 知识库问答 >
问题:

在java.util.concurrent中找到。未来需要scala.concurrent.Future

施英哲
2023-03-14

相关:java.util.concurrent.Future的scala.concurrent.Future包装器

这来自我的另一个问题:

如何将akka streams Kafka(reactive-Kafka)集成到akka http应用中?

我有一个AKKA HTTP应用程序,我想在路由中的onComplete函数中向Kafka发送消息/产品记录,如下所示:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

但是,on complete(producer send producer record)正在生成以下类型不匹配错误:

[错误]已找到:未来[org.apache.kafka.clients.producer.RecordMetadata](java.util.concurrent)[错误]必填:未来[org.apache.kafka.clients.producer.RecordMetadata](scala.concurrent)[错误]onCompleteRecordMetadata{_=

有没有办法解决这个问题,也许可以把制作人当作水槽(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink)而不是java生产者。发送函数?

共有2个答案

敖涵容
2023-03-14

为了回答您的特定问题,scala-java8-compat库提供了java8和Scala Futures之间的转换器。

具体来说,您可以使用 FutureConverters.toScala(producer.send(producerRecord))java.util.concurrent.Future 转换为 scala.concurrent.Future

但是,使用本身具有 Scala 友好 API 的客户端库(如上面的 Stefano 所建议的那样)可能会为您带来最佳结果。

凌轶
2023-03-14

你可以利用Cake的基于Scala的Kafka客户端,它将完成运行Java期货的工作,并给你Scala期货。一旦你确保你创建了一个 cakesolutions.kafka.KafkaProducer 而不是 org.apache.kafka.clients.producer.KafkaProducer,你的代码的其余部分实际上应该保持不变。

或者,您可以利用反应式Kafka解决这个问题,同时继续使用高级Akka HTTP DSL。你可以通过将你的制作人记录运行到一个Kafka接收器来实现,如下所示:

val producerSink = Producer.plainSink(producerSettings)

...
        // inside the route
        val producerRecord =
          new ProducerRecord[Array[Byte], String]("topic1", "some message")

        onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }
 类似资料:
  • 有相当多的:https://vertx.io/docs/apidocs/io/vertx/core/Future.html 使用Vertx的示例如下: 我的印象是,与,但它似乎没有。正如您所看到的,告诉Vertx Future失败的方法是调用它的fail()方法。 另一方面,我们有完全未来,这是一个实现的接口:https://docs.oracle.com/javase/8/docs/api/ja

  • 我有一个场景,其中我得到一个String消息列表,我必须遍历String并调用另一个方法,这是一个长时间运行的过程。然后我必须收集这个长时间运行过程的结果并连接结果并将其发送回用户交互界面。我对Scala中的这些未来概念很陌生。我正在使用Play框架,其中字符串列表将来自用户交互界面。这是我第一次尝试实现ht场景的样子: 为简单起见,long RunningCall将只返回一个字符串。稍后我将把它

  • 问题内容: 我想在中找到此链接“我们的状态”的元素。我在craigslist中尝试此操作。任何帮助将不胜感激 这是网址:http : //auburn.craigslist.org/ 问题答案: 在您的情况下,仅使用类名是不够的。 有15个匹配节点 有11个匹配节点 有5个匹配节点 因此,您需要更多限制来缩小范围。下面的 选项1和2 可用于CSS选择器,其中1可能是最适合您的需求的选择器。 选项1

  • 我正在尝试用Spring Boot开发一个CRUD web应用程序。我将Hibernate用于我的DAO部分。当我尝试午餐我的主应用程序,我收到以下错误消息: 接口客户端DAO: 类ClienteDAOImpl 类ClienteController: pom.xml:

  • 问题内容: 无法下载任何pythonWindows模块并安装。我想尝试使用Scrapy框架和无堆栈框架,但由于错误“需要python版本2.6,但在注册表中找不到”而无法安装。 尝试将其安装到 Windows 7、64位计算机 问题答案: 对新观看者的警告:该答案已有好几年历史了(线索是它将Windows 7描述为“新”)。到现在(2014年),大多数Python库都应该支持64位。但是,如果仍然

  • 问题内容: 我有一个。如果用户第二次输入相同的数字,我想向用户显示。为此,我需要找到它。 我希望我能说清楚。 问题答案: 如果要检查是否在中存储了某些值,则可以使用该方法,如果对象在列表中,则将返回该方法,否则。