合流模式注册表不使用Quarkus和微文件
以以下错误结束
***********ERROR
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
我尝试构建一个Quarkus kafka消费者应用程序,它必须对kafka主题中可用的avro序列化程序消息进行反序列化
配置
schema.registry.url=http://kafka-exposed:8081
kafka.bootstrap.servers=kafka-exposed:9200
# Configure the Kafka source (we read from it)
mp.messaging.incoming.test-quarkustest.group.id=demo
mp.messaging.incoming.test-quarkustest.connector=smallrye-kafka
mp.messaging.incoming.test-quarkustest.topic=kafkacollectioncomplex
mp.messaging.incoming.test-quarkustest.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.incoming.test-quarkustest.value.value.converter=io.confluent.connect.avro.AvroConverter
mp.messaging.incoming.test-
mp.messaging.incoming.test-quarkustest.value.converter.schemas.enable=true
也尝试如下
mp.messaging.incoming.test-quarkustest.value.converter.schema.registry.url=http://kafka-exposed:8081
***********ERROR
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
@ApplicationScoped
public class ReadKafkaTopic {
@Incoming("test-quarkustest")
public CompletionStage<Void> process(KafkaMessage<String, JsonObject> message) {
JsonObject data= message.getPayload();
JsonArray array = (JsonArray) data.getJsonArray("skills");
return message.ack();
}
}
性能
schema.registry.url=http://kafka-exposed:8081`
kafka.bootstrap.servers=kafka-exposed:9200`
# Configure the Kafka source (we read from it)
mp.messaging.incoming.test-quarkustest.group.id=demo`
mp.messaging.incoming.test-quarkustest.connector=smallrye-kafka`
mp.messaging.incoming.test-quarkustest.topic=kafkacollectioncomplex`
mp.messaging.incoming.test-quarkustest.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer`
mp.messaging.incoming.test-quarkustest.value.value.converter=io.confluent.connect.avro.AvroConverter`
mp.messaging.incoming.test-quarkustest.value.converter.schemas.enable=true`
也尝试了以下方法
mp.messaging.incoming.test-quarkustest.value.converter.schema.registry.url=http://kafka-exposed:8081
***********ERROR
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
ERROR***********************
2019-10-27 19:08:11,974 ERROR [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
76 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
77 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
78 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
79 at io.vertx.kafka.client.consumer.KafkaReadStream.create(KafkaReadStream.java:100)
80 at io.vertx.kafka.client.consumer.KafkaConsumer.create(KafkaConsumer.java:74)
81 at io.vertx.reactivex.kafka.client.consumer.KafkaConsumer.create(KafkaConsumer.java:168)
82 at io.smallrye.reactive.messaging.kafka.KafkaSource.<init>(KafkaSource.java:51)
83 at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:65)
84 at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:203)
85 at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:145)
86 at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.lambda$register$4(ConfiguredChannelFactory.java:123)
87 at java.util.HashMap.forEach(HashMap.java:1289)
88 at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:123)
89 at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:118)
90 at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:195)
91 at java.util.Iterator.forEachRemaining(Iterator.java:116)
92 at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
93 at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
94 at io.smallrye.reactive.messaging.extension.MediatorManager.initializeAndRun(MediatorManager.java:132)
95 at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.initializeAndRun(MediatorManager_ClientProxy.zig:100)
96 at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:20)
97 at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:51)
98 at io.quarkus.arc.EventImpl$Notifier.notify(EventImpl.java:228)
99 at io.quarkus.arc.EventImpl.fire(EventImpl.java:69)
100 at io.quarkus.arc.runtime.LifecycleEventRunner.fireStartupEvent(LifecycleEventRunner.java:23)
101 at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:103)
102 at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent32.deploy_0(LifecycleEventsBuildStep$startupEvent32.zig:77)
103 at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent32.deploy(LifecycleEventsBuildStep$startupEvent32.zig:36)
104 at io.quarkus.runner.ApplicationImpl1.doStart(ApplicationImpl1.zig:161)
105 at io.quarkus.runtime.Application.start(Application.java:94)
106 at io.quarkus.runtime.Application.run(Application.java:218)
107 at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:41)
108 Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
109 at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)
110 at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
111 at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
112 at io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.<init>(KafkaAvroDeserializerConfig.java:41)
113 at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
114 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
115 ... 31 more
对我来说,它通过在kafka键下设置模式注册表url来工作:
kafka.schema.registry.url=http://localhost:8081
给定格式
mp.messaging.[outgoing|incoming].[channel-name].[attribute]=[value]
和属性架构.注册表.url
,你需要以下内容
mp.messaging.incoming.test-quarkustest.connector=smallrye-kafka
mp.messaging.incoming.test-quarkustest.connector=kafkacollectioncomplex
mp.messaging.incoming.test-quarkustest.bootstrap.servers=localhost:9092
# Setup Avro
mp.messaging.incoming.test-quarkustest.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.incoming.test-quarkustest.schema.registry.url=http://localhost:8081
在创建新消费者之前,它将在内部调用kafkaConfiguration.put(属性,值)
在任何情况下,JsonObject
听起来都不是Avro子类,因此您也必须更改应用程序代码
问题内容: 我正在阅读一本有关html开发的书(我还比较陌生),尽管该书一个月前(2011年11月)才刚刚出版,但作者是一位经验丰富的编码人员,也许可以用于行动形式是老派? 因为我试图获得示例代码的要点,但是尽管进行了搜索,却找不到解释。 在Google,SO和www.w3schools.com上。 有人知道该动作对表格意味着什么吗? 问题答案: 动作通常指定提交表单的文件/页面(使用方法参数中所
在OracleJava文档中提到 请查看以下代码: } 如果 现在如果
我有一个oracle视图,我在其中查询我的数据库。 我使用< code>jpa 2.1和< code>hibernate 4.3.7将视图映射到实体。我的实体类如下所示: 我的映射 xml 看起来像这样: 因此,我使用jpa正确查询我的实体,它返回我的所有记录。问题是,当我异步更改数据库中的数据时,令人震惊的是,我的jpa查询返回以前的记录。我做错了什么吗?
问题内容: 为什么我们需要特定于数据库的功能,例如mysql_real_escape_string()?addlashes()不能做什么? 暂时忽略了参数化查询的高级替代方案,是一个仅使用addlashes()的web应用仍然容易受到SQL注入的攻击,如果是,怎么办? 问题答案: 当处理多字节编码的字符串时,加号通常不够好。
本文向大家介绍PHP 在做,包括了PHP 在做的使用技巧和注意事项,需要的朋友参考一下 示例 do-while 循环首先在每种情况下执行一次代码块,然后在指定条件为真的情况下循环遍历该代码块。 有关详细信息,请参见“循环”主题。
如何验证 Email 地址是否有效 一般来说,你不能。有一些看起来合理的方法可以使用,但却没有办法检测地址 是否实际可以投递,如果没有实际尝试投递的话。 使用正则表达式: # Match basically blah@blah.blah if ( $addr =~ /^\S+\@\S+\.\S+$/ ) { print "Looks OK"; } 如果你干真活的话,可能希望看看 CPAN