我有一个分布式任务队列,其中的任务如下所示:
# creates a uniquely-named file
new_path = do_work()
old_path = database.query('select old path')
unlink(old_path)
database.query('insert new path')
这里有一个竞争条件:如果任务队列软件在完全相同的时间启动其中两个任务,它们都将从数据库中获得相同的<code>old_path</code>,并且竞争失败者的取消链接调用失败(将失败者的新路径从未来的取消链接中孤立出来)。
有没有办法让我构建它来绕过这场比赛?如果需要,我可以从当前设计中抛出几乎任何东西。具体来说,我使用的是PostgreSQL,Python和Celery。我知道我可以使用表范围的锁定/将 psycopg2 的事务级别更改为可序列化,但我不确定这是否避免了这种竞争条件。表级锁定还意味着我必须为每个附加任务引入一个新表(以免它们相互阻塞),这听起来不太吸引人。
如果任务队列软件能够为请求提供唯一的标识符,也许您可以将每个请求的old_path存储在不同的行中。如果没有,也许您可以为每个请求生成一个密钥,并将路径与它一起存储。
不要选择旧路径,请执行以下操作:
old_path = database.query('
UPDATE paths
SET used_flag = TRUE
WHERE used_flag = FALSE
RETURNS data');
RETURNS
子句允许您从刚刚更新的行(/已删除/插入)中“选择”值。
used_flag
指定该行是否已被另一个Python实例使用。使用WHereused_flag=FALSE
位将确保您没有选择已使用的内容。
我强烈建议您研究已经解决此问题的工具,例如PGQ。排队比您预期的要难得多。这不是你想要重新发明的轮子。
米海的回答表面看起来很好,但在并发操作中有所下降。
两个并发UPDATE可以选择相同的行(在他的示例中具有used_flag=FALSE
)。其中一个将获得锁并继续。另一个将等到第一次运行并提交。当提交发生时,第二次更新将获得锁,重新检查其条件,发现没有任何行匹配,并且什么也不做。因此,除了一组并发更新中的一个之外,所有更新都有可能返回一个空集,事实上这种可能性很大。
在<code>READ COMMITTED</code>模式下,您仍然可以获得不错的结果,大致相当于一个会话在<code<UPDATE</code>上连续循环。在SERIALIZABLE
模式下,它将毫无希望地失败。试试看;以下是设置:
CREATE TABLE paths (
used_flag boolean not null default 'f',
when_entered timestamptz not null default current_timestamp,
data text not null
);
INSERT INTO paths (data) VALUES
('aa'),('bb'),('cc'),('dd');
这是演示。尝试三个并发会话,一步一步地进行。在READ COMMITTED中执行一次,然后使用BEGIN隔离级别SERIALIZABLE
而不是普通的BEGIN
对所有会话SERIALIZABLE
再次执行。比较结果。
SESSION 1 SESSION2 SESSION 3
BEGIN;
BEGIN;
UPDATE paths
SET used_flag = TRUE
WHERE used_flag = FALSE
RETURNING data;
BEGIN;
INSERT INTO
paths(data)
VALUES
('ee'),('ff');
COMMIT;
UPDATE paths
SET used_flag = TRUE
WHERE used_flag = FALSE
RETURNING data;
BEGIN;
INSERT INTO
paths(data)
VALUES
('gg'),('hh');
COMMIT;
COMMIT;
在< code>READ COMMITTED中,第一次更新成功并生成四行。第二个生成剩余的两个< code>ee和< code>ff,它们是在第一次更新运行后插入并提交的。第二次更新不会返回< code>gg和< code>hh,即使它实际上是在提交之后执行的,因为它已经选择了自己的行,并且在插入这些行时正在等待锁。
在< code>SERIALIZABLE隔离中,第一次更新成功并生成四行。第二次失败,返回< code >错误:由于并发更新,无法序列化访问。在这种情况下,< code>SERIALIZABLE隔离不会帮助您,它只会改变失败的性质。
如果没有显式事务,当UPDATE
并发运行时,也会发生同样的事情。如果您使用显式事务,则演示更容易,而不需要调整时间。
如上所述,系统工作正常,但如果您只想得到最旧的行呢?因为<code>UPDATE</code>在阻塞锁之前选择它要操作的行,所以您会发现在任何给定的事务集中,只有一个<code>UPDATE</code>会返回结果。
你会想到这样的技巧:
UPDATE paths
SET used_flag = TRUE
WHERE entry_id = (
SELECT entry_id
FROM paths
WHERE used_flag = FALSE
ORDER BY when_entered
LIMIT 1
)
AND used_flag = FALSE
RETURNING data;
或者
UPDATE paths
SET used_flag = TRUE
WHERE entry_id = (
SELECT min(entry_id)
FROM paths
WHERE used_flag = FALSE
)
AND used_flag = FALSE
RETURNING data;
但是这些不会如你所愿;并发运行时,两者将选择相同的目标行。一个将继续,一个将阻塞锁,直到第一个提交,然后继续并返回空结果。如果没有第二个< code >和used_flag = FALSE,我想他们甚至可以返回重复项!在上面的演示< code >路径表中添加一个< code > entry _ id SERIAL PRIMARY KEY 列后,尝试一下。要让他们比赛,只需在第三个会话中< code >锁定表路径;参见我在下面的链接中给出的例子。
我在另一个回答中写了这些问题,并且在我的回答中写了多线程会在一个受约束的集合上导致重复更新吗?
说真的,去看看PGQ。它已经为您解决了这个问题。
问题内容: 我正在开发的应用程序中存在潜在的竞争状况,我想在查询中考虑和避免这种情况。 总结应用程序流程… 在表中创建一个新行: 通过查看对时间敏感的表格,找出Bar先生是否是获胜者: 如果他是赢家,请相应地更新他的条目行: 由于每个奖项只能颁发一次,因此我需要消除比赛条件的任何可能性,在这种情况下,另一个过程可以查询奖项表并更新上述步骤2和3之间的条目表。 我一直在做一些研究,发现了大量关于事务
我不清楚它写在哪里 私有构造函数的存在是为了避免将复制构造函数实现为此(p.x,p.y)时出现的争用条件;这是私有构造函数捕获习惯用法的一个例子(Bloch和Gafter,2005)。 我知道它提供了一个getter来在数组中同时检索x和y,而不是为每一个单独的getter,所以调用者将看到一致的值,但为什么是私有构造函数呢?这里有什么诀窍
我读过很多关于代码重构和避免if-else语句的主题。实际上,我有一个类,我使用了很多if-else条件。 更多细节:我正在使用pull解析器,在soap响应的每一行,我会检查是否有我感兴趣的标签,如果没有,请检查另一个标签等: 现在,我想用别的东西,而不是那些if else条件,但我不知道是什么。 你能给我举个例子吗?
问题内容: 在确定文件是否存在时,使用try语句如何避免“竞争条件”? 我之所以这样问是因为高度支持的答案0)(更新:已删除)似乎暗示使用会创造机会,否则这种机会就不会存在。 给出的示例是: 但与以下内容相比,我不了解如何避免出现竞争状况: 调用如何使攻击者能够处理他们无法完成的文件? 问题答案: 比赛条件,当然,你的程序和文件上运行一些其他的代码之间(竞争状态总是需要至少两个平行的进程或线程,看
我已经构造了一个NFA,我正在运行这个方法来评估机器,看看一个表达式是否有效。这适用于小的正则表达式,但是当我的正则表达式的大小以及NFA的大小变得太大时,这个搜索会向我抛出一个堆栈溢出。我很确定这是因为我已经实现了一个BFS,正在使用递归,并且可能没有很好地处理我的基本案例。 这个方法接受一个表达式和一个节点(从NFA的起始节点开始)。首先,它检查表达式的长度是否为零,如果我在接受节点(节点上的
问题内容: 我正在尝试通过从客户端向服务器发送密钥和随机数来认证用户。 我的代码未向我显示客户端的响应。执行下面的代码时,我得到了一个空指针异常。 问题答案: 解决大多数问题的固定步骤: 阅读堆栈跟踪以确定哪一行代码引发NPE 在该行代码处设置一个断点 使用调试器,在遇到断点时,确定该行中的对象引用是 弄清楚为什么引用该文件(到目前为止,这是唯一实际的困难部分) 解决根本原因(也可能很困难)