当前位置: 首页 > 知识库问答 >
问题:

从Cassandra读取数据以便在Flink中处理

谭畅
2023-03-14

我必须使用Flink作为流引擎处理来自Kafka的数据流。为了对数据进行分析,我需要查询Cassandra中的一些表。做这件事最好的方法是什么?我一直在Scala中寻找这样的例子。但是我找不到任何数据。如何使用Scala作为编程语言在Flink中读取来自Cassandra的数据呢?使用apache flink Java API将数据读写到cassandra中也有同样的问题。答案中提到它有多种方法。我想知道在我的情况下什么是最好的方法。而且,大多数可用的示例都是Java的。我正在寻找Scala示例。

共有1个答案

夏谦
2023-03-14

我目前在Flink1.3中使用asyncIO阅读cassandra。下面是关于它的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html(如果它有DatabaseClient,则使用com.datastax.drive.core.cluster)

如果您需要一个更深入的示例来使用它来阅读cassandra的内容,请告诉我,但不幸的是,我只能提供一个Java的示例。

编辑%1

下面是我使用Flink的异步I/O从Cassandra读取的代码示例。我仍在努力识别和修复一个问题,即由于某种原因(没有深入研究),单个查询返回大量数据时,异步数据流的超时被触发,尽管Cassandra似乎很好地返回了它,而且早在超时时间之前。但假设这只是我正在做的其他事情的一个bug,而不是因为这段代码,那么这对您来说应该很好(对我来说也很好地工作了几个月):

public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {

    private final Properties props;
    private Session client;

    public GenericCassandraReader(Properties props) {
        super();
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        client = Cluster.builder()
                .addContactPoint(props.cassandraUrl)
                .withPort(props.cassandraPort)
                .build()
                .connect(props.cassandraKeyspace);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {

        String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";

        ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);

        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            public void onSuccess(ResultSet resultSet) {
                asyncCollector.collect(Collections.singleton(resultSet));
            }

            public void onFailure(Throwable t) {
                asyncCollector.collect(t);
            }
        });
    }
}

再次抱歉耽搁了。我希望能够解决这个错误,这样我就可以确定了,但是我想现在只有一些引用总比什么都没有要好。

编辑2

我们还意识到,由于网络延迟问题,我们最终使用了RichMapFunction,它将我们从cassandra读取的数据保持在状态。因此,该作业只需跟踪通过它的所有记录,而不必每次通过新记录时都从表中读取,以获取其中的所有记录。

 类似资料:
  • 我试图用以下链接中提供的信息将Cassandra作为Flink中的数据来源: null 异常跟踪-->

  • 我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。

  • 我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?

  • 命令用于从Cassandra表中读取数据。 您可以使用此命令读取整个表,单个列,特定单元格等等。 语法 示例: 下面举个例子来演示如何从Cassandra表中读取数据。 我们有一个名为“”的表和以下列(,,)。 使用SELECT命令读整个表 执行结果如下 - 读取特定列 - 该示例将从表中只读和列的数据。 使用WHERE子句 WHERE子句与SELECT命令一起使用,以指定必须满足获取数据的确切条

  • 我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也

  • 我的flink程序应该为每个输入记录做一个Cassandra查找,并根据结果做一些进一步的处理。 但问题是,每次查找需要将近10秒,换句话说,这个循环需要50秒来执行。 我如何加快这个操作?或者,在Flink中有没有其他的方法可以查到Cassandra呢?