我有一个Java应用程序,它处理avro消息的Kafka流,并为每条消息对mongoDB集合执行查询。
在正确处理了几十条消息后,应用程序停止运行并抛出“com.mongodb.MongoSocketReadExc0019:过早到达流的结尾”。
这是代码:
JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
LocationType t = deserialize(encodedAvroData);
MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
options_builder.maxConnectionIdleTime(60000);
MongoClientOptions options = options_builder.build();
MongoClient mongo = new MongoClient ("localhost:27017", options);
MongoDatabase database = mongo.getDatabase("DB");
MongoCollection<Document> collection = database.getCollection("collection");
Document myDoc = collection.find(eq("key", 4)).first();
System.out.println(myDoc);
});
});
首先,你不应该为每条记录打开mongo连接!然后你应该关闭你的mongo连接。
Mongo不喜欢你打开很多(成百上千?)不关闭它们。
以下是通过RDD打开mongo连接的示例:
directKafkaStream.foreachRDD(rdd ->{
rdd.foreachPartition(it -> {
// Opens only 1 connection per partition
MongoClient mongo = new MongoClient ("localhost:27017");
MongoDatabase database = mongo.getDatabase("DB");
MongoCollection<Document> collection = database.getCollection("collection");
while (it.hasNext()) {
byte[] encodedAvroData = it.next()._2;
LocationType t = deserialize(encodedAvroData);
Document myDoc = collection.find(eq("key", 4)).first();
System.out.println(myDoc);
}
mongo.close();
});
});
问题内容: 我正在使用最新的spring-data- mongodb(1.1.0.M2)和最新的Mongo驱动程序(2.9.0-RC1)。我遇到这样的情况,我有多个客户端连接到我的应用程序,并且我想在同一台Mongo服务器中为每个客户端提供自己的“模式/数据库”。如果我直接使用驱动程序,这并不是一件很难的事情: 看,容易。但是spring-data- mongodb不允许使用简单的方法来使用多个数
问题内容: 我选择的数据库是MongoDB。我正在编写一个数据层API,以从客户端应用程序中抽象实现细节- 也就是说,我实质上是在提供一个公共接口(一个充当IDL的对象)。 我正在以TDD方式测试自己的逻辑。在每个单元测试之前,调用一个方法来创建数据库单例,此后,当测试完成时,将调用一个方法来删除数据库。这有助于促进单元测试之间的独立性。 几乎所有单元测试(即 执行上下文查询 )都需要先进行某种插
问题内容: 我有一个MongoDB,用于存储来自不同传感器的数据。它具有以下结构: 现在,例如,我需要“心率”-包含所有字段和“数据”字段的文档-匹配条件“ 1483537204000和1483537214000之间的时间戳记”的子文档。 我已经在另一个问题中的mongo shell中获得了有关如何执行此操作的答案。参见以下代码: 但是如何在java spring-data中做到这一点?似乎在sp
问题内容: 这是我的GeneratePdf.java导入… Mongo连接并获取数据(): 我收到此错误,但我不明白: 问题答案: 使用JRBeanCollectionDataSource不是使用MongoDB连接器的正确方法。看一下Jaspersoft MongoDB Connector源附带的测试: MongoDbDatasource / src / test / java / com / j
问题内容: 我正在尝试使用MongoTemplate在Spring Data中实现以下工作的mongoDb查询: 该集合具有以下结构: 我要在这里做的是一份有关简单调查提交数据的报告。问题是“第一个问题的回答为0的用户如何回答第二个问题?” 我花了一整天时间搜索SpringData Mongo Db文档,但没有发现任何东西。有人可以帮忙吗? TIA 问题答案: 您可以通过提供自己的解决方案来解决此
问题内容: Mongodb是一个无模式文档数据库,但是在春季数据中,有必要定义实体类和存储库类,如下所示: 实体类: 存储库类: 无论如何,在春季数据mongodb中是否有使用map not class的功能,以便服务器可以接受任何动态JSON数据,然后将其存储在BSON中而无需任何预定义类? 问题答案: 您可能想知道,Spring或Java是否是解决您问题的正确解决方案-为什么不使用更动态的工具
问题内容: 我正在尝试使用查询搜索出生日期 而spring数据mongodb生成以下查询: MongoTemplate:使用查询查找: 字段:类的空值:类com.temp.model.User集合中的用户:user 但是我没有得到任何结果。 我在mongodb中的dob字段: 如何搜索 ISODate格式的 dob ? 问题答案: 这段代码可以很好地满足您的需求: 我的测试是使用以下代码,它可以正
问题内容: 我正在尝试通过查询元数据字段来获取GridFS文件列表。例如,我得到了一个GridFS文件文档,看起来像: 我想查询所有包含“ target_field” =“ abcdefg”的文件。我创建查询如下: 该列表始终为空。否则,查询文件名或uploadDate的效果很好。是否可以通过嵌套属性获取GridFS文件? 问题答案: 不幸的是,我没有让它与嵌套的BasicDBObjects一