当前位置: 首页 > 面试题库 >

减少sidekiq作业的执行时间

穆子琪
2023-03-14
问题内容

我目前正在开发一个涉及在Rails服务器上同步联系人的应用程序。我正在使用Redis服务器和sidekiq在后台执行联系人同步。我的数据库是mongodb,我正在使用Mongoid
gem作为ORM。工作流程如下:

  1. 电话上的联系人通过应用程序传递到Rails服务器,然后在Rails服务器上,它在Redis服务器中排队。
  2. 现在,cron作业会触发连接到Redis的sidekiq并完成作业。

sidekiq的一项工作如下:

  1. 它具有触点数组(最大可达3000)。
  2. 它必须处理所有这些联系人。通过处理,我的意思是对数据库进行插入查询。

现在的问题是,sidekiq需要花费大量时间才能完成这项工作。平均需要50-70秒才能完成工作。

以下是相关文件

sidekiq.yml

# Sample configuration file for Sidekiq.
# Options here can still be overridden by cmd line args.
#   sidekiq -C config.yml

:verbose: true
:concurrency:  5
:logfile: ./log/sidekiq.log
:pidfile: ./tmp/pids/sidekiq.pid
:queues:
  - [new_wall, 1]#6
  - [contact_wall, 1]#7
  - [email, 1]#5
  - [gcm_chat, 1]#5
  - [contact_address, 1]#7
  - [backlog_contact_address, 5]
  - [comment, 7]
  - [default, 5]

蒙古扁豆

development:
  # Configure available database sessions. (required)
  sessions:
    # Defines the default session. (required)
    default:
          # Defines the name of the default database that Mongoid can connect to.
          # (required).
          database: "<%= ENV['DB_NAME']%>"
          # Provides the hosts the default session can connect to. Must be an array
          # of host:port pairs. (required)
          hosts:
            - "<%=ENV['MONGOD_URL']%>"
          #username: "<%= ENV['DB_USERNAME']%>"
          #password: "<%= ENV['DB_PASSWORD']%>"
          options:

            #pool: 12
        # Change the default write concern. (default = { w: 1 })
        # write:
        # w: 1

        # Change the default consistency model to primary, secondary.
        # 'secondary' will send reads to secondaries, 'primary' sends everything
        # to master. (default: primary)
        # read: secondary_preferred

        # How many times Moped should attempt to retry an operation after
        # failure. (default: The number of nodes in the cluster)
        # max_retries: 20

        # The time in seconds that Moped should wait before retrying an
        # operation on failure. (default: 0.25)
        # retry_interval: 0.25
  # Configure Mongoid specific options. (optional)
  options:
    # Includes the root model name in json serialization. (default: false)
    # include_root_in_json: false

    # Include the _type field in serializaion. (default: false)
    # include_type_for_serialization: false

    # Preload all models in development, needed when models use
    # inheritance. (default: false)
    # preload_models: false

    # Protect id and type from mass assignment. (default: true)
    # protect_sensitive_fields: true

    # Raise an error when performing a #find and the document is not found.
    # (default: true)
    # raise_not_found_error: true

    # Raise an error when defining a scope with the same name as an
    # existing method. (default: false)
    # scope_overwrite_exception: false

    # Use Active Support's time zone in conversions. (default: true)
    # use_activesupport_time_zone: true

    # Ensure all times are UTC in the app side. (default: false)
    # use_utc: false
test:
  sessions:
    default:
      database: db_test
      hosts:
        - localhost:27017
      options:
        read: primary
        # In the test environment we lower the retries and retry interval to
        # low amounts for fast failures.
        max_retries: 1
        retry_interval: 0


production:
  # Configure available database sessions. (required)
  sessions:
    # Defines the default session. (required)
    default:
      # Defines the name of the default database that Mongoid can connect to.
      # (required).
      database: "<%= ENV['DB_NAME']%>"
      # Provides the hosts the default session can connect to. Must be an array
      # of host:port pairs. (required)
      hosts:
        - "<%=ENV['MONGOD_URL']%>"
      username: "<%= ENV['DB_USERNAME']%>"
      password: "<%= ENV['DB_PASSWORD']%>"
      pool: 10
      options:

  # Configure Mongoid specific options. (optional)
  options:

模型.rb

