当前位置: 首页 > 知识库问答 >
问题:

使用java的cassandra中的高频插入会丢失一些数据

林博厚
2023-03-14

我有5,000,000插入查询在文件中。我想从文件中读取它们,并用java驱动程序和执行Async方法写入cassandra,在循环语句中,如以下代码:

public static void main(String[] args) {
        FileReader fr = null;
        try {
            fr = new FileReader("the-file-name.txt");
            BufferedReader br = new BufferedReader(fr);
            String sCurrentLine;
            long time1 = System.currentTimeMillis();
            while ((sCurrentLine = br.readLine()) != null) {
                session.executeAsync(sCurrentLine);
            }

            System.out.println(System.currentTimeMillis() - time1);
            fr.close();
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    } 

我的表格定义是:

CREATE TABLE test.climate (
    city text,
    date text,
    time text,
    temprature int,
    PRIMARY KEY ((city, date), time)
) WITH CLUSTERING ORDER BY (time ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

但运行程序后,表中的行数为2569725

cqlsh:test> select count(*) from climate ;

 count
---------
 2569725

我测试了10多次,每次选择计数(*)的结果都在2400,00到2600000之间

共有1个答案

勾学博
2023-03-14

您发出异步插入的速度比它们执行的速度快,因此它们最终会超过队列大小并失败。你可以增加你的队列大小,这是可行的,但是你只是对内存施加了反压力,而不是你的制作人,并且仍然可能撞到墙。尝试限制飞行中的查询,例如:

public static void main2(String[] args) {
    FileReader fr = null;
    int permits = 256;
    Semaphore l = new Semaphore(permits);
    try {
        fr = new FileReader("the-file-name.txt");
        BufferedReader br = new BufferedReader(fr);
        String sCurrentLine;
        long time1 = System.currentTimeMillis();
        while ((sCurrentLine = br.readLine()) != null) {
            l.acquire();
            session.executeAsync(sCurrentLine)
                .addListener(()->l.release(), MoreExecutors.directExecutor());
        }
        l.acquire(permits);

        System.out.println(System.currentTimeMillis() - time1);
        fr.close();
        br.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

它可能会运行得同样快,只需要找到合适大小的信号量。还要注意阻塞,直到所有的许可都被返回(在末尾获取max),否则您可以在发送所有可能在队列中的请求之前关闭jvm。

免责声明:我没有测试上述代码

 类似资料:
  • 我的代码如下: 输出为 1 2 3 4 5 7 8 9 10 我不知道为什么数字6没有输出这是多线程的原因吗?

  • 我正在尝试使用hector API将数据插入到cassandra数据库中。下面显示了我使用的代码。 但是在给定的keyspace下的/var/lib/cassandra/data文件夹中找不到任何插入的数据。数据插入似乎不能正常工作。代码有什么问题。下面显示了我用来创建'data'列族的命令。

  • 命令用于将数据插入到表的列中。 语法: 示例: 在之前的文章中,我们创建一个名为“”的表,其中包含列(, , ),需要在表中插入一些数据。 我们来看看向“”表中插入数据的代码 - 在执行上面语句插入数据后,可以使用SELECT命令验证是否成功插入了数据。 执行结果如下所示 - 如下图所示 -

  • 我正在尝试使用Pig将HDFS中的文件中的数据复制到Cassandra中的表中。但在将数据存储在Cassandra中时,作业失败,出现空指针异常。有人能帮我吗? 用户表结构: 创建表用户(user\u id text主键、age int、第一个文本、最后一个文本) 我的猪脚本 > A=加载“/用户/hduser/用户。txt“使用PigStorage(',')作为(id:chararray,age

  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 问题内容: 我不知道为什么我对此查询感到困惑。 我有两个表:带有记录和带有记录。两个表都需要包含相同的数据,但是存在一些不匹配的情况。 我需要编写一个mysql查询以插入从到的丢失记录。 最后,两者和应该相同。 我不想先截断所有条目,然后再从另一个表中插入。因此,请提供任何帮助。 谢谢你。 问题答案: 也可以使用它。这将避免像John Woo的回答那样避免子查询的开销(当系统可能为外部查询的 每条