Can someone help me correctly join two KStreams whose keys and values are both GenericRecord?
First, as you can see below, I create two topics whose keys and values use Avro schemas. I then join the two streams and, for the output topic, build a new GenericRecord — a projection record with a projection schema. When I do, I get the exception shown after the code snippet:
@Test
public void joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord() throws Exception {
    String methodName = new Object() {}.getClass().getEnclosingMethod().getName();
    long timestamp = new Date().getTime();
    String firstTopic = String.format("%1$s_1_%2$s", methodName, timestamp);
    String secondTopic = String.format("%1$s_2_%2$s", methodName, timestamp);
    String outputTopic = String.format("%1$s_output_%2$s", methodName, timestamp);
    String firstStorage = String.format("%1$s_store_1_%2$s", methodName, timestamp);
    String secondStorage = String.format("%1$s_store_2_%2$s", methodName, timestamp);
    String appIdConfig = String.format("%1$s_app_id_%2$s", methodName, timestamp);
    String groupIdConfig = String.format("%1$s_group_id_%2$s", methodName, timestamp);
    String schemaIdNamespace = String.format("%1$s_id_ns_%2$s", methodName, timestamp);
    String schemaNameNamespace = String.format("%1$s_name_ns_%2$s", methodName, timestamp);
    String schemaScopeNamespace = String.format("%1$s_scope_ns_%2$s", methodName, timestamp);
    String schemaProjectionNamespace = String.format("%1$s_proj_ns_%2$s", methodName, timestamp);
    String schemaIdRecord = String.format("%1$s_id_rec_%2$s", methodName, timestamp);
    String schemaNameRecord = String.format("%1$s_name_rec_%2$s", methodName, timestamp);
    String schemaScopeRecord = String.format("%1$s_scope_rec_%2$s", methodName, timestamp);
    String schemaProjectionRecord = String.format("%1$s_proj_rec_%2$s", methodName, timestamp);
    try {
        Integer partitions = 1;
        Integer replication = 1;
        Properties topicConfig = new Properties();
        RestUtils.createTopic(firstTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(secondTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(outputTopic, partitions, replication, topicConfig);

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/"); // TestUtils.tempDirectory().getAbsolutePath());
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
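
        // Note: a single serde instance, configured with isKey=false, is created here
        // and reused below for both keys and values of both streams.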
        Serializer kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(streamsConfiguration, false);
        Deserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
        kafkaAvroDeserializer.configure(streamsConfiguration, false);
        Serde<GenericRecord> avroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);
        //-----
        Schema idSchema = SchemaBuilder.record(schemaIdRecord).namespace(schemaIdNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .endRecord();
        Schema nameSchema = SchemaBuilder.record(schemaNameRecord).namespace(schemaNameNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .endRecord();
        Schema scopeSchema = SchemaBuilder.record(schemaScopeRecord).namespace(schemaScopeNamespace).fields()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();
        Schema projectionSchema = SchemaBuilder.record(schemaProjectionRecord).namespace(schemaProjectionNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();
        GenericRecord idRecord1 = new GenericData.Record(idSchema);
        idRecord1.put("Id", 1);
        GenericRecord idRecord2 = new GenericData.Record(idSchema);
        idRecord2.put("Id", 2);
        GenericRecord idRecord3 = new GenericData.Record(idSchema);
        idRecord3.put("Id", 3);
        GenericRecord idRecord4 = new GenericData.Record(idSchema);
        idRecord4.put("Id", 4);

        GenericRecord nameRecord1 = new GenericData.Record(nameSchema);
        nameRecord1.put("Id", 1);
        nameRecord1.put("Name", "Bruce Eckel");
        GenericRecord nameRecord2 = new GenericData.Record(nameSchema);
        nameRecord2.put("Id", 2);
        nameRecord2.put("Name", "Robert Lafore");
        GenericRecord nameRecord3 = new GenericData.Record(nameSchema);
        nameRecord3.put("Id", 3);
        nameRecord3.put("Name", "Andrew Tanenbaum");
        GenericRecord nameRecord4 = new GenericData.Record(nameSchema);
        nameRecord4.put("Id", 4);
        nameRecord4.put("Name", "Programming in Scala");

        GenericRecord scopeRecord1 = new GenericData.Record(scopeSchema);
        scopeRecord1.put("Scope", "Modern Operating System");
        GenericRecord scopeRecord2 = new GenericData.Record(scopeSchema);
        scopeRecord2.put("Scope", "Thinking in Java");
        GenericRecord scopeRecord3 = new GenericData.Record(scopeSchema);
        scopeRecord3.put("Scope", "Computer Architecture");
        GenericRecord scopeRecord4 = new GenericData.Record(scopeSchema);
        scopeRecord4.put("Scope", "Programming in Scala");

        List<KeyValue<GenericRecord, GenericRecord>> list1 = Arrays.asList(
                new KeyValue<>(idRecord1, nameRecord1),
                new KeyValue<>(idRecord2, nameRecord2),
                new KeyValue<>(idRecord3, nameRecord3)
        );
        List<KeyValue<GenericRecord, GenericRecord>> list2 = Arrays.asList(
                new KeyValue<>(idRecord3, scopeRecord1),
                new KeyValue<>(idRecord1, scopeRecord2),
                new KeyValue<>(idRecord3, scopeRecord3),
                new KeyValue<>(idRecord4, scopeRecord4)
        );

        GenericRecord projectionRecord1 = new GenericData.Record(projectionSchema);
        projectionRecord1.put("Id", nameRecord1.get("Id"));
        projectionRecord1.put("Name", nameRecord1.get("Name"));
        projectionRecord1.put("Scope", scopeRecord1.get("Scope"));
        GenericRecord projectionRecord2 = new GenericData.Record(projectionSchema);
        projectionRecord2.put("Id", nameRecord2.get("Id"));
        projectionRecord2.put("Name", nameRecord2.get("Name"));
        projectionRecord2.put("Scope", scopeRecord2.get("Scope"));
        GenericRecord projectionRecord3 = new GenericData.Record(projectionSchema);
        projectionRecord3.put("Id", nameRecord3.get("Id"));
        projectionRecord3.put("Name", nameRecord3.get("Name"));
        projectionRecord3.put("Scope", scopeRecord3.get("Scope"));

        List<KeyValue<GenericRecord, GenericRecord>> expectedResults = Arrays.asList(
                new KeyValue<>(idRecord3, projectionRecord3),
                new KeyValue<>(idRecord1, projectionRecord1),
                new KeyValue<>(idRecord3, projectionRecord3)
        );
        //-----
        KStreamBuilder builder = new KStreamBuilder();
        KStream<GenericRecord, GenericRecord> firstStream = builder.stream(avroSerde, avroSerde, firstTopic);
        KStream<GenericRecord, GenericRecord> secondStream = builder.stream(avroSerde, avroSerde, secondTopic);
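
        // The join materializes both input streams into windowed state stores;
        // their records are (de)serialized with the serdes passed to join() below.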
        KStream<GenericRecord, GenericRecord> outputStream = firstStream.join(secondStream,
                new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                    @Override
                    public GenericRecord apply(GenericRecord l, GenericRecord r) {
                        GenericRecord projectionRecord = new GenericData.Record(projectionSchema);
                        projectionRecord.put("Id", l.get("Id"));
                        projectionRecord.put("Name", l.get("Name"));
                        projectionRecord.put("Scope", r.get("Scope"));
                        return projectionRecord;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), avroSerde, avroSerde, avroSerde);
        outputStream.to(avroSerde, avroSerde, outputTopic);
        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, list1, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, list2, cfg2);

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        List<KeyValue<GenericRecord, GenericRecord>> actualResults =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size());
        streams.close();
        //-----
        assertThat(actualResults).containsExactlyElementsOf(expectedResults);
        //-----
    } finally {
        RestUtils.deleteTopics(firstTopic, secondTopic, outputTopic);
    }
}
StackTrace:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_1_1490264134172, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:63)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:166)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:548)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:489)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:426)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:369)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:391)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154)
... 44 more
; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
The key lines are the ones showing the schema whose registration fails:
Error registering Avro schema: {
    "type": "record",
    "name": "joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172",
    "namespace": "joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172",
    "fields": [
        {"name": "Id", "type": ["int", "null"]},
        {"name": "Name", "type": ["string", "null"]}
    ]
}
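
Reading the trace, the registration does not fail on one of my own topics but inside the join's window store (RocksDBWindowStore.put -> StateSerdes.rawValue), i.e. against the subject of an internal state store topic. My suspicion (an assumption, not a verified fix): the Confluent Avro serializer derives the subject as "<topic>-key" or "<topic>-value" from the isKey flag passed to configure(), and since I configure a single serde with isKey=false and reuse it for both keys and values, idSchema (the key) and nameSchema (the value) get registered under the same "-value" subject, where they are mutually incompatible. A minimal sketch of what I think the serde setup should look like instead:

// Sketch only: separate serde instances for keys and values, so that key and
// value schemas register under different subjects ("<topic>-key" vs "<topic>-value").
Serializer keyAvroSerializer = new KafkaAvroSerializer();
keyAvroSerializer.configure(streamsConfiguration, true);       // isKey = true
Deserializer keyAvroDeserializer = new KafkaAvroDeserializer();
keyAvroDeserializer.configure(streamsConfiguration, true);
Serde<GenericRecord> keyAvroSerde = Serdes.serdeFrom(keyAvroSerializer, keyAvroDeserializer);

Serializer valueAvroSerializer = new KafkaAvroSerializer();
valueAvroSerializer.configure(streamsConfiguration, false);    // isKey = false
Deserializer valueAvroDeserializer = new KafkaAvroDeserializer();
valueAvroDeserializer.configure(streamsConfiguration, false);
Serde<GenericRecord> valueAvroSerde = Serdes.serdeFrom(valueAvroSerializer, valueAvroDeserializer);

The rest of the topology would then take keyAvroSerde wherever a key serde is expected and valueAvroSerde in the value positions, e.g. builder.stream(keyAvroSerde, valueAvroSerde, firstTopic) and join(..., keyAvroSerde, valueAvroSerde, valueAvroSerde). Is this the right way to wire serdes for a GenericRecord/GenericRecord join, or am I missing something else?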