当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink-org的类文件。阿帕奇。Flink。流动。应用程序编程接口。斯卡拉。找不到数据流

公良高刚
2023-03-14

使用ApacheFlink版本1.3.2和Cassandra3.11,我编写了一个简单的代码,使用ApacheFlink-Cassandra连接器将数据写入Cassandra。代码如下:

final Collection<String> collection = new ArrayList<>(50);
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<UUID, String>> dataStream = env
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<UUID, String>>() {

                    final String mapped = " mapped ";
                    String[] splitted;

                    @Override
                    public Tuple2<UUID, String> map(String s) throws Exception {
                        splitted = s.split("\\s+");
                        return new Tuple2(
                                UUID.randomUUID(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });
        dataStream.print();
        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                .setHost("127.0.0.1")
                .build();
        env.execute();

尝试使用ApacheFlink 1.4.2(1.4.x)运行相同的代码时,出现错误:

Error:(36, 22) java: cannot access org.apache.flink.streaming.api.scala.DataStream
  class file for org.apache.flink.streaming.api.scala.DataStream not found

在线

CassandraSink.addSink(dataStream)
                    .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                    .setHost("127.0.0.1")
                    .build();

我认为ApacheFlink1.4.2中存在一些依赖性更改,这导致了问题。

我在代码中使用以下导入的依赖项:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

如何解决Apache Flink 1.4.2版中的错误?

更新:在Flink 1.3.2中,类org.apache.flink.streaming.api.scala.DataStream

我在针对Cassandra connector的Flink 1.4.2文档中尝试了代码示例,但是我得到了相同的错误,但是该示例使用了Flink 1.3.2依赖项!


共有1个答案

郗浩言
2023-03-14

除所有其他依赖项外,请确保您具有Flink Scala依赖项:

梅文

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.2</version>
</dependency>

格拉德尔

dependencies {
    compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.2'
..
}

我设法让您的示例使用以下依赖项:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

梅文

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>

</dependencies>
 类似资料:
  • 我正在windows计算机上使用Kafka,并尝试使用文件源连接器生成从文件到Kafka主题的内容。首先我启动了zookeeper,然后在启动Kafka Standalone Connector时启动了Kafka server(步骤3),我收到了很多警告,ReflectionsException 我对此没有什么疑问: 1。我需要添加一些jar文件吗 2。在libs文件夹下的Kafka dir中有一

  • 您能说Apache Karaf包括以下内容吗?其中包括: Apache Felix(它是OSGi 4.2框架的实现) Apache Aries(它是Blueprint标准的实现)

  • 我正在使用Flink从Apache Pulsar读取数据。我在pulsar中有一个分区主题,有8个分区。在本主题中,我生成了1000条消息,分布在8个分区中。我的笔记本电脑中有8个内核,因此我有8个子任务(默认情况下,并行度=#个内核)。在执行Eclipse中的代码后,我打开了Flink UI,发现一些子任务没有收到任何记录(空闲)。我希望所有8个子任务都能得到利用(我希望每个子任务都映射到我的主

  • 我正在尝试了解这个位置的scala代码。(我来自java背景)。 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala 我在下面的部分感觉完全迷失了 我知道并行化和平面映射的作用。我不明白arr1是如何初始化的。它是 int 类型

  • 我的jsp有一个代码: 这是我的servlet: 当我点击登录按钮时,我有一个错误。 提前谢谢

  • 我在生产服务器上部署了我的项目,并得到以下错误。 这是一个实时项目,所以,在出现错误后,我用运行良好的前一个版本替换了它,但现在它也抛出了相同的错误。请告诉我有什么问题? 错误: InExpage。jsp: