我必须使用Flink作为流引擎处理来自Kafka的数据流。为了对数据进行分析,我需要查询Cassandra中的一些表。做这件事最好的方法是什么?我一直在Scala中寻找这样的例子。但是我找不到任何数据。如何使用Scala作为编程语言在Flink中读取来自Cassandra的数据呢?使用apache flink Java API将数据读写到cassandra中也有同样的问题。答案中提到它有多种方法。我想知道在我的情况下什么是最好的方法。而且,大多数可用的示例都是Java的。我正在寻找Scala示例。
我目前在Flink1.3中使用asyncIO阅读cassandra。下面是关于它的文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html(如果它有DatabaseClient,则使用com.datastax.drive.core.cluster)
如果您需要一个更深入的示例来使用它来阅读cassandra的内容,请告诉我,但不幸的是,我只能提供一个Java的示例。
编辑%1
下面是我使用Flink的异步I/O从Cassandra读取的代码示例。我仍在努力识别和修复一个问题,即由于某种原因(没有深入研究),单个查询返回大量数据时,异步数据流的超时被触发,尽管Cassandra似乎很好地返回了它,而且早在超时时间之前。但假设这只是我正在做的其他事情的一个bug,而不是因为这段代码,那么这对您来说应该很好(对我来说也很好地工作了几个月):
public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {
private final Properties props;
private Session client;
public GenericCassandraReader(Properties props) {
super();
this.props = props;
}
@Override
public void open(Configuration parameters) throws Exception {
client = Cluster.builder()
.addContactPoint(props.cassandraUrl)
.withPort(props.cassandraPort)
.build()
.connect(props.cassandraKeyspace);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {
String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";
ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
public void onSuccess(ResultSet resultSet) {
asyncCollector.collect(Collections.singleton(resultSet));
}
public void onFailure(Throwable t) {
asyncCollector.collect(t);
}
});
}
}
再次抱歉耽搁了。我希望能够解决这个错误,这样我就可以确定了,但是我想现在只有一些引用总比什么都没有要好。
编辑2
我们还意识到,由于网络延迟问题,我们最终使用了RichMapFunction,它将我们从cassandra读取的数据保持在状态。因此,该作业只需跟踪通过它的所有记录,而不必每次通过新记录时都从表中读取,以获取其中的所有记录。
我试图用以下链接中提供的信息将Cassandra作为Flink中的数据来源: null 异常跟踪-->
我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。
我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?
命令用于从Cassandra表中读取数据。 您可以使用此命令读取整个表,单个列,特定单元格等等。 语法 示例: 下面举个例子来演示如何从Cassandra表中读取数据。 我们有一个名为“”的表和以下列(,,)。 使用SELECT命令读整个表 执行结果如下 - 读取特定列 - 该示例将从表中只读和列的数据。 使用WHERE子句 WHERE子句与SELECT命令一起使用,以指定必须满足获取数据的确切条
我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也
我的flink程序应该为每个输入记录做一个Cassandra查找,并根据结果做一些进一步的处理。 但问题是,每次查找需要将近10秒,换句话说,这个循环需要50秒来执行。 我如何加快这个操作?或者,在Flink中有没有其他的方法可以查到Cassandra呢?