def retry_save_contact_dump(c_dump_id)
      c_dump = ContactDump.where(_id: c_dump_id, status: ContactDump::CONTACT_DUMP_CONS[:ERROR]).first
      return false if c_dump.blank?
      user = User.where(_id: c_dump.user_id).first
      puts "retry_save_contact_dump"
      user.save_contacts_with_name(c_dump.contacts)
      c_dump.status = ContactDump::CONTACT_DUMP_CONS[:PROCESSED]
      c_dump.error_msg = ""
      c_dump.save
    rescue => e
      c_dump.status = ContactDump::CONTACT_DUMP_CONS[:CANTSYNC]
      c_dump.error_msg = e.message
      c_dump.save
   end


def save_contacts_with_name(c_array)
    m_num = Person.get_number_digest(self.mobile_number.to_s)
    c_array.each do |n|
      next if m_num == n["hash_mobile_number"]
      p = Person.where(h_m_num: n["hash_mobile_number"]).first_or_create
      save_friend(p) #if p.persisted?
      p.c_names.create(name: n["name"], user_id: self.id)
    end
  end

ContactDump.rb

class ContactDump
  include Mongoid::Document
  include Mongoid::Timestamps::Created
  include Mongoid::Timestamps::Updated

  field :contacts,   type: Array
  field :status,     type: Integer, default: 0
  field :user_id,    type: BSON::ObjectId
  field :error_msg,  type: String

  CONTACT_DUMP_CONS = {FRESH: 0,  PROCESSED: 1, ERROR: 2, CANTSYNC: 3}
end

如何加快工作处理速度?我尝试了在sidekiq.yml和mongoid.yml池中增加sidekiq并发性的排列,但是没有帮助。

whatsApp和其他消息传递应用程序如何处理联系人同步?

如果需要其他信息,请询问。谢谢。

编辑:如果不可能回答这个问题,有人可以建议我其他方法同步Rails服务器上的联系人。


问题答案:

救援的索引。

class ContactDump
  index({status: 1})
end

class Person
  index({h_m_num: 1})
end

Person可能需要更多索引,具体取决于您的Person.get_number_digest工作。

添加索引后运行 rake db:mongoid:create_indexes

另外,请删除puts,您不需要您的工作人员,并且即使您看不到输出,看跌期权也会严重影响您的表现!



 类似资料:
  • 在Spark中有几个优化可以减少批处理的时间。这些可以在优化指南中作了讨论。这节重点讨论几个重要的。 数据接收的并行水平 通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上) 接收单个数据流。创建多个输入DStrea

  • 我有一个java。util。流动包含键值对的流,如: 现在,我想合并所有具有相同密钥的条目: 数据已经排序,因此只需合并连续的数据集。 现在,我正在寻找一种方法来转换上述流的内容,而不将所有数据集加载到内存中。 我更喜欢得到一个java.util.stream.Stream,结果是一个不同的对象类型包含一个值列表,而不是一个单独的值。 我唯一的方法是一个自定义迭代器,它执行合并,但是转换为迭代器并

  • 我们正在尝试在K8s集群上部署apache Flink作业,但我们注意到一个奇怪的行为,当我们开始我们的作业时,任务管理器内存以分配的数量开始,在我们的例子中是3 GB。 最终,内存开始减少,直到达到约160 MB,此时,它会恢复一点内存,所以不会达到其极限。 这种非常低的内存通常会导致作业因任务管理器心跳异常而终止,即使在尝试查看Flink仪表板上的日志或执行作业流程时也是如此。 为什么它的内存

  • 我有以下代码 我试图对流进行基准测试,但在一次又一次地调用相同的函数时,执行时间稳步下降

  • 问题内容: 如何在Java 8中使用泛型参数重载Function? 错误:java:名称冲突:sum(java.util.function.Function )和sum(java.util.function.Function )具有相同的擦除 问题答案: 您所提出的示例与Java 8无关,与Java中泛型的工作原理无关。并将在编译时进行类型擦除,并将其转换为。方法重载的经验法则是具有不同的数量,类

  • 我只是想学习PySpark,但对以下两个RDD之间的区别感到困惑,我知道一个是类型集,一个是列表,但两者都是RDD 和 图和减函数处理代码: 我可以很容易地执行映射/减少功能对第二个rdd数据,但当我尝试执行映射或减少我得到以下错误:那么我们如何将第一个rdd转换为第二个rdd数据,或者如果有任何方法来解决以下错误请帮助 Py4JJavaError:调用z:org时出错。阿帕奇。火花应用程序编程接