当前位置: 首页 > 知识库问答 >
问题:

理解mongodb Scala src中的含义和订阅

施永宁
2023-03-14

我试图更好地理解Scala MongoDB src

使用scala mongodb驱动程序(api文档:http://mongodb.github.io/mongo-scala-driver/)

当我使用

  val collection: MongoCollection[Document] = database.getCollection("mycollection");

      val observable: Observable[Completed] = collection.insertOne(doc)


      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println("Failed")
        override def onComplete(): Unit = println("Completed")
      })
/**
     * Subscribes to the [[Observable]] and requests `Long.MaxValue`.
     *
     * Uses the default or overridden `onNext`, `onError`, `onComplete` partial functions.
     *
     * @param doOnNext anonymous function to apply to each emitted element.
     * @param doOnError anonymous function to apply if there is an error.
     * @param doOnComplete anonymous function to apply on completion.
     */
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit = {
      observable.subscribe(new Observer[T] {
        override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue)

        override def onNext(tResult: T): Unit = doOnNext(tResult)

        override def onError(throwable: Throwable): Unit = doOnError(throwable)

        override def onComplete(): Unit = doOnComplete()

      })
    }
 /**
   * Request `Observable` to start streaming data.
   *
   * This is a "factory method" and can be called multiple times, each time starting a new [[Subscription]].
   * Each `Subscription` will work for only a single [[Observer]].
   *
   * If the `Observable` rejects the subscription attempt or otherwise fails it will signal the error via [[Observer.onError]].
   *
   * @param observer the `Observer` that will consume signals from this `Observable`
   */
  def subscribe(observer: Observer[_ >: T]): Unit

似乎调用subscribe调用了一个新线程(因为它被称为subscribe),但我没有看到这个新线程是从src调用的?

当我使用observable.subscribe(new Observer[Completed]{....

更新:

import org.mongodb.scala.MongoClient;
import org.mongodb.scala.bson.collection.immutable.Document;
import org.mongodb.scala._
import org.scalatest._
import Matchers._
import org.mongodb.scala._

class MongoSpec extends FlatSpec with Matchers {

  "Test MongoDb" should "insert" in {
    {
      val mongoClient: MongoClient = MongoClient()
      val database: MongoDatabase = mongoClient.getDatabase("scala-poc");

      val doc: Document = Document("_id" -> 6, "name" -> "MongoDB", "type" -> "database",
        "count" -> 1, "info" -> Document("x" -> 203, "y" -> 100))

      val collection: MongoCollection[Document] = database.getCollection("documents");

      val observable: Observable[Completed] = collection.insertOne(doc)

      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println(" \n\nFailed " + e + "\n\n")
        override def onComplete(): Unit = println("Completed")
      })

      mongoClient.close();

    }

  }
}
Failed com.mongodb.MongoClientException: Shutdown in progress

MongoClient.Close();正在insertOne方法完成之前调用。

那么insertOne或subscribe方法是异步的?

共有1个答案

阳光辉
2023-03-14

>

  • 否,subscribe(doOnNext,doOnError,doOnComplete)调用subscribe(observer)(您可以从问题中引用的实现中看到)。所以如果它也是从那里调用的,你会得到一个无限循环。当您编写诸如observer.subscribe(x=>println(s“next=$x”),error=>error.printstacktrace(),()=>{})之类的内容时,会使用“布线”。

    否,subscribe不会创建新线程。实现observable的类大多从Java MongoDB驱动程序中包装类,并调用它们自己的subscribe方法,例如override def subscribe(observer:observer[_>:TResult]):Unit=observe(wrapped).subscribe(observer)。这些subscribe方法也不会启动新的线程:请参见https://mongodb.github.io/mongo-java-driver/3.1/driver-async/reference/observables/获得一些解释。

  •  类似资料:
    • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

    • 问题内容: 假设我们有一个类名Home。是什么区别 Home.this 和 Home.class ?他们指的是什么? 问题答案: 这个 引用该类的当前实例。 此表达式的正式术语似乎是qualified this,如Java语言规范的15.8.4节所引用。 在一个简单的类中,说和将等效。此表达式仅在存在内部类且需要引用封闭类的情况下使用。 例如: 家庭类 将类的表示形式作为对象返回。 此表达式的正式

    • 我猜的意思是指(2的)指数。“ld”和“fr”是什么意思?我在http://pubs.opengroup.org/onlinepubs/009695399/functions/ldexp.html中看到,“ld”可能意味着“load”(尾数/指数形式),但即使是这一点也不完全清楚。我不确定“fr”,但是(http://pubs.opengroup.org/onlinepubs/9699919799

    • 问题内容: 我是Golang的新手,正在做http://tour.golang.org/。谁能向我解释 第1、3、5和7行, 特别是’*’和’&’的作用吗?我的意思是通过在函数声明中提及它们,它们应该/应该做什么?一个玩具的例子: 看起来它们就像我们在C ++中一样。但是我无法将这些概念与此处的内容联系起来。换句话说,当我在Go的函数声明中使用“ *”和“&”时,它们会做什么。 编辑: 我知道引用

    • 问题内容: 是否有一个含义,超越指出,以源代码的读者,一个注释块是不是故意Javadoc注释?最近,我看到了很多看起来像这样的代码: 这是某种既定的惯例吗?如果是这样,它是什么意思(除了明显的字面意义之外),什么时候使用? 问题答案: 根据我能找到的信息,这是为了修复JavaDoc工具的较旧版本(或更可能是Eclipse处理JavaDoc注释的较旧版本)中的错误,该错误将不会显示替代a方法的正确文

    • 问题内容: 我在Android源代码中看到一个陌生的符号: 例如: 我对星号表示法不熟悉。有人可以解释吗? 问题答案: 是的简化版本 此表示法来自C。