当前位置: 首页 > 知识库问答 >
问题:

Flink SQL支持Java映射类型吗?

郎长卿
2023-03-14

我正在尝试使用Flink的SQL API从地图访问密钥。它失败,线程“main”组织中出现错误异常。阿帕奇。Flink。桌子应用程序编程接口。TableException:不支持类型:任何请告知我如何修复它。这是我的活动课

     public class EventHolder {

        private Map<String,String> event;

        public Map<String, String> getEvent() {
            return event;
        }

        public void setEvent(Map<String, String> event) {
            this.event = event;
        }
    }

这是提交flink作业的主类

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

当我运行它时,我得到了例外

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

我正在使用flink 1.3.1

共有2个答案

空成天
2023-03-14

我解决了以下类似问题:

//Should probably make MapVal more generic, but works for this example
public class MapVal extends ScalarFunction {
    public String eval(Map<String, String> obj, String key) {
        return obj.get(key);
    }
}

public class Car {
    private String make;
    private String model;
    private int year;
    private Map<String, String> attributes;
    //getters/setters...
}

//After registering Stream and TableEnv etc

tableEnv.registerFunction("mapval", new MapVal());

Table cars = tableEnv
                .scan("Cars")
                .select("make, model, year, attributes.mapval('name')");
阎庆
2023-03-14

我认为问题出在收集中。由于Java的限制(即类型擦除),Flink无法提取所需的类型信息。因此,映射被视为SQL任意类型的黑盒。您可以使用tablenv验证表的类型。扫描(“mapEvent”)。printSchema()。您可以使用类型在集合中指定类型信息。映射(Types.STRING,Types.STRING)。

 类似资料:
  • avro.version= 在avro模式中,map type不支持缺省值。我尝试了以下不同的模式。 第一: 第二: 第三: Java代码如下: 我只希望constraintQuantities和RawQuanties为null。因为这些是可选字段。即使我没有将它们设置为null,它也会抛出异常。 最重要的是,方法生成java POJO,但无法构建该对象。 如果不设置它们为空,然后接收以下异常:

  • 我试图在两个进程之间共享从USB摄像头(logitech c270)接收到的帧(图像),以便避免出现。我正在使用这里描述的内存映射流式I/O方法,在使用后,我可以成功地从相机获取帧。但是,我有另一个进程(用于图像处理),它必须在出列后使用图像缓冲区,并向第一个进程发出信号,再次对缓冲区进行排队。 在网上搜索,我可以发现打开视频设备多次是允许的,但是当我尝试映射(尝试两个和只是)在第二个过程后成功的

  • 我通常使用创建新的编年史映射,如下所示: 历史记录映射是否支持不同值类实例的存储,如果支持,如何构建该映射?

  • 我的服务器资源: 我的客户资源: 我的Build.Gradle文件:

  • 类型映射 web3j中使用的原生Java到ABI类型映射如下: boolean -> bool BigInteger -> uint/int byte[] -> bytes String -> string and address types List<> -> dynamic/static array BigInteger类型必须用于数字类型,因为Ethereum以太坊中的数字类型是256位整数

  • 问题内容: 我有一个Postgres表,其中包含type列。在JDBC代码中,我使用了String数组,但是有一个异常告诉我这两个不匹配。如果这些类型之间没有映射,您可以为字符串数组建议Postgres类型吗? 这是代码: 问题答案: 要了解多维PostgreSQL数组类型,请考虑手册中的以下引号: 当前实现也不执行声明的维数。无论大小或维数如何,特定元素类型的数组都被认为是同一类型。因此,仅声明