我目前正在开发一个涉及在Rails服务器上同步联系人的应用程序。我正在使用Redis服务器和sidekiq在后台执行联系人同步。我的数据库是mongodb,我正在使用Mongoid
gem作为ORM。工作流程如下:
sidekiq的一项工作如下:
现在的问题是,sidekiq需要花费大量时间才能完成这项工作。平均需要50-70秒才能完成工作。
以下是相关文件
sidekiq.yml
# Sample configuration file for Sidekiq.
# Options here can still be overridden by cmd line args.
# sidekiq -C config.yml
:verbose: true
:concurrency: 5
:logfile: ./log/sidekiq.log
:pidfile: ./tmp/pids/sidekiq.pid
:queues:
- [new_wall, 1]#6
- [contact_wall, 1]#7
- [email, 1]#5
- [gcm_chat, 1]#5
- [contact_address, 1]#7
- [backlog_contact_address, 5]
- [comment, 7]
- [default, 5]
蒙古扁豆
development:
# Configure available database sessions. (required)
sessions:
# Defines the default session. (required)
default:
# Defines the name of the default database that Mongoid can connect to.
# (required).
database: "<%= ENV['DB_NAME']%>"
# Provides the hosts the default session can connect to. Must be an array
# of host:port pairs. (required)
hosts:
- "<%=ENV['MONGOD_URL']%>"
#username: "<%= ENV['DB_USERNAME']%>"
#password: "<%= ENV['DB_PASSWORD']%>"
options:
#pool: 12
# Change the default write concern. (default = { w: 1 })
# write:
# w: 1
# Change the default consistency model to primary, secondary.
# 'secondary' will send reads to secondaries, 'primary' sends everything
# to master. (default: primary)
# read: secondary_preferred
# How many times Moped should attempt to retry an operation after
# failure. (default: The number of nodes in the cluster)
# max_retries: 20
# The time in seconds that Moped should wait before retrying an
# operation on failure. (default: 0.25)
# retry_interval: 0.25
# Configure Mongoid specific options. (optional)
options:
# Includes the root model name in json serialization. (default: false)
# include_root_in_json: false
# Include the _type field in serializaion. (default: false)
# include_type_for_serialization: false
# Preload all models in development, needed when models use
# inheritance. (default: false)
# preload_models: false
# Protect id and type from mass assignment. (default: true)
# protect_sensitive_fields: true
# Raise an error when performing a #find and the document is not found.
# (default: true)
# raise_not_found_error: true
# Raise an error when defining a scope with the same name as an
# existing method. (default: false)
# scope_overwrite_exception: false
# Use Active Support's time zone in conversions. (default: true)
# use_activesupport_time_zone: true
# Ensure all times are UTC in the app side. (default: false)
# use_utc: false
test:
sessions:
default:
database: db_test
hosts:
- localhost:27017
options:
read: primary
# In the test environment we lower the retries and retry interval to
# low amounts for fast failures.
max_retries: 1
retry_interval: 0
production:
# Configure available database sessions. (required)
sessions:
# Defines the default session. (required)
default:
# Defines the name of the default database that Mongoid can connect to.
# (required).
database: "<%= ENV['DB_NAME']%>"
# Provides the hosts the default session can connect to. Must be an array
# of host:port pairs. (required)
hosts:
- "<%=ENV['MONGOD_URL']%>"
username: "<%= ENV['DB_USERNAME']%>"
password: "<%= ENV['DB_PASSWORD']%>"
pool: 10
options:
# Configure Mongoid specific options. (optional)
options:
模型.rb
def retry_save_contact_dump(c_dump_id)
c_dump = ContactDump.where(_id: c_dump_id, status: ContactDump::CONTACT_DUMP_CONS[:ERROR]).first
return false if c_dump.blank?
user = User.where(_id: c_dump.user_id).first
puts "retry_save_contact_dump"
user.save_contacts_with_name(c_dump.contacts)
c_dump.status = ContactDump::CONTACT_DUMP_CONS[:PROCESSED]
c_dump.error_msg = ""
c_dump.save
rescue => e
c_dump.status = ContactDump::CONTACT_DUMP_CONS[:CANTSYNC]
c_dump.error_msg = e.message
c_dump.save
end
def save_contacts_with_name(c_array)
m_num = Person.get_number_digest(self.mobile_number.to_s)
c_array.each do |n|
next if m_num == n["hash_mobile_number"]
p = Person.where(h_m_num: n["hash_mobile_number"]).first_or_create
save_friend(p) #if p.persisted?
p.c_names.create(name: n["name"], user_id: self.id)
end
end
ContactDump.rb
class ContactDump
include Mongoid::Document
include Mongoid::Timestamps::Created
include Mongoid::Timestamps::Updated
field :contacts, type: Array
field :status, type: Integer, default: 0
field :user_id, type: BSON::ObjectId
field :error_msg, type: String
CONTACT_DUMP_CONS = {FRESH: 0, PROCESSED: 1, ERROR: 2, CANTSYNC: 3}
end
如何加快工作处理速度?我尝试了在sidekiq.yml和mongoid.yml池中增加sidekiq并发性的排列,但是没有帮助。
whatsApp和其他消息传递应用程序如何处理联系人同步?
如果需要其他信息,请询问。谢谢。
编辑:如果不可能回答这个问题,有人可以建议我其他方法同步Rails服务器上的联系人。
救援的索引。
class ContactDump
index({status: 1})
end
class Person
index({h_m_num: 1})
end
Person
可能需要更多索引,具体取决于您的Person.get_number_digest
工作。
添加索引后运行 rake db:mongoid:create_indexes
另外,请删除puts
,您不需要您的工作人员,并且即使您看不到输出,看跌期权也会严重影响您的表现!
在Spark中有几个优化可以减少批处理的时间。这些可以在优化指南中作了讨论。这节重点讨论几个重要的。 数据接收的并行水平 通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上) 接收单个数据流。创建多个输入DStrea
我有一个java。util。流动包含键值对的流,如: 现在,我想合并所有具有相同密钥的条目: 数据已经排序,因此只需合并连续的数据集。 现在,我正在寻找一种方法来转换上述流的内容,而不将所有数据集加载到内存中。 我更喜欢得到一个java.util.stream.Stream,结果是一个不同的对象类型包含一个值列表,而不是一个单独的值。 我唯一的方法是一个自定义迭代器,它执行合并,但是转换为迭代器并
我们正在尝试在K8s集群上部署apache Flink作业,但我们注意到一个奇怪的行为,当我们开始我们的作业时,任务管理器内存以分配的数量开始,在我们的例子中是3 GB。 最终,内存开始减少,直到达到约160 MB,此时,它会恢复一点内存,所以不会达到其极限。 这种非常低的内存通常会导致作业因任务管理器心跳异常而终止,即使在尝试查看Flink仪表板上的日志或执行作业流程时也是如此。 为什么它的内存
我有以下代码 我试图对流进行基准测试,但在一次又一次地调用相同的函数时,执行时间稳步下降
问题内容: 如何在Java 8中使用泛型参数重载Function? 错误:java:名称冲突:sum(java.util.function.Function )和sum(java.util.function.Function )具有相同的擦除 问题答案: 您所提出的示例与Java 8无关,与Java中泛型的工作原理无关。并将在编译时进行类型擦除,并将其转换为。方法重载的经验法则是具有不同的数量,类
我只是想学习PySpark,但对以下两个RDD之间的区别感到困惑,我知道一个是类型集,一个是列表,但两者都是RDD 和 图和减函数处理代码: 我可以很容易地执行映射/减少功能对第二个rdd数据,但当我尝试执行映射或减少我得到以下错误:那么我们如何将第一个rdd转换为第二个rdd数据,或者如果有任何方法来解决以下错误请帮助 Py4JJavaError:调用z:org时出错。阿帕奇。火花应用程序编程接