当前位置: 首页 > 知识库问答 >
问题:

用RxJava异步线程实现防止争用条件

胡飞鹏
2023-03-14

我正在使用spring framework StringRedisTemplate来更新一个与多个线程发生的条目。

public void processSubmission(final String key, final Map<String, String> submissionDTO) {
    final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
    this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
    final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
    Map<String, String> data = findByKey(key);
    String json;
    if (data != null) {
        data.putAll(submissionDTO);
        json = convertSubmission(data);
    } else {
        json = convertSubmission(submissionDTO);
    }
    ops.put(key, hashKey, json);
}
key (assignmentId) -> value (submissionId, status) 
class ProcessSubmission{

@Override
    public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) {
        return Observable.create(observer -> {
            for (final Submission submission : submissionList) {            
                //Cache entry insert method invoke via this call
                final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
                observer.onNext(status);
            }
            observer.onCompleted();
        });
    }

    @Override
    public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) {
        return Observable.create(observer -> {
            for (final Submission submission : submissionList) {
                //Cache entry insert method invoke via this call
                final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
                observer.onNext(status);
            }
            observer.onCompleted();
        });
    }

} 
class MyService{    
public void handleSubmissions(){
    final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders)
                .subscribeOn(Schedulers.newThread());
    final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders)
                    .subscribeOn(Schedulers.newThread());                   
    statusObser1.subscribe();
    statusObser2.subscribe();
    }       
}

因此,handleSubmissions在每个分配ID中使用多个线程进行调用。但是每个主线程创建并调用两个反应性java线程,并处理与每个赋值相关联的提交列表。

在保持RxJava实现性能的同时,防止redis入口争用的最佳方法是什么?有没有一个方法我可以做这个redis操作更有效的方式?

共有1个答案

齐文栋
2023-03-14

看起来您只是在末尾使用ops变量执行put操作,您可以隔离需要同步的点。

在我所做的简短研究中,我找不到hashoperations是否还不是线程安全的)。

但是,一个如何分离你所关心的部分的例子是这样做:

public void processSubmission(final String key, final Map<String, String> submissionDTO) {
    final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
    this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
    Map<String, String> data = findByKey(key);
    String json;
    if (data != null) {
        data.putAll(submissionDTO);
        json = convertSubmission(data);
    } else {
        json = convertSubmission(submissionDTO);
    }
    putThreadSafeValue(key, hashKey, json);
}

并且有一个方法只对put操作进行同步:

private synchronized void putThreadSafeValue(key, hashKey, json) {
    final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
    ops.put(key, hashKey, json);
}

有许多方法可以做到这一点,但是看起来可以将线程争用限制在put操作。

 类似资料:
  • 本文向大家介绍wxpython中利用线程防止假死的实现方法,包括了wxpython中利用线程防止假死的实现方法的使用技巧和注意事项,需要的朋友参考一下 前段时间我编写了一个工业控制的软件,在使用中一直存在一个问题,就是当软件检索设备时,因为这个功能执行的时间比较长,导致GUI界面假死,让用户分辨不清楚软件到底仍在执行,还是真的挂掉了。(虽然我设计了同步log显示,但是这个也同样假死了) 程序截图如

  • 我问了这个问题,得到了这个有趣(也有点令人不安)的答案。 Daniel在他的回答中指出(除非我读错了),ECMA-335 CLI规范允许编译器从以下方法生成引发NullReferenceException的代码。 他说,为了保证不抛出,关键字应该在上使用,或者应该在行周围使用。 有人能证实这一点吗?如果这是真的,Mono和。NET编译器有关此问题的信息? 编辑此处是标准的链接。 更新我认为这是规范

  • 在使用ACK时,有没有一种简单的方法实现类似于“锁定”的东西来防止RabbitMQ队列中的竞争条件? 我有以下问题--我有几个客户机使用ACK使用队列。每当客户端收到消息时,他就会确认并处理消息。但是,如果由于某种原因处理失败,我希望消息返回到队列。

  • 本文向大家介绍Spring Boot使用Spring的异步线程池的实现,包括了Spring Boot使用Spring的异步线程池的实现的使用技巧和注意事项,需要的朋友参考一下 前言 线程池,从名字上来看,就是一个保存线程的"池子",凡事都有其道理,那线程池的好处在哪里呢? 我们要让计算机为我们干一些活,其实都是在使用线程,使用方法就是new一个Runnable接口或者新建一个子类,继承于Threa

  • 本文向大家介绍C#使用Interlocked实现线程同步,包括了C#使用Interlocked实现线程同步的使用技巧和注意事项,需要的朋友参考一下 通过System.Threading命名空间的Interlocked类控制计数器,从而实现进程 的同步。Iterlocked类的部分方法如下表: 示例,同时开启两个线程,一个写入数据,一个读出数据 代码如下:(但是运行结果却不是我们想象的那样) 运行结