当前位置: 首页 > 知识库问答 >
问题:

kafka流不会与动态生成的类一起运行

陶元凯
2023-03-14

我想启动一个反序列化动态创建的类的流。这个Bean是使用反射和URLCLassLOader创建的,参数为给定的字符串类,但是KafkaStreams API不识别我的新类。

流与预创建的Beans完美配合,但使用动态Beans时会自动关闭。反倾销者是和杰克逊一起创造的,也是单独工作的。

下面是类解析器代码

@SuppressWarnings("unchecked")
public static Class<?> getClassFromSource(String className, String sourceCode)
        throws IOException, ClassNotFoundException {

    /*
     * create an empty source file
     */
    File sourceFile = new File(com.google.common.io.Files.createTempDir(), className + ".java");

    sourceFile.deleteOnExit();

    /*
     * generate the source code, using the source filename as the class name write
     * the source code into the source file
     */
    try (FileWriter writer = new FileWriter(sourceFile)) {
        writer.write(sourceCode);
    }

    /*
     * compile the source file
     */
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();

    File parentDirectory = null;

    try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {

        parentDirectory = sourceFile.getParentFile();

        fileManager.setLocation(StandardLocation.CLASS_OUTPUT, Arrays.asList(parentDirectory));

        Iterable<? extends JavaFileObject> compilationUnits = fileManager
                .getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));

        compiler.getTask(null, fileManager, null, null, null, compilationUnits).call();
    }

    /*
     * load the compiled class
     */
    try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {

        parentDirectory = sourceFile.getParentFile();

        fileManager.setLocation(StandardLocation.CLASS_OUTPUT, Arrays.asList(parentDirectory));

        Iterable<? extends JavaFileObject> compilationUnits = fileManager
                .getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));

        compiler.getTask(null, fileManager, null, null, null, compilationUnits).call();
    }

    /*
     * load the compiled class
     */
    try (URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { parentDirectory.toURI().toURL() })) {
        return (Class<?>) classLoader.loadClass(className);
    }
}

首先,我实例化接收类作为参数的serde

// dynamic generated class from a source class
Class clazz = getClassFromSource("DynamicClass", source);

// Serdes for created class that implements org.apache.kafka.common.serialization.Deserializer
DynamicDeserializer deserializer = new DynamicDeserializer(clazz);
DynamicSerializer serializer = new DynamicSerializer();
Serde<?> encryptedSerde = Serdes.serdeFrom(serializer, deserializer);

然后启动使用此Serdes的流拓扑

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> dynamicStream = builder
            .stream(topicName, Consumed.with(Serdes.String(), encryptedSerde))
            .groupByKey()
            .count();

dynamicStream.to(outputTopicName, Produced.with(Serdes.String(), Serdes.Long()));

流拓扑应该正常执行,但始终会生成此错误

2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.Target' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout.ConversionPattern' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'stream.restart.application' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'aes.key.path' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'path.to.listening' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'admin.retries' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.rootLogger' was supplied but isn't a known config.
2019-09-01 14:54:16 INFO  AppInfoParser:117 - Kafka version: 2.3.0
2019-09-01 14:54:16 INFO  AppInfoParser:118 - Kafka commitId: fc1aaa116b661c8a
2019-09-01 14:54:16 INFO  AppInfoParser:119 - Kafka startTimeMs: 1567360456724
2019-09-01 14:54:16 INFO  KafkaStreams:800 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] Started Streams client
2019-09-01 14:54:16 INFO  StreamThread:740 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Starting
2019-09-01 14:54:16 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from CREATED to RUNNING
2019-09-01 14:54:16 INFO  KafkaConsumer:1027 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Subscribed to pattern: 'DynamicBean|streamingbean-test-20190901145412544-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition'
2019-09-01 14:54:17 INFO  Metadata:266 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO  Metadata:266 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO  AbstractCoordinator:728 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Discovered group coordinator AcerDerick:9092 (id: 2147483647 rack: null)
2019-09-01 14:54:17 INFO  ConsumerCoordinator:476 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Revoking previously assigned partitions []
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
2019-09-01 14:54:17 INFO  KafkaStreams:257 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] State transition from RUNNING to REBALANCING
2019-09-01 14:54:17 INFO  KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO  StreamThread:324 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] partition revocation took 0 ms.
    suspended active tasks: []
    suspended standby tasks: []
2019-09-01 14:54:17 INFO  AbstractCoordinator:505 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] (Re-)joining group
2019-09-01 14:54:17 ERROR StreamsPartitionAssignor:354 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer] DynamicClass is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
2019-09-01 14:54:17 INFO  AbstractCoordinator:469 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Successfully joined group with generation 1
2019-09-01 14:54:17 INFO  ConsumerCoordinator:283 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Setting newly assigned partitions: 
2019-09-01 14:54:17 INFO  StreamThread:1164 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Informed to shut down
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
2019-09-01 14:54:17 INFO  StreamThread:1178 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutting down
2019-09-01 14:54:17 INFO  KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO  KafkaProducer:1153 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-09-01 14:54:17 INFO  StreamThread:1198 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutdown complete

共有1个答案

吕胤
2023-03-14

过了一段时间,我用一个简单的解决方案修复了这个问题,但可能不是最优雅的。首先,我使用JSON字符串反序列化器从主题获取数据,然后将其传递给另一个反序列化器,该反序列化器转换为我的动态对象。

 类似资料:
  • 我有一个webpack配置,当我直接调用webpack时,它会生成react包。由于我想合并热重新加载,所以我需要在端口3000上运行的development express服务器(服务APIendpoint)旁边运行webpack dev服务器 webpack.dev.config.js dev-server.js package.json脚本 如果我跑了 结果是一个新的js包和html:dis

  • 问题内容: 沿以下几行创建用于下载文件的按钮/链接非常方便: 和 但是,我想 触发仅在单击按钮/链接时下载文件的生成 。换句话说,单击后,我将调用一个方法来生成文件(在本例中为Pentaho报告),将其放置在临时位置并返回指向该文件的指针。然后我告诉那个使用它。问题是, 这有可能 吗? 目前,我们有类似下面的代码,可以正常工作,但是我对是否可以代替它感兴趣。 (如果有区别,请参见1.4.18节。)

  • 我在一个布局中使用了fab,但在运行时给出了以下错误:08-30 22:01:35.548 262 95-26295/? E/AndroidRuntime:致命异常:main process:com.example.ahr.a1000funnysms,pid:26295 Android.view.filflateException:二进制XML文件第30行:错误inflating类Android.

  • 问题内容: HTML jQuery 角度脚本 在这里,AngularJS的控制器部分中调用的函数不会从ng-click事件中触发。HTML已成功添加,但是ng- click无效。告诉我解决方案以使其正常工作 问题答案: 仍然不是一个完美的解决方案!!!-只是为了说明如何进行动态编译 不要将控制器用于dom操作-必须在指令的帮助下完成

  • 在我运行mongo恢复后,mongo服务无法自动启动,但如果我打开终端并运行monstar,服务运行完美。如果我关闭终端,我得到。有什么建议吗? 错误:无法连接到服务器127.0.0.1 shell/mongo。js:79 当我运行mon神时,我得到: MongoDB启动:pid=1875 port=27017 dbpath=/data/db/64位周四25 12:16:40db version

  • 我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以