当前位置: 首页 > 工具软件 > EventMachine > 使用案例 >

关于eventmachine,协程在rails里面的使用

仇睿
2023-12-01

最近在公司实习的时候一直在学习rails和ruby.

在实现一个小需求,从微信的接口拉取粉丝信息然后更新我们的数据库,这是一个不太常用的功能,很久才会跑一次.

但是rails的模式,如果作为普通的客户端来发需求,而且同时发送的请求不可以太多,否则会出错(估计是微信的限制),ruby的httprequest是阻塞操作,一次只会发送一个请求,我查阅资料并且求教一番之后有如下几种可行的思路.

####1.可以用socket来编程,自己模拟部分的http交互,开两个线程,一个负责不停的发送,另外一个不停的接受并且解析.这样速度很快,但是工作量很大.
####2.单线程加callback,这样可以在一次循环里面发出200个报文,然后等待回调.这实质上就是一个IO多路复用,当然我们可以加上多进/程线程来使得更加快速.

EventMachine是一个gem,可以引入callback的方式来处理httprequest.

在考虑方案2的时候,发现如果后续的逻辑比较复杂,那么你的回调就会写的非常复杂,各种嵌套.这很像js里面的callback地狱,面对这样的情况,使用协程可以使得我们的代码按照同步的方式来写异步,这样就非常有利于阅读.具体怎么写协程和Eventmachine结合我也没有写.只是查阅了一些资料.

rails里面要想使用EventMachine还是会有一些问题,其中最需注意的就是一定要写好EventMachine事件循环的结束状态,否则会一直loop导致后续EventMachine循环无法被有效的执行.

给出代码,免得忘:


class WxBindController < ApplicationController

  def execute_transcation(user_info_list)

    user_info_list.each do |id,attributes|
      user_info_list[id] = attributes.select{|k,v| v && v!=""}
    end

    #user_info_list = user_info_list.compact
    #将数据更新到数据库里面
    WeiXinBind.Base.connection do transcation
    WeiXinBind.update(user_info_list.keys,user_info_list.values)
    end
  end


  def send_request_in_batch(request_url_list,user_info_list)
    EM.run do
      cnt = 0
      request_url_list.each do |url,id|

        http = EventMachine::HttpRequest.new(url).get
        http.errback {
          cnt += 1
          if cnt == request_url_list.count - 1
            EM.stop
          end
        }
        http.callback {
          resp = Oj.load(http.response)
          if !resp.blank? && resp["subscribe"] == 1
            user_info_list[id] = {nickname:resp["nickname"],headimg_url:resp["headimgurl"],
                                  sex:resp["sex"],          province:resp["province"],
                                  city:resp["city"],        country:resp["country"]}
          end
          cnt += 1
          if cnt == request_url_list.count - 1
            EM.stop
          end
      }
      end
    end
  end


 #用来更新weixin_bind里面的数据
  def update

    access_token = params[:access_token]
    cate_id      = params[:cate_id]
    issue_id     = params[:issue_id]

    user_info_list = {}
    request_url_list = []

    users = WeiXinBind.select("weixin_bind.*").joins("inner join live_users as lu using(user_id)").where("lu.cate_id = ? and lu.issue_id  =?",cate_id,issue_id)
    users.each do |wx|
      openid = wx.service_openid
      request_url_list << ["https://api.weixin.qq.com/cgi-bin/user/info?access_token=#{access_token}&openid=#{openid}&lang=zh_CN",wx.id] unless openid.blank?

        if request_url_list.count > 200
         send_request_in_batch(request_url_list,user_info_list)
         request_url_list.clear
        end

        if user_info_list.count > 10000
          execute_transcation(user_info_list)
          user_info_list.clear
        end
      end

      send_request_in_batch(request_url_list,user_info_list)
      execute_transcation(user_info_list)
      render :text  => "OK"
  end


  #用来测试EventMachine的使用
  def mysleep(x)
    MyLog.debug(Time.now)
  end

end


 类似资料: