当前位置: 首页 > 知识库问答 >
问题:

使用Kafka与Micronaut

公孙茂学
2023-03-14

是否有任何示例项目展示了如何将Kafka与Micronaut结合使用?我很难让它工作。

我有以下制片人:

@KafkaClient
interface AppClient {

@Topic("topic-name")
    void sendMessage(@KafkaKey String id, Event event)
}

听众:

@KafkaListener(
    groupId="group-id",
    offsetReset = OffsetReset.EARLIEST
)
class AppListener {

@Topic("topic-name")
    void onMessage(Event event) {
        // do stuff
    }
}

我的申请。yml包含:

kafka:
  bootstrap:
    servers: localhost:2181

以及应用测试。yml(这是正确的吗?它应该与application.yml位于同一目录中吗?还不确定嵌入式服务器应该如何使用):

kafka:
  #  embedded:
  #    enabled: true
  #    topics: promo-api-promotions
  bootstrap:
    servers: localhost:9092

我的测试结果如下:

@MicronautTest
class AppSpec extends Specification {

@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)

@Shared
private AppClient appClient =
        server.applicationContext.getBean(AppClient)

def 'The upload endpoint is called'() {
  // test here
  appClient.sendMessage(id, event)
  // other test stuff
}

我面临的主要问题是:

>

  • 我的消费者没有从我的主题中消费。我可以看到制作者在Kafka中创建了主题,并且创建了客户端组,但是偏移量保持在0。

    当测试启动时,我遇到了问题,看起来好像创建了两个客户端实例,因此MBean注册失败(此外,如果我尝试使用嵌入式Kafka,我会收到另一条消息,关于端口9092已经在使用,因为它尝试启动服务器两次):

    javax.management.InstanceAlreadyExistsExc0019:kafka.consumer: type=app-info, id=app-kafka-client-app-listener atcom.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)atcom.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerAnd Repository(DefaultMBeanServerInterceptor.java:1898

  • 共有2个答案

    商风华
    2023-03-14

    您应该添加嵌入式配置kafka。嵌入的已启用,并将其传递到ApplicationContext。运行方法。

    Map<String, Object> config = Collections.
        unmodifiableMap(new HashMap<String, Object>() {
            {
                put(AbstractKafkaConfiguration.EMBEDDED, true);
                put(AbstractKafkaConfiguration.EMBEDDED_TOPICS, "test_topic");
        }
    });
    
    try (ApplicationContext ctx = ApplicationContext.run(config)) {
    

    消费者在另一个线程中使用Kafka的内容,你必须等待一段时间,直到你的Applister赶上。你可以在KafkaProducerListenerTest中看到一个简短的例子

    还记得Micronaut文档:嵌入Kafka中描述的Kafka依赖项吗

    蒋胡非
    2023-03-14

    设法解决了第二个问题——传入侦听器的对象没有@JsonCreator。我发现这一点是通过在玩游戏时尝试使用Jackson对象映射器从JSON构建对象。

    如果其他人有同样的问题——在继续前进之前,确保对象模型与杰克逊一起工作!

     类似资料:
    • 我们正在使用普罗米修斯一段时间,真的很享受它。 关于什么是jmx导出器的几句话 jmx-exporter是一个程序,它从基于JVM的应用程序(例如Java和Scala)读取JMX数据,并通过HTTP以Prometheus理解并可以抓取的简单文本格式公开它。 所以让我们开始讨论我的问题… 我们使用jmx导出器配置了kafka,如下所示 该配置在kakfa配置下的ambari中设置 设置配置后,我们重

    • 我想有一个客户端应用程序与请求/响应语义学调用另一个应用程序,这是一个Kafka流应用程序。 我的客户端应用程序基于此示例(基本上没有变化)。我需要从客户端接收消息的应用程序是Kafka Streams应用程序。但是包含相关id的消息头丢失。 Kafka Streams应用程序是一个简单的拓扑结构,用于测试此。。。 对于这个POC,我保持它的简单性,让客户机和服务器“同意”主题名称(和)。所以在这

    • 我想知道是否有一种简单的方法来连接Kafka和Netflix导体(而不是SQS)?目前,它似乎只适用于Amazon SQS。此外,似乎只能按任务执行一个操作。有没有办法按任务执行多个操作? 提前感谢,

    • 我正在尝试使用Github中jcustenborder的Kafka Connect with Kafka-Connect-Twitter将Twitter tweets引入Kafka。说明书上说: 导出类路径行实际上不起作用,运行时不返回任何内容。在kafka-connect-twitter存储库中运行mvn clean package之后,connect avro docker属性文件似乎想要使用

    • 我已经做了一些Kafka流应用程序和Kafka消费者应用程序。最后,Kafka流不是什么,而是消费来自Kafka的实时事件的消费者。所以我不知道什么时候使用Kafka流,或者为什么我们应该使用Kafka流,因为我们可以在消费者端执行所有转换。

    • 我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka