scala-mongo-driver使用mongo的问题

谢阳曜
2023-12-01
一月 26, 2019 5:23:06 下午 com.mongodb.diagnostics.logging.JULLogger log   
信息: Cluster created with settings {hosts=[47.105.94.110:27017],   
mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}   
一月 26, 2019 5:23:07 下午 com.mongodb.diagnostics.logging.JULLogger log   
信息: No server chosen by com.mongodb.async.client.ClientSessionHelper$1@6e2c9341    
from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=47.105.94.110:27017, type=UNKNOWN, state=CONNECTING}]}.     
Waiting for 30000 ms before timing out
// To directly connect to the default server localhost on port 27017
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("mydb").withCodecRegistry(codecRegistry)
val collection: MongoCollection[Person] = database.getCollection("test")
println ("all count :"+collection.count().foreach(println))

demo路径 https://download.csdn.net/download/zhaoyu_nb/10941749(默认是5分 都不能改 有点坑)

git地址 https://github.com/zhaoyuasd/sbt-mongo-scala

这里使用的观察者模式 在加个多线程的异步响应结果 

 java.util.Observable ---这个可以参考下

libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "2.5.0"

网上很多scala连接mongo都是基于mongo-java-driver 不是这个版本 这个是比较新的,学东西还是上官网的比较好

 这个是连接的代码 这里直接报错了

后来官网是这样说的

Observable API

Scala驱动程序不依赖任何第三方框架进行异步编程。为实现此目的,Scala API使用Observer模式的自定义实现,该模式包含三个特征:

  1. 可观察
  2. 观察
  3. 订阅

An Observable是一个可能无限数量的序列元素的提供者,根据从它收到的需求发布 Subscription

响应对以下方案给出的Observable.subscribe(Subscriber)方法的可能调用序列的调用Observer

onSubscribe onNext* (onError | onComplete)?

这意味着onSubscribe始终发出信号,然后是可能无限数量的onNext信号(按照要求 Subscription),然后是onError信号(如果有故障),或者onComplete信号,当没有更多元素可用时 - 只要Subscription没有取消。

注意

Observables从API返回的所有内容都是冷的,这意味着在订阅之前不会发生任何I / O. 因此,观察者保证从一开始就看到整个序列。因此,只创建一个Observable不会导致任何网络IO,并且直到Subscriber.request()被调用驱动程序才执行操作。

每个Subscription与一个ObservableMongoDB操作相关,其“订阅者”将接收它自己的特定结果集。

 

所以问题就很简单了 reactive 没有订阅是不会执行的  后面要加一个 订阅的操作(不熟悉reactive 可以去看看spring-webflux的文档)

scala-mongo-driver 官网文档地址 http://mongodb.github.io/mongo-scala-driver/2.5/getting-started/quick-tour-primer/

另外它的那个demo没法下载下来 不过有个类很重要 我直接粘出来了

里面有点隐式转换

val person: Person = Person("Ada", "Lovelace")
collection.insertOne(person).results()  ---有了下面的隐式转换 这个.results()就可以调用了 insert操作操作也才会真正执行
package com.laozhao.scala

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.mongodb.scala._

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
    def headResult() = Await.result(observable.head(), Duration(10, TimeUnit.SECONDS))
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

}

 

 类似资料: