当前位置: 首页 > 知识库问答 >
问题:

如何使用Java8逐个分区地从cassandra中加载/读取数据?

钱季
2023-03-14

我使用的是spring-boot、datastax-java-cassandra-connector2.11-2.4.1.jar和java8。

我有一个场景,我需要从C*表读取/加载数据,但这个表可能有数百万条记录。

我需要从C*表加载这些数据,在使用datastax-java-cassandra-connector API的Java/spring-boot中,我是否可以按分区提取数据?

共有1个答案

梁丘德寿
2023-03-14

虽然select*from table可能起作用,但更有效的方法是通过像select*from table where token(part_key)>beginRange和token(part_key)<=endRange这样的查询按令牌范围读取数据。Spark Cassandra连接器的工作方式与此相同--它获取所有可用令牌范围的列表,然后从每个令牌范围获取数据,但将其直接发送到持有该令牌范围的节点(与通过协调器节点检索所有数据的select*from table相反)。

在计算令牌边界时需要小心,特别是对于整个范围的begin和end。您可以在我的存储库中找到Java代码的示例(太长了,不能粘贴在这里)。

 类似资料:
  • Java函数都有一个严重的限制,即它的使用者不可能抛出检查过的异常。因此,我想逐个访问Stream的元素。 我想这样做: 然而,是一种短路终端操作。也就是说,它关闭了河流。这段代码会在while循环的第二次迭代中崩溃。我不能简单地将所有元素放在一个数组中,然后逐个检查该数组,因为可能有数千万个元素。 请注意,我不是在问如何从中抛出异常。这个问题已经得到了回答。

  • 我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下

  • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?

  • 我的flink程序应该为每个输入记录做一个Cassandra查找,并根据结果做一些进一步的处理。 但问题是,每次查找需要将近10秒,换句话说,这个循环需要50秒来执行。 我如何加快这个操作?或者,在Flink中有没有其他的方法可以查到Cassandra呢?

  • 我有两台运行Cassandra的不同独立机器,我想将数据从一台机器迁移到另一台机器。 因此,根据数据税文档,我首先在机器1上拍摄了我的Cassandra集群的快照。 然后,我将数据移动到计算机 2,在那里我尝试使用 sstableloader 导入它。 注意:机器2上的keypsace(open_weather)和tablename(raw_weather_data)已创建,并且与机器1上的相同。