当前位置: 首页 > 知识库问答 >
问题:

使用kafka实现oracle到mongodb的数据迁移

融烨磊
2023-03-14

我正在尝试使用Kafka将数据从oracle迁移到mongodb。我取了一个1000万的样本记录集,列长为90,每行为5KB

我将数据分成10个线程,但其中一个线程不是每次都在运行....当我检查数据时,我看到MongoDB中丢失了100万条记录。

主类:

    int totalRec = countNoOfRecordsToBeProcessed;
    int minRownum =0;
    int maxRownum =0;
    int recInThread=totalRec/10;
    System.out.println("oracle  "+new Date());
    for(int i=minRownum;i<=totalRec;i=i+recInThread+1){ 
          KafkaThread kth = new KafkaThread(i, i+recInThread, conn);
        Thread th = new Thread(kth);
        th.start();
    }
    System.out.println("oracle done+  "+new Date());    

Kafka制作人线程类:

JSONObject obj=new JSONObject();
while(rs.next()){

        int total_rows = rs.getMetaData().getColumnCount();
        for (int i = 0; i < total_rows; i++) {
            obj.put(rs.getMetaData().getColumnLabel(i + 1)
                    .toLowerCase(), rs.getObject(i + 1));
     }
            //System.out.println("object->"+serializedObject);
            producer.send(new ProducerRecord<String, String>("oracle_1",obj.toString()));
            obj= new JSONObject();
        //System.out.println(counter++);
}

消费者类:

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        //subscribe to topic
        consumer.subscribe(Arrays.asList(topicName));

        MongoClientURI clientURI = new MongoClientURI(mongoURI);
        MongoClient mongoClient = new MongoClient(clientURI);
        MongoDatabase database = mongoClient.getDatabase(clientURI.getDatabase());
        final MongoCollection<Document> collection = database.getCollection(clientURI.getCollection());
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(10000);

            if (consumerRecords.count()!=0) {
                List<InsertOneModel> list1 = new ArrayList<>();
                consumerRecords.forEach(record -> {
//                    System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
//                            record.key(), record.value(),
//                            record.partition(), record.offset());'

                    String row =null;
                     row = record.value();
                     Document doc=Document.parse(row);
                     InsertOneModel t = new InsertOneModel<>(doc);
                     list1.add(t);
                });
                collection.bulkWrite((List<? extends WriteModel<? extends Document>>) (list1), new BulkWriteOptions().ordered(false));
                consumer.commitAsync();
                list1.clear();
            }
            }
    }

共有1个答案

裴建华
2023-03-14

我的建议是:使用Kafka Connect JDBC连接器将数据拉入,使用一个Kafka Connect MongoDB接收器将数据推出。否则你只是在推倒重来。卡夫卡连接是阿帕奇卡夫卡的一部分。

卡夫卡连接入门:

  • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
  • https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
  • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
 类似资料:
  • 现在有两个系统,oracle产生数据业务数据,需要同于到mysql数据库中用于查询, 可以实时或异步,延时时间不要太长就好。而且两个库里的表结构,不一定相同,需要做简单加工。 目前的想法,就是定时任务服务,2个数据源,先查oracle再写mysql 还有什么牛逼的方案吗?

  • 我正在构建一个Django应用程序,它有几个应用程序。使用SQLite数据库作为后端运行良好。当我试图使用“manage.py migrate”将后端迁移到Oracle时,我发现了以下错误 django.db.utils.DatabaseError:ORA-01950:对表空间“XXXXXX”没有权限 当我在数据库中检查我的用户权限时,它有创建表、视图等的权限。我尝试执行“manage.py sq

  • 本文向大家介绍mysql数据迁移到Oracle的正确方法,包括了mysql数据迁移到Oracle的正确方法的使用技巧和注意事项,需要的朋友参考一下 在mysql数据库里有一个表student,它的结构如下: 在Oracle数据库里有一个表from_mysql,它的结构如下: 现在要把数据从mysql的student转移到Oracle的from_mysql中去,这里笔者借助kettle的spoon工

  • 本文向大家介绍Oracle数据库部分迁至闪存存储的实现方法,包括了Oracle数据库部分迁至闪存存储的实现方法的使用技巧和注意事项,需要的朋友参考一下 环境:Oracle 11.2.0.4 RAC(2 nodes) 说明:假设新增闪存挂载点是/flash(使用了第三方的集群文件系统),如果是使用Oracle的ASM,则本文提及的所有/flash目录都可以认定是新的闪存磁盘组是+FLASH。 1 实

  • 我在使用DSE SQOOP将oracle数据迁移到cassandra时出现以下错误。 下面是命令和错误堆栈: 。/DSE sq OOP cql-import-table USERS-columns first name-Cassandra-key space ORCL-Cassandra-table USERS-connect JDBC:Oracle:thin:Scott/xxxx @//us01

  • 本文向大家介绍使用Nodejs连接mongodb数据库的实现代码,包括了使用Nodejs连接mongodb数据库的实现代码的使用技巧和注意事项,需要的朋友参考一下 一个简单的nodejs连接mongodb示例,来自 mongodb官方示例 1. 创建package.json 首先,创建我们的工程目录connect-mongodb,并作为我们的当前目录 输入npm init命令创建package.j