我正在尝试使用Kafka将数据从oracle迁移到mongodb。我取了一个1000万的样本记录集,列长为90,每行为5KB
我将数据分成10个线程,但其中一个线程不是每次都在运行....当我检查数据时,我看到MongoDB中丢失了100万条记录。
主类:
int totalRec = countNoOfRecordsToBeProcessed;
int minRownum =0;
int maxRownum =0;
int recInThread=totalRec/10;
System.out.println("oracle "+new Date());
for(int i=minRownum;i<=totalRec;i=i+recInThread+1){
KafkaThread kth = new KafkaThread(i, i+recInThread, conn);
Thread th = new Thread(kth);
th.start();
}
System.out.println("oracle done+ "+new Date());
Kafka制作人线程类:
JSONObject obj=new JSONObject();
while(rs.next()){
int total_rows = rs.getMetaData().getColumnCount();
for (int i = 0; i < total_rows; i++) {
obj.put(rs.getMetaData().getColumnLabel(i + 1)
.toLowerCase(), rs.getObject(i + 1));
}
//System.out.println("object->"+serializedObject);
producer.send(new ProducerRecord<String, String>("oracle_1",obj.toString()));
obj= new JSONObject();
//System.out.println(counter++);
}
消费者类:
KafkaConsumer consumer = new KafkaConsumer<>(props);
//subscribe to topic
consumer.subscribe(Arrays.asList(topicName));
MongoClientURI clientURI = new MongoClientURI(mongoURI);
MongoClient mongoClient = new MongoClient(clientURI);
MongoDatabase database = mongoClient.getDatabase(clientURI.getDatabase());
final MongoCollection<Document> collection = database.getCollection(clientURI.getCollection());
while (true) {
final ConsumerRecords<Long, String> consumerRecords =
consumer.poll(10000);
if (consumerRecords.count()!=0) {
List<InsertOneModel> list1 = new ArrayList<>();
consumerRecords.forEach(record -> {
// System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
// record.key(), record.value(),
// record.partition(), record.offset());'
String row =null;
row = record.value();
Document doc=Document.parse(row);
InsertOneModel t = new InsertOneModel<>(doc);
list1.add(t);
});
collection.bulkWrite((List<? extends WriteModel<? extends Document>>) (list1), new BulkWriteOptions().ordered(false));
consumer.commitAsync();
list1.clear();
}
}
}
我的建议是:使用Kafka Connect JDBC连接器将数据拉入,使用一个Kafka Connect MongoDB接收器将数据推出。否则你只是在推倒重来。卡夫卡连接是阿帕奇卡夫卡的一部分。
卡夫卡连接入门:
现在有两个系统,oracle产生数据业务数据,需要同于到mysql数据库中用于查询, 可以实时或异步,延时时间不要太长就好。而且两个库里的表结构,不一定相同,需要做简单加工。 目前的想法,就是定时任务服务,2个数据源,先查oracle再写mysql 还有什么牛逼的方案吗?
我正在构建一个Django应用程序,它有几个应用程序。使用SQLite数据库作为后端运行良好。当我试图使用“manage.py migrate”将后端迁移到Oracle时,我发现了以下错误 django.db.utils.DatabaseError:ORA-01950:对表空间“XXXXXX”没有权限 当我在数据库中检查我的用户权限时,它有创建表、视图等的权限。我尝试执行“manage.py sq
本文向大家介绍mysql数据迁移到Oracle的正确方法,包括了mysql数据迁移到Oracle的正确方法的使用技巧和注意事项,需要的朋友参考一下 在mysql数据库里有一个表student,它的结构如下: 在Oracle数据库里有一个表from_mysql,它的结构如下: 现在要把数据从mysql的student转移到Oracle的from_mysql中去,这里笔者借助kettle的spoon工
本文向大家介绍Oracle数据库部分迁至闪存存储的实现方法,包括了Oracle数据库部分迁至闪存存储的实现方法的使用技巧和注意事项,需要的朋友参考一下 环境:Oracle 11.2.0.4 RAC(2 nodes) 说明:假设新增闪存挂载点是/flash(使用了第三方的集群文件系统),如果是使用Oracle的ASM,则本文提及的所有/flash目录都可以认定是新的闪存磁盘组是+FLASH。 1 实
我在使用DSE SQOOP将oracle数据迁移到cassandra时出现以下错误。 下面是命令和错误堆栈: 。/DSE sq OOP cql-import-table USERS-columns first name-Cassandra-key space ORCL-Cassandra-table USERS-connect JDBC:Oracle:thin:Scott/xxxx @//us01
本文向大家介绍使用Nodejs连接mongodb数据库的实现代码,包括了使用Nodejs连接mongodb数据库的实现代码的使用技巧和注意事项,需要的朋友参考一下 一个简单的nodejs连接mongodb示例,来自 mongodb官方示例 1. 创建package.json 首先,创建我们的工程目录connect-mongodb,并作为我们的当前目录 输入npm init命令创建package.j