当前位置: 首页 > 知识库问答 >
问题:

不支持给定DataStream的类型:GenericTypeFlink Cassandra

燕昊东
2023-03-14

我想给卡桑德拉写一行。首先,我将Avro流转换为行流。编译时没有显示错误。请参阅下面的代码:(Kafka消费者和卡桑德拉辛在其他工作中都可以单独工作)

StreamExecutionEnvironment environment =  StreamExecutionEnvironment.getExecutionEnvironment();

// Initialize KafkaConsumer
FlinkKafkaConsumer010 kafkaConsumer = KafkaConnection.getKafkaConsumer(AvroSchemaClass.class, inTopic, schemaRegistryUrl, properties);

// Set KafkaConsumer as source
DataStream<AvroSchemaClass> avroInputStream = environment.addSource(kafkaConsumer);

// converting avro message to flink's row datatype.
// see https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.html
AvroRowDeserializationSchema avroToRow = new AvroRowDeserializationSchema(AvroSchemaClass.class);
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<Orders_value, Row>() {
                @Override
                public Row map(AvroSchemaClass orders_value) throws Exception {
                    return avroToRow.deserialize(orders_value.toByteBuffer().array());
                }
            });

// Example transformation
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()));
       
CassandraSink streamSink = CassandraConnection.getSink(rowOutputStream,
                    cassandraURL,
                    cassandraPort,
                    cassandraCluster,
                    cassandraUser,
                    cassandraPass,
                    insertQuery);
streamSink.name("Write something to Cassandra");

environment.execute();

但是当我在flink中运行作业时,会出现以下错误:

java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row>
        at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:255)
        at servingLayer.CassandraConnection.getSink(CassandraConnection.java:24)
        at speedLayer.KafkaToCassandra.main(KafkaToCassandra.java:84)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
java.lang.NullPointerException

数据流类型的特定更改是解决方案吗?如果是,如何实施?如果您需要进一步的信息,请告诉我。

共有1个答案

东门城
2023-03-14

似乎CassandraSink应该支持Row开箱即用。问题是rowOutputStreamRowTypeInfo不知何故丢失了,它使用了回退GenericType(无效序列化)。

AvroRowDeserializationSchema正在正确返回类型信息,但DataStream API没有自动获取该信息。

因此,如果一切正常,那么修复程序将显式地设置rowIn/OutputStream的返回类型,如下所示

DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<Orders_value, Row>() {
            @Override
            public Row map(AvroSchemaClass orders_value) throws Exception {
                return avroToRow.deserialize(orders_value.toByteBuffer().array());
            }
        }).returns(avroToRow.getProducedType());
...
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()))
  .returns(avroToRow.getProducedType())

一般来说,如果坚持使用一个API会更容易。在这种情况下,我建议完全使用Table API。

 类似资料:
  • 问题内容: 自数小时以来,我一直在尝试纠正http错误,但它仍显示不支持的页面。我在邮递员中添加标题。 这是我的Java代码 这是我的档案 问题答案: 通过和如何在响应流和请求流之间对对象进行序列化和反序列化。 将会发生的是,将从提供者的注册表中进行搜索,以查找可以处理的媒体类型。如果找不到,则Jersey无法处理该请求,并将发送415不支持的媒体类型。通常,你还应该在服务器端记录一个异常。不知道

  • 我已经创建了一个示例web服务来进行post调用。 我使用的是Jersey JAX-RS和Maven。

  • 我正在用Spring Boot构建一些API,但是当我试图用Postman查询时,我得到了一些关于Content-Type的错误。 我不明白哪里出了问题。 我注意到错误消失时,我删除了@刚体作为参数的方法。为什么啊? 我只想: 将XML发送到API

  • lint Description:参考sqlint格式,以插件形式集成到代码编辑器,显示输出更加友好 Example: soar -report-type lint -query test.sql markdown Description:该格式为默认输出格式,以markdown格式展现,可以用网页浏览器插件直接打开,也可以用markdown编辑器打开 Example: echo "select

  • 在ZF2-PostgreSQL应用程序中,我想使用Doctrine2本机查询来构建分页器列表。 因此,如果选择了任何自定义的Doctrine/Pgsql类型,它都会非常有效。但对于一个查询,我将使用来自自定义类型的数据。 我在PostgreSQL中声明了一个名为的Doctrine 2自定义类型,如下所示: 这种类型在某些表中使用。表中的示例: ( 此类型链接到实体,以便: 在事件上进行条令类型注册

  • 我正在使用一种称为点(x:双精度,y:双精度)的数据类型。我正在尝试使用_c1列和_c2作为Point()的输入,然后创建一个新的Point值列,如下所示 然后我调用函数: 但是,当我声明udf时,我得到了以下错误: 我已经正确导入了此数据类型,并且之前多次使用它。现在我尝试将其包含在我的udf的架构中,它无法识别它。包含标准Int,字符串,数组等以外的类型的方法是什么? 我在亚马逊EMR上使用火