当前位置: 首页 > 知识库问答 >
问题:

如何将数据从Cassandra加载到Apache Flink数据流

楚宏胜
2023-03-14

尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下:

ClusterBuilder cb = new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("localhost")
                        /*.withCredentials("hduser".trim(), "hadoop".trim())*/
                        .build();
            }
        };
CassandraInputFormat<Tuple2<UUID, String>> cassandraInputFormat = new CassandraInputFormat<Tuple2<UUID, String>>(query, cb);

cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

Tuple2<UUID, String> testOutputTuple = new Tuple2<>();
ByteArrayOutputStream res = new ByteArrayOutputStream();
res.reset();

while (!cassandraInputFormat.reachedEnd()) {
    cassandraInputFormat.nextRecord(testOutputTuple);
    res.write((testOutputTuple.f0.toString() + "," + testOutputTuple.f1).getBytes());
}
DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

我试过了

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

将变量中的数据加载到数据流中

共有2个答案

汝臻
2023-03-14

在Flink中创建数据流总是从ExecutionEnvironment开始。

而不是:

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray())); 

尝试:

DataStream<Tuple2<UUID, String>> raw = ExecutionEnvironment.createInput(cassandraInputFormat);

然后可以使用map函数将数据类型更改为DataStream

我没有使用卡桑德拉连接器本身,所以我不知道您是否正确使用了该部件。

孙明德
2023-03-14

从DB读取数据-是一项有限的任务。使用CassandraInputFormat时,您应该使用DataSet API,而不是DataStream。例如:

DataSet<Tuple2<Long, Date>> ds = env.createInput(executeQuery(YOUR_QUERY), TupleTypeInfo.of(new TypeHint<Tuple2<Long, Date>>() {}));

private static CassandraInputFormat<Tuple2<Long, Date>> executeQuery(String YOUR_QUERY) throws IOException {
    return new CassandraInputFormat<>(YOUR_QUERY, new ClusterBuilder() {
        private static final long serialVersionUID = 1;
            @Override
            protected Cluster buildCluster(com.datastax.driver.core.Cluster.Builder builder) {
                return builder.addContactPoints(CASSANDRA_HOST).build();
            }
        });
    }
}
 类似资料:
  • 我刚接触Cassandra Spark,并尝试使用Spark主集群将数据从文件加载到Cassandra表。我遵循以下链接中给出的步骤 http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/spark/sparkImportTxtCQL.html 在第8步,数据显示为整数数组,但当我使用相同的命令时,结果显示为

  • 我使用DataStage 11.7 cassandra connector连接配置了cassandra VMs的2个节点(192.168.3.240&192.168.3.241)。 我的apache cassandra版本是3.11.3,我使用的是datastax驱动程序:dse-java-driver-core-1.8.1.jar和dse-java-driver-extras-1.8.1.jar

  • 我需要将存在于Web链接上的数据上传到hdfs,例如“博客”。 现在,我正在寻找实现这一目标的选项,可以找到以下链接: http://blog . cloud era . com/blog/2012/09/analyzing-Twitter-data-with-Hadoop/ 但是通过水槽文档阅读,我不清楚如何设置水槽源来指向博客内容所在的网站。 根据我对 fluem 文档的理解,需要有网络服务器

  • 我需要将来自textarea输入的XML数据发布到PHP,以便解析它并输出一个表。 我尝试了一些方法,但似乎都不奏效。 目前我有: PHP是: 第一个回声$xmlraw正在工作——它在一行中输出XML字符串——帖子正在正确地发送数据。 第二个echo$xml不输出任何东西,Foreach也不输出任何东西——在PHP中有些东西不起作用 我还尝试将$xmlraw直接加载到simplexml_load_

  • 问题内容: 我正在尝试使用Java + Hibernate + Spring将CSV文件加载到mySQL数据库中。我在DAO中使用以下查询来帮助我加载到数据库中: 我有一些想法可以从http://dev.mysql.com/doc/refman/5.1/en/load- data.html 使用它,以及如何从hibernate +spring应用程序将csv文件导入到mysql中? 但是我得到了错