是否有任何示例项目展示了如何将Kafka与Micronaut结合使用?我很难让它工作。
我有以下制片人:
@KafkaClient
interface AppClient {
@Topic("topic-name")
void sendMessage(@KafkaKey String id, Event event)
}
听众:
@KafkaListener(
groupId="group-id",
offsetReset = OffsetReset.EARLIEST
)
class AppListener {
@Topic("topic-name")
void onMessage(Event event) {
// do stuff
}
}
我的申请。yml包含:
kafka:
bootstrap:
servers: localhost:2181
以及应用测试。yml(这是正确的吗?它应该与application.yml位于同一目录中吗?还不确定嵌入式服务器应该如何使用):
kafka:
# embedded:
# enabled: true
# topics: promo-api-promotions
bootstrap:
servers: localhost:9092
我的测试结果如下:
@MicronautTest
class AppSpec extends Specification {
@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)
@Shared
private AppClient appClient =
server.applicationContext.getBean(AppClient)
def 'The upload endpoint is called'() {
// test here
appClient.sendMessage(id, event)
// other test stuff
}
我面临的主要问题是:
>
我的消费者没有从我的主题中消费。我可以看到制作者在Kafka中创建了主题,并且创建了客户端组,但是偏移量保持在0。
当测试启动时,我遇到了问题,看起来好像创建了两个客户端实例,因此MBean注册失败(此外,如果我尝试使用嵌入式Kafka,我会收到另一条消息,关于端口9092已经在使用,因为它尝试启动服务器两次):
javax.management.InstanceAlreadyExistsExc0019:kafka.consumer: type=app-info, id=app-kafka-client-app-listener atcom.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)atcom.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerAnd Repository(DefaultMBeanServerInterceptor.java:1898
您应该添加嵌入式配置kafka。嵌入的已启用
,并将其传递到ApplicationContext。运行方法。
Map<String, Object> config = Collections.
unmodifiableMap(new HashMap<String, Object>() {
{
put(AbstractKafkaConfiguration.EMBEDDED, true);
put(AbstractKafkaConfiguration.EMBEDDED_TOPICS, "test_topic");
}
});
try (ApplicationContext ctx = ApplicationContext.run(config)) {
消费者在另一个线程中使用Kafka的内容,你必须等待一段时间,直到你的Applister赶上。你可以在KafkaProducerListenerTest中看到一个简短的例子
还记得Micronaut文档:嵌入Kafka中描述的Kafka依赖项吗
设法解决了第二个问题——传入侦听器的对象没有@JsonCreator。我发现这一点是通过在玩游戏时尝试使用Jackson对象映射器从JSON构建对象。
如果其他人有同样的问题——在继续前进之前,确保对象模型与杰克逊一起工作!
我们正在使用普罗米修斯一段时间,真的很享受它。 关于什么是jmx导出器的几句话 jmx-exporter是一个程序,它从基于JVM的应用程序(例如Java和Scala)读取JMX数据,并通过HTTP以Prometheus理解并可以抓取的简单文本格式公开它。 所以让我们开始讨论我的问题… 我们使用jmx导出器配置了kafka,如下所示 该配置在kakfa配置下的ambari中设置 设置配置后,我们重
我想有一个客户端应用程序与请求/响应语义学调用另一个应用程序,这是一个Kafka流应用程序。 我的客户端应用程序基于此示例(基本上没有变化)。我需要从客户端接收消息的应用程序是Kafka Streams应用程序。但是包含相关id的消息头丢失。 Kafka Streams应用程序是一个简单的拓扑结构,用于测试此。。。 对于这个POC,我保持它的简单性,让客户机和服务器“同意”主题名称(和)。所以在这
我想知道是否有一种简单的方法来连接Kafka和Netflix导体(而不是SQS)?目前,它似乎只适用于Amazon SQS。此外,似乎只能按任务执行一个操作。有没有办法按任务执行多个操作? 提前感谢,
我正在尝试使用Github中jcustenborder的Kafka Connect with Kafka-Connect-Twitter将Twitter tweets引入Kafka。说明书上说: 导出类路径行实际上不起作用,运行时不返回任何内容。在kafka-connect-twitter存储库中运行mvn clean package之后,connect avro docker属性文件似乎想要使用
我已经做了一些Kafka流应用程序和Kafka消费者应用程序。最后,Kafka流不是什么,而是消费来自Kafka的实时事件的消费者。所以我不知道什么时候使用Kafka流,或者为什么我们应该使用Kafka流,因为我们可以在消费者端执行所有转换。
我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka