Question:

Kafka throws an exception when joining two streams whose keys and values are GenericRecord

谭翔
2023-03-14

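Can anyone help me correctly join two streams whose keys and values are both GenericRecord? As you can see below, I first create two topics whose keys and values use Avro schemas. I then join the two streams, and for the output topic I build a new GenericRecord, a so-called projection record with its own projection schema. Running the test throws the exception shown after the code: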

@Test
public void joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord() throws Exception {
    String methodName = new Object() {
    }.getClass().getEnclosingMethod().getName();

    long timestamp = new Date().getTime();

    String firstTopic = String.format("%1$s_1_%2$s", methodName, timestamp);
    String secondTopic = String.format("%1$s_2_%2$s", methodName, timestamp);

    String outputTopic = String.format("%1$s_output_%2$s", methodName, timestamp);

    String firstStorage = String.format("%1$s_store_1_%2$s", methodName, timestamp);
    String secondStorage = String.format("%1$s_store_2_%2$s", methodName, timestamp);

    String appIdConfig = String.format("%1$s_app_id_%2$s", methodName, timestamp);
    String groupIdConfig = String.format("%1$s_group_id_%2$s", methodName, timestamp);

    String schemaIdNamespace = String.format("%1$s_id_ns_%2$s", methodName, timestamp);
    String schemaNameNamespace = String.format("%1$s_name_ns_%2$s", methodName, timestamp);
    String schemaScopeNamespace = String.format("%1$s_scope_ns_%2$s", methodName, timestamp);
    String schemaProjectionNamespace = String.format("%1$s_proj_ns_%2$s", methodName, timestamp);

    String schemaIdRecord = String.format("%1$s_id_rec_%2$s", methodName, timestamp);
    String schemaNameRecord = String.format("%1$s_name_rec_%2$s", methodName, timestamp);
    String schemaScopeRecord = String.format("%1$s_scope_rec_%2$s", methodName, timestamp);
    String schemaProjectionRecord = String.format("%1$s_proj_rec_%2$s", methodName, timestamp);

    try {
        Integer partitions = 1;
        Integer replication = 1;
        Properties topicConfig = new Properties();

        RestUtils.createTopic(firstTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(secondTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(outputTopic, partitions, replication, topicConfig);

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/"); //TestUtils.tempDirectory().getAbsolutePath());
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);

        Serializer kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(streamsConfiguration, false);

        Deserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
        kafkaAvroDeserializer.configure(streamsConfiguration, false);

        Serde<GenericRecord> avroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);

        //-----

        Schema idSchema = SchemaBuilder.record(schemaIdRecord).namespace(schemaIdNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .endRecord();

        Schema nameSchema = SchemaBuilder.record(schemaNameRecord).namespace(schemaNameNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .endRecord();

        Schema scopeSchema = SchemaBuilder.record(schemaScopeRecord).namespace(schemaScopeNamespace).fields()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();

        Schema projectionSchema = SchemaBuilder.record(schemaProjectionRecord).namespace(schemaProjectionNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();

        GenericRecord idRecord1 = new GenericData.Record(idSchema);
        idRecord1.put("Id", 1);
        GenericRecord idRecord2 = new GenericData.Record(idSchema);
        idRecord2.put("Id", 2);
        GenericRecord idRecord3 = new GenericData.Record(idSchema);
        idRecord3.put("Id", 3);
        GenericRecord idRecord4 = new GenericData.Record(idSchema);
        idRecord4.put("Id", 4);

        GenericRecord nameRecord1 = new GenericData.Record(nameSchema);
        nameRecord1.put("Id", 1);
        nameRecord1.put("Name", "Bruce Eckel");
        GenericRecord nameRecord2 = new GenericData.Record(nameSchema);
        nameRecord2.put("Id", 2);
        nameRecord2.put("Name", "Robert Lafore");
        GenericRecord nameRecord3 = new GenericData.Record(nameSchema);
        nameRecord3.put("Id", 3);
        nameRecord3.put("Name", "Andrew Tanenbaum");
        GenericRecord nameRecord4 = new GenericData.Record(nameSchema);
        nameRecord4.put("Id", 4);
        nameRecord4.put("Name", "Programming in Scala");

        GenericRecord scopeRecord1 = new GenericData.Record(scopeSchema);
        scopeRecord1.put("Scope", "Modern Operating System");
        GenericRecord scopeRecord2 = new GenericData.Record(scopeSchema);
        scopeRecord2.put("Scope", "Thinking in Java");
        GenericRecord scopeRecord3 = new GenericData.Record(scopeSchema);
        scopeRecord3.put("Scope", "Computer Architecture");
        GenericRecord scopeRecord4 = new GenericData.Record(scopeSchema);
        scopeRecord4.put("Scope", "Programming in Scala");

        List<KeyValue<GenericRecord, GenericRecord>> list1 = Arrays.asList(
                new KeyValue<>(idRecord1, nameRecord1),
                new KeyValue<>(idRecord2, nameRecord2),
                new KeyValue<>(idRecord3, nameRecord3)
        );

        List<KeyValue<GenericRecord, GenericRecord>> list2 = Arrays.asList(
                new KeyValue<>(idRecord3, scopeRecord1),
                new KeyValue<>(idRecord1, scopeRecord2),
                new KeyValue<>(idRecord3, scopeRecord3),
                new KeyValue<>(idRecord4, scopeRecord4)
        );

        GenericRecord projectionRecord1 = new GenericData.Record(projectionSchema);
        projectionRecord1.put("Id", nameRecord1.get("Id"));
        projectionRecord1.put("Name", nameRecord1.get("Name"));
        projectionRecord1.put("Scope", scopeRecord1.get("Scope"));

        GenericRecord projectionRecord2 = new GenericData.Record(projectionSchema);
        projectionRecord2.put("Id", nameRecord2.get("Id"));
        projectionRecord2.put("Name", nameRecord2.get("Name"));
        projectionRecord2.put("Scope", scopeRecord2.get("Scope"));

        GenericRecord projectionRecord3 = new GenericData.Record(projectionSchema);
        projectionRecord3.put("Id", nameRecord3.get("Id"));
        projectionRecord3.put("Name", nameRecord3.get("Name"));
        projectionRecord3.put("Scope", scopeRecord3.get("Scope"));

        List<KeyValue<GenericRecord, GenericRecord>> expectedResults = Arrays.asList(
                new KeyValue<>(idRecord3, projectionRecord3),
                new KeyValue<>(idRecord1, projectionRecord1),
                new KeyValue<>(idRecord3, projectionRecord3)
        );

        //-----

        KStreamBuilder builder = new KStreamBuilder();

        KStream<GenericRecord, GenericRecord> firstStream = builder.stream(avroSerde, avroSerde, firstTopic);

        KStream<GenericRecord, GenericRecord> secondStream = builder.stream(avroSerde, avroSerde, secondTopic);

        KStream<GenericRecord, GenericRecord> outputStream = firstStream.join(secondStream,
                new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                    @Override
                    public GenericRecord apply(GenericRecord l, GenericRecord r) {
                        GenericRecord projectionRecord = new GenericData.Record(projectionSchema);
                        projectionRecord.put("Id", l.get("Id"));
                        projectionRecord.put("Name", l.get("Name"));
                        projectionRecord.put("Scope", r.get("Scope"));
                        return projectionRecord;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), avroSerde, avroSerde, avroSerde);

        outputStream.to(avroSerde, avroSerde, outputTopic);

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

        streams.start();

        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, list1, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, list2, cfg2);

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);

        List<KeyValue<GenericRecord, GenericRecord>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size());

        streams.close();

        //-----

        assertThat(actualResults).containsExactlyElementsOf(expectedResults);

        //-----
    } finally {
        RestUtils.deleteTopics(firstTopic, secondTopic, outputTopic);
    }
}

Stack trace:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_1_1490264134172, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:63)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:166)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:548)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:489)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:426)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:369)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:391)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154)
    ... 44 more
; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

1 answer

年文柏
2023-03-14
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema

The key lines are:

  • "Error registering Avro schema"
  • "Schema being registered is incompatible with an earlier schema"

The schema that failed to register is:

Error registering Avro schema:{  
   "type":"record",
   "name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172",
   "namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172",
   "fields":[  
      {  
         "name":"Id",
         "type":[  
            "int",
            "null"
         ]
      },
      {  
         "name":"Name",
         "type":[  
            "string",
            "null"
         ]
      }
   ]
}
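
One plausible cause, which the answer above does not confirm, is that the test reuses a single avroSerde for both keys and values, and that serde's serializer was configured with isKey set to false. With the default subject naming ("<topic>-key" or "<topic>-value"), the key schema (Id only) and the value schema (Id and Name) would then both be registered under the same "-value" subject, and the second registration would be rejected. The sketch below is a simplified in-memory model of that collision, not the real Schema Registry: ToyRegistry, writeRecord, and the schema strings are hypothetical names invented for illustration, and the toy rule "any different schema under an existing subject is rejected" stands in for the registry's actual compatibility check.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model only: this is NOT the Confluent Schema Registry client.
class SubjectCollisionSketch {

    // Mirrors the default subject naming: "<topic>-key" or "<topic>-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // Hypothetical stand-in for the registry: one schema per subject, and a
    // different schema under an existing subject counts as incompatible.
    static final class ToyRegistry {
        private final Map<String, String> schemaBySubject = new HashMap<>();

        // Returns true if the schema is accepted, false if it conflicts
        // with a schema registered earlier under the same subject.
        boolean register(String subject, String schema) {
            String earlier = schemaBySubject.putIfAbsent(subject, schema);
            return earlier == null || earlier.equals(schema);
        }
    }

    // Registers the key schema, then the value schema, for one topic.
    // keySerializerIsKey models the isKey flag passed to the key serializer's
    // configure(...) call. Returns whether the value schema was accepted.
    static boolean writeRecord(ToyRegistry registry, String topic, boolean keySerializerIsKey) {
        String idSchema = "record{Id}";        // placeholder for the key schema
        String nameSchema = "record{Id,Name}"; // placeholder for the value schema
        registry.register(subjectFor(topic, keySerializerIsKey), idSchema);
        return registry.register(subjectFor(topic, false), nameSchema);
    }

    public static void main(String[] args) {
        // Shared serde configured with isKey=false: both schemas hit "<topic>-value".
        System.out.println(writeRecord(new ToyRegistry(), "join-store", false)); // prints false
        // Key serde configured with isKey=true: the subjects no longer collide.
        System.out.println(writeRecord(new ToyRegistry(), "join-store", true));  // prints true
    }
}
```

If this is indeed the cause, the test would need two serdes instead of one: a key serde whose KafkaAvroSerializer and KafkaAvroDeserializer are configured with configure(streamsConfiguration, true), and a value serde configured with false, passed in the key and value positions of builder.stream, join, and to respectively.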