

by Anshul Sanghi


How to handle GraphQL subscriptions with Go, GQLgen and MongoDB

Creating a real-time data server with GraphQL subscriptions and MongoDB ChangeStreams

If you have used GQLgen in the past, you know that it does support subscriptions, but the implementation it uses doesn't work properly with MongoDB.


For those of you who haven't heard of or used GQLgen yet, it is a Go package that essentially generates boilerplate code automatically from your GraphQL schemas and provides added functionality like setting up a GraphQL server. We are going to use it extensively for our GraphQL setup, so I suggest you take a look at it before continuing, as I won't be covering it much here. The official getting-started guide is a good place to begin.


We are going to build an API that handles creating/querying/updating a user and listens when a user has a new notification via a subscription.


I had to make some changes to my code as well as to the GQLgen-generated code to make it work properly, but I'm not really sure if this is the best way to go from a performance perspective, and I would love to hear any suggestions. This post also won't cover everything in detail except for the required parts, since it is already long enough as is.


Setup

Let’s set up a starter project before we dive into the code. Create a new project in your GOPATH and create a package db within it. This directory will contain all code related to the database itself (in this case, MongoDB).


Next, install the following required packages:


go get github.com/99designs/gqlgen
go get github.com/gorilla/mux
go get github.com/globalsign/mgo

The following packages also need to be installed; they are only used internally by GQLgen. We are not going to work with them directly, but they are required:


go get github.com/pkg/errors
go get github.com/urfave/cli
go get golang.org/x/tools/go/ast/astutil
go get golang.org/x/tools/go/loader
go get golang.org/x/tools/imports
go get gopkg.in/yaml.v2

We are ready to start writing some code :)


Project Setup

I'll be using the globalsign/mgo package as my MongoDB driver, which is essentially a community-maintained fork of labix/mgo.v2.


In your db directory, create a file setup.go with the following code:


package db

import (
	"fmt"

	"github.com/globalsign/mgo"
)

var session *mgo.Session
var db *mgo.Database

func ConnectDB() {
	var err error
	// assign to the package-level session so CloseSession can use it later
	session, err = mgo.Dial("mongodb://localhost:27017,localhost:27018")
	if err != nil {
		fmt.Println(err)
	}
	session.SetMode(mgo.Monotonic, true)
	db = session.DB("subscriptionstest")
}

func GetCollection(collection string) *mgo.Collection {
	return db.C(collection)
}

func CloseSession() {
	session.Close()
}

I already have a replica set running on ports 27017 and 27018. I suggest you set one up at this point before proceeding. Next, create a scripts folder in your project, and create a new file gqlgen.go with the following contents:


// +build ignore

package main

import "github.com/99designs/gqlgen/cmd"

func main() {
	cmd.Execute()
}

This is just required to run the generator and so we are going to exclude it from our build.


Now, let’s create a new package users and create a file schema.graphql in it with the following code:


schema {
    query: Query
    mutation: Mutation
    subscription: Subscription
}

type Query {
    user(id: ID!): User!
}

type Mutation {
    createUser(input: NewUser!): User!
    updateUser(input: UpdateUser!): User!
    updateNotification(input: UpdateNotification): User!
}

type User {
    id: ID!
    first: String!
    last: String!
    email: String!
    notifications: [Notification!]!
}

type Notification {
    id: ID!
    seen: Boolean!
    text: String!
    title: String!
}

input NewUser {
    email: String!
}

input UpdateUser {
    id: ID!
    first: String
    last: String
    email: String
}

input UpdateNotification {
    id: ID!
    userID: ID!
    seen: Boolean!
}

type Subscription {
    notificationAdded(id: ID!): User!
}

Now, navigate to the users folder from the command line and run


go run ../scripts/gqlgen.go init

This will generate four files: resolver.go, generated.go, models_gen.go, and gqlgen.yml. It will also create a folder called server within the users package, which holds the code for running the GraphQL server. You can remove that folder, as we are going to have our own server at the root of the project, which also allows us to eventually serve multiple GraphQL endpoints from a single server.


Initially, we will be working only with resolver.go, which holds the logic for the various queries and mutations we defined in our schema file. But first, we need to go to the models_gen.go file and add the bson:"_id" tag to the ID field of the User struct so that we can read the id from the database into this struct.


type User struct {
	ID            string         `json:"id" bson:"_id"`
	First         string         `json:"first"`
	Last          string         `json:"last"`
	Email         string         `json:"email"`
	Notifications []Notification `json:"notifications"`
}

Now, let’s quickly set up the basic resolvers without going into much detail. You’ll notice that at the top of the file, you’ll see some code similar to this:


type Resolver struct{}

func (r *Resolver) Mutation() MutationResolver {
	return &mutationResolver{r}
}
func (r *Resolver) Query() QueryResolver {
	return &queryResolver{r}
}
func (r *Resolver) Subscription() SubscriptionResolver {
	return &subscriptionResolver{r}
}

We are going to replace it with this:


type Resolver struct {
	users *mgo.Collection
}

func New() Config {
	return Config{
		Resolvers: &Resolver{
			users: db.GetCollection("users"),
		},
	}
}

func (r *Resolver) Mutation() MutationResolver {
	r.users = db.GetCollection("users")
	return &mutationResolver{r}
}
func (r *Resolver) Query() QueryResolver {
	r.users = db.GetCollection("users")
	return &queryResolver{r}
}
func (r *Resolver) Subscription() SubscriptionResolver {
	r.users = db.GetCollection("users")
	return &subscriptionResolver{r}
}

We are doing this so that we can have a reference to our collection directly within the resolver struct which would make it easier for us to work with the collection throughout the resolvers. I’ll explain the significance of the New function later when we need it.


Let’s quickly set up our basic resolvers.


CreateUser Resolver

func (r *mutationResolver) CreateUser(ctx context.Context, input NewUser) (User, error) {
	var user User
	count, err := r.users.Find(bson.M{"email": input.Email}).Count()
	if err != nil {
		return User{}, err
	} else if count > 0 {
		return User{}, errors.New("user with that email already exists")
	}
	err = r.users.Insert(bson.M{"email": input.Email})
	if err != nil {
		return User{}, err
	}
	err = r.users.Find(bson.M{"email": input.Email}).One(&user)
	if err != nil {
		return User{}, err
	}
	return user, nil
}

UpdateUser Resolver

func (r *mutationResolver) UpdateUser(ctx context.Context, input UpdateUser) (User, error) {
	var fields = bson.M{}
	var user User
	update := false
	if input.First != nil && *input.First != "" {
		fields["first"] = *input.First
		update = true
	}
	if input.Last != nil && *input.Last != "" {
		fields["last"] = *input.Last
		update = true
	}
	if input.Email != nil && *input.Email != "" {
		fields["email"] = *input.Email
		update = true
	}
	if !update {
		return User{}, errors.New("no fields present for updating data")
	}
	// apply a partial update with $set so fields that weren't provided are preserved
	err := r.users.UpdateId(bson.ObjectIdHex(input.ID), bson.M{"$set": fields})
	if err != nil {
		return User{}, err
	}
	err = r.users.Find(bson.M{"_id": bson.ObjectIdHex(input.ID)}).One(&user)
	if err != nil {
		return User{}, err
	}

	user.ID = bson.ObjectId(user.ID).Hex()

	return user, nil
}

UpdateNotification Resolver

func (r *mutationResolver) UpdateNotification(ctx context.Context, input *UpdateNotification) (User, error) {
	var user User
	var oid = bson.ObjectIdHex(input.UserID)
	if err := r.users.Find(bson.M{"_id": oid}).One(&user); err != nil {
		return User{}, err
	}
	for index, val := range user.Notifications {
		if bson.ObjectId(val.ID).Hex() == input.ID {
			val.Seen = input.Seen
			user.Notifications[index] = val
			break
		}
	}
	if err := r.users.UpdateId(oid, user); err != nil {
		return User{}, err
	}
	return user, nil
}

QueryUser Resolver

func (r *queryResolver) User(ctx context.Context, id string) (User, error) {
	var user User
	if err := r.users.FindId(bson.ObjectIdHex(id)).One(&user); err != nil {
		return User{}, err
	}
	user.ID = bson.ObjectId(user.ID).Hex()
	return user, nil
}

Now that we are done with the setup, let’s move on to the main part.


MongoDB Real-Time Data With ChangeStreams

Starting with version 3.6, MongoDB supports real-time data similar to Firebase. The setup isn't as easy, though. There are a few important prerequisites for change streams to work properly:


  • It is only available for sharded clusters and replica sets using the WiredTiger storage engine. MongoDB v3.6+ uses WiredTiger by default, but we do need to set up a replica set ourselves (a minimal local setup is sketched after this list).

  • Change streams are only available if "majority" read concern support is enabled (it is enabled by default).

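For reference, here is a minimal sketch of a local two-member replica set on ports 27017 and 27018. The rs0 name and the data/log paths are placeholders, the data directories must already exist, and --fork is only available on Unix-like systems:

# start two mongod instances as members of the same replica set
mongod --replSet rs0 --port 27017 --dbpath /data/rs0-0 --fork --logpath /data/rs0-0.log
mongod --replSet rs0 --port 27018 --dbpath /data/rs0-1 --fork --logpath /data/rs0-1.log

# initiate the replica set from the mongo shell
mongo --port 27017 --eval 'rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" }
  ]
})'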

Here's what the method signature for our NotificationAdded resolver looks like:


func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (<-chan User, error) {
	panic("not implemented")
}

There’s a problem with this implementation and we’ll need to change it slightly to make it work properly. But first, let’s look at the code required within the resolver which will also make it easier for us to understand why the change was required.


We are first going to define the two variables userDoc and change and set up our changeStream listener like so:


var userDoc User
var change bson.M

cs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")})
if err != nil {
	return err
}
if cs.Err() != nil {
	fmt.Println(err)
}

Here, we are watching for changes on the users collection. We are also setting the change stream timeout to 1 hour; this is required to keep the change stream alive so it doesn't close automatically. We also need the full document that was changed, so we set that option in ChangeStreamOptions as well. The Watch function returns a cursor which we can then iterate over.


Next, we are going to start a goroutine for handling cursor events like so:


go func() {
	start := time.Now()
	for {
		ok := cs.Next(&change)
		if ok {
			byts, _ := bson.Marshal(change["fullDocument"].(bson.M))
			bson.Unmarshal(byts, &userDoc)
			userDoc.ID = bson.ObjectId(userDoc.ID).Hex()
			if userDoc.ID == id {
				*userChan <- userDoc
			}
		}
		if time.Since(start).Minutes() >= 60 {
			break
		}
		continue
	}
}()

Here we are iterating over the cursor using the cursor.Next() method and a for loop. Whenever there's a change event, the code inside the for loop is executed, and the data from that event is available to us in the change variable.


Inside the for loop we essentially extract the fullDocument field from the change event and unmarshal it into a User. We then check whether the changed user is the one that the subscription is looking for. If so, we send it to our channel and wait for more events.


This is also a good time to discuss the method signature for this method. Once again, you’d have something like this:


func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (<-chan User, error) {
	...
}

It receives an id, which is the user's ID, and expects a channel to be returned. If we return a channel from this function, it will always be empty. Let's look at the generated.go file to better understand this. The code related to this particular method looks something like this (it's spread across the file, but I'm aggregating only the required code here):


type SubscriptionResolver interface {
	NotificationAdded(ctx context.Context, id string) (<-chan User, error)
}

func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler {
	rawArgs := field.ArgumentMap(ec.Variables)
	args, err := field_Subscription_notificationAdded_args(rawArgs)
	if err != nil {
		ec.Error(ctx, err)
		return nil
	}
	ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{
		Field: field,
	})
	rctx := ctx
	results, err := ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string))
	if err != nil {
		ec.Error(ctx, err)
		return nil
	}
	return func() graphql.Marshaler {
		res, ok := <-results
		if !ok {
			return nil
		}
		var out graphql.OrderedMap
		out.Add(field.Alias, func() graphql.Marshaler {
			return ec._User(ctx, field.Selections, &res)
		}())
		return &out
	}
}

The returned channel is then read by the generated code to get the updates and pass them on to our client. The problem is that once we return the channel from our resolver, that function's execution is already over, which basically means the channel would never receive any values.


On the flip side, if values were added to the channel before returning it from the function, we would essentially have to wait an hour for all the updates to be pushed to the client, since we wait an hour for the change stream to time out (provided we use a non-goroutine implementation for our ChangeStream cursor). Clearly, this is not an ideal situation. Let's make some changes to the above code to make it work for us.


I’m first going to define a channel in the _Subscription_notificationAdded method whose pointer will then be passed to our resolver. It would look something like this:


func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler {
	rawArgs := field.ArgumentMap(ec.Variables)
	args, err := field_Subscription_notificationAdded_args(rawArgs)
	if err != nil {
		ec.Error(ctx, err)
		return nil
	}
	ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{
		Field: field,
	})

	userChan := make(chan User, 1)
	rctx := ctx
	go ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string), &userChan)

	return func() graphql.Marshaler {
		res, ok := <-userChan
		if !ok {
			return nil
		}
		var out graphql.OrderedMap
		out.Add(field.Alias, func() graphql.Marshaler {
			return ec._User(ctx, field.Selections, &res)
		}())
		return &out
	}
}

We are creating a new channel with a limit of 1 item at a time for performance reasons. We are then passing its pointer to our resolver and also making the call to this resolver a goroutine.


The _Subscription_notificationAdded method will then return a function that listens to the userChan and pushes the update to our client every time a value is received.


We also need to change the signature of the method we just modified in the SubscriptionResolver interface. We need to change


type SubscriptionResolver interface {
	NotificationAdded(ctx context.Context, id string) (<-chan User, error)
}

to

type SubscriptionResolver interface {
	NotificationAdded(ctx context.Context, id string, userChan *chan User) error
}

That’s all the modification we need. Once that’s done, here’s what the complete NotificationAdded Subscription Resolver would look like:


func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string, userChan *chan User) error {
	var userDoc User
	var change bson.M
	cs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")})
	if err != nil {
		return err
	}
	if cs.Err() != nil {
		fmt.Println(err)
	}
	go func() {
		start := time.Now()
		for {
			ok := cs.Next(&change)
			if ok {
				byts, _ := bson.Marshal(change["fullDocument"].(bson.M))
				bson.Unmarshal(byts, &userDoc)
				userDoc.ID = bson.ObjectId(userDoc.ID).Hex()
				if userDoc.ID == id {
					*userChan <- userDoc
				}
			}
			if time.Since(start).Minutes() >= 60 {
				break
			}
			continue
		}
	}()
	return nil
}

Now the code that sends an item to the channel and the code that receives it are both non-blocking and run in the background.


Phew! That was a lot of work but that was all the heavy lifting that we had to do. Let’s move on to the fun part and create a server and see the result of our efforts.


The fun stuff

Create a file main.go at the root of your project with the following code:


package main

import (
	"log"
	"net/http"
	"os"

	"github.com/99designs/gqlgen/handler"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
	"github.com/rs/cors"

	"<project path relative to GOPATH>/db"
	"<project path relative to GOPATH>/users"
)

const defaultPort = "8080"

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = defaultPort
	}

	db.ConnectDB()

	c := cors.New(cors.Options{
		AllowedOrigins:   []string{"http://localhost:" + port},
		AllowCredentials: true,
	})

	r := mux.NewRouter()
	r.Handle("/", handler.Playground("User", "/users"))
	r.Handle("/users", c.Handler(handler.GraphQL(users.NewExecutableSchema(users.New()),
		handler.WebsocketUpgrader(websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				return true
			},
		}))),
	)

	http.Handle("/", r)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

GQLgen provides us with some built-in handlers such as Playground and WebsocketUpgrader, which essentially give us a UI for testing our GraphQL server and a WebSocket connection with clients.


Also, remember the function called New that we added to our resolvers earlier, which I said we'd talk about later? Here you can see why it was required. It returns the Config struct that the handlers provided by GQLgen need for our code to work properly. The default generated code uses users.Config{Resolvers: &users.Resolver{}} directly, which is also fine as long as you include the users field in the Resolver struct and set it to the users collection.


At this point, we are ready to start our GraphQL server and test things out.


Run go build and then execute the generated binary. The server should now be running. Make sure the MongoDB replica set is running before trying to start the server; otherwise, it will throw an error. The replica-set sketch earlier in this post is a minimal starting point if you need help getting one running.


Create User
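For example, a createUser mutation run from the Playground against our schema looks something like this (the email value is just a placeholder):

mutation {
  createUser(input: { email: "jane@example.com" }) {
    id
    email
  }
}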

Update User
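An updateUser mutation then looks like this, using the id returned by createUser (the id and name values here are placeholders):

mutation {
  updateUser(input: { id: "5c0e8b2a9d3f4a0001b2c3d4", first: "Jane", last: "Doe" }) {
    id
    first
    last
    email
  }
}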

Query User
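Querying the user back, including any notifications, looks like this (again with a placeholder id):

query {
  user(id: "5c0e8b2a9d3f4a0001b2c3d4") {
    id
    first
    last
    email
    notifications {
      id
      title
      text
      seen
    }
  }
}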

NotificationAdded Subscription
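Finally, here is an example subscription for the same placeholder user. Once it is running, any change to that user's document (for example, a notification pushed into its notifications array from the mongo shell) is delivered to the client over the WebSocket connection:

subscription {
  notificationAdded(id: "5c0e8b2a9d3f4a0001b2c3d4") {
    id
    email
    notifications {
      id
      title
      text
      seen
    }
  }
}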

And there you have it!


Once again, I want to stress that this might not be the optimal solution to the problem at hand, but it's my take on a possible solution, and I would love to have your feedback and suggestions.


Thanks for reading. A few claps are always appreciated!


Translated from: https://www.freecodecamp.org/news/https-medium-com-anshap1719-graphql-subscriptions-with-go-gqlgen-and-mongodb-5e008fc46451/
