当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Flink读取Cassandra?

陆雅志
2023-03-14

我的flink程序应该为每个输入记录做一个Cassandra查找,并根据结果做一些进一步的处理。

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null);
        cassandraInputFormat.open(null);
        Tuple2<String, String> out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }

但问题是,每次查找需要将近10秒,换句话说,这个for循环需要50秒来执行。

我如何加快这个操作?或者,在Flink中有没有其他的方法可以查到Cassandra呢?

共有1个答案

马魁
2023-03-14

我想出了一个在使用流数据查询Cassandra时相当快的解决方案。会对有同样问题的人有用。

首先,可以用很少的代码查询Cassandra,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

但这样做的问题是,创建session是一个非常耗时的操作,并且每个键空间应该执行一次。您只创建一次session并将其用于所有读取查询。

public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();
    }

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;
    }

}
public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
    }

    @Override
    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;
    }
}

注意,您可以将clusterbuilder传递给processfunction,因为它是可序列化的。现在来看cassandralookup方法,我们在该方法中执行查询。

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;
    }
}

singleton对象只在第一次运行查询时创建,之后重用相同的对象,因此查找没有延迟。

 类似资料:
  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 问题内容: 我是JSON格式的新手,在阅读的教程中,我不太了解如何使用php进行解析。所以我有这个: 我想回显 坐标 和 reverseGeocode 。谁能请我朝正确的方向前进? 问题答案: 尝试跑步 您的示例JSON作为字符串在哪里。

  • 问题内容: 我正在使用Rselenium导航到网页。以下代码正在执行此操作。我没有提供该网址,因为我在需要VPN连接的公司中使用该网址: 导航到该网页后,在html源代码中,有下表: 现在的问题是如何提取该表的内容?请假定该URL不存在,否则我可以使用XML函数:readHTMLTable(remDr $ getCurrentUrl())。但这出于某些原因不起作用。我只需要使用remoteDriv

  • 问题内容: 我有一台扫描仪,并将定界符设置为“”,但是它仍然不会使用next()方法读取空格。我知道nextline()可以工作,但是我需要单独检查输入中的每个字符,包括空格;这是一个复杂的数据分析问题。不过我很困惑。谷歌什么也没发现。 谁能帮我这个?我正在考虑将空格反转为一个特殊字符,然后出于分析该字符的目的,将其反转回一个包含在字符串中的空格……这似乎有些过头了!有没有更优雅的方式做到这一点?

  • 问题内容: 如何从带有最少循环的json数据中获取以下json数据的所有pid和styles属性 谢谢 问题答案: