当前位置: 首页 > 知识库问答 >
问题:

在不影响性能或吞吐量的情况下,对所有线程具有完全原子性

籍兴文
2023-03-14

我有一个主机名列表,我应该通过使用正确的URL来打电话。假设我在链表中有四个主机名(hostA,hostB,hostC,hostD)-

  • 执行hostA url,如果hostA启动,则获取数据并返回响应

此外,我的应用程序中运行了一个后台线程,其中包含阻止主机名列表(来自我的另一个服务),我们不应该调用该列表,但它每10分钟运行一次,因此阻止主机名列表只会在10分钟后更新,因此如果存在任何阻止主机名列表,然后我不会从主线程调用该主机名,我会尝试调用另一个主机名。这意味着如果hostA被阻止,那么它将在阻止列表中出现hostA,但是如果hostA被阻止,那么该列表中就不会出现hostA

下面是我的后台线程代码,它从我的服务URL获取数据,并在应用程序启动后每10分钟继续运行一次。然后,它将解析来自URL的数据,并将其存储在ClientData变量中-

public class TempScheduler {

    // .. scheduledexecutors service code to start the background thread

    // call the service and get the data and then parse 
    // the response.
    private void callServiceURL() {
        String url = "url";
        RestTemplate restTemplate = new RestTemplate();
        String response = restTemplate.getForObject(url, String.class);
        parseResponse(response);
    }

    // parse the response and store it in a variable
    private void parseResponse(String response) {
        //...       
        
        // get the block list of hostnames
        Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
        List<String> blockList = new ArrayList<String>();
        for(Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
            for(String hosts : entry.getValue()) {
                blockList.add(hosts);
            }
        }
        
        // store the block list of hostnames which I am not supposed to make a call
        ClientData.replaceBlockedHosts(blockList);
    }
}

下面是我的ClientData课程replaceBlockedHosts方法将仅由后台线程调用,这意味着只有一个编写器。但是主应用程序线程将多次调用isHostBlocked方法,以检查特定主机名是否被阻止。而且,blockHost方法将被从catch block多次调用,以在blockedHosts列表中添加下一个主机,因此我需要确保所有读取线程都能看到一致的数据,并且没有调用该下一个主机,而是调用hostnames链接列表中的下一个主机。

public class ClientData {

    // .. some other variables here which in turn used to decide the  list of hostnames
    
    private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
            new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());

    public static boolean isHostBlocked(String hostName) {
        return blockedHosts.get().containsKey(hostName);
    }

    public static void blockHost(String hostName) {
        blockedHosts.get().put(hostName, hostName);
    }

    public static void replaceBlockedHosts(List<String> hostNames) {
        ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
        for (String hostName : hostNames) {
            newBlockedHosts.put(hostName, hostName);
        }
        blockedHosts.set(newBlockedHosts);
    }
}

下面是我的主要应用程序线程代码,其中有我应该调用的主机名列表。如果主机名为空或在阻止列表类别中,则我不会调用该特定主机名,而是会尝试列表中的下一个主机名。

@Override
public DataResponse call() {

    List<String> hostnames = new LinkedList<String>();
    
    // .. some separate code here to populate the hostnames list
    // from ClientData class
    
    for (String hostname : hostnames) {     

        // If host name is null or host name is in block list category, skip sending request to this host
        if (hostname == null || ClientData.isHostBlocked(hostname)) {
            continue;
        }
    
        try {
            String url = generateURL(hostname);

            response = restTemplate.getForObject(url, String.class);

            break;
        } catch (RestClientException ex) {
            // add host to block list, 
            // Is this call fully atomic and thread safe for blockHost method 
            // in ClientData class?
            ClientData.blockHost(hostname);
        }
    }
}

每当主机名从主线程关闭时,我不需要调用它。我的后台线程也从我的一个服务中获取这些详细信息,每当任何服务器关闭时,它都会有作为阻止主机的主机名列表,每当它们启动时,该列表就会更新。

而且,每当抛出任何RestClientException时,我都会将该主机名添加到blockedHostsconcurrentmap中,因为我的后台线程每10分钟运行一次,所以map在完成10分钟之前不会有这个主机名。每当这个服务器恢复时,我的背景会自动更新这个列表。

我上面的主机名阻止列表代码是完全原子和线程安全的吗?因为我想要的是——如果hostA关闭,那么在被阻止的主机列表更新之前,其他线程不应该调用hostA。

共有2个答案

邢臻
2023-03-14

ConcurrentHashMap放入AtomicReference时,操作的原子性不会改变putget无论如何都是原子的,唯一受影响的操作,replaceBlockedHosts也可以使用简单的volatile引用。但我不知道你为什么需要这个。

在你的call()方法中有一个检查然后动作模式:

首先,调用ClientData.isHostBlock(主机名),然后调用restTemplate.getForObject(GenerateURL(主机名),...)

因此,当另一个线程调用blockHost时,isHostBlockedisHostBlocked的原子性确实阻止了一个线程在调用isHostBlocked后立即执行。因此,在后者将主机添加到阻止列表后,前者仍将继续网络操作。

如果要限制可能在同一主机上失败的线程数,则必须限制访问同一主机的线程数。没有办法绕过它。

魏康安
2023-03-14

请记住,与其他主机进行通信所需的时间远远超过线程中的任何操作。在这种情况下,我不会担心原子操作。

假设我们有线程t1t2t1host A发送请求并等待响应。当超时达到时,将抛出一个RestClientExc0019。现在,在抛出异常和将该主机添加到被阻止主机列表之间有一个非常小的时间跨度。在主机被阻止之前,可能会发生t2试图在此时向host A发送请求的情况-但是更有可能的是t2已经在很长的时间内发送了它t1正在等待您无法阻止的响应。

可以尝试设置合理的超时。当然,还有其他类型的错误不等待超时,但即使是那些比处理异常更多的时间。

使用ConcurrentHashMap是线程安全的,应该足以跟踪被阻止的主机。

除非使用compareAndSet等方法,否则AtomicReference本身并没有多大作用,因此调用不是原子的(但在我看来,如上所述不需要)。如果您真的想在发生异常后立即阻止主机,那么应该使用某种同步。您可以使用同步集来存储被阻止的主机。这仍然无法解决需要一段时间才能真正检测到任何连接错误的问题。

关于更新:正如评论中所说,未来的超时时间应该大于请求超时时间。否则,可能会取消可调用,主机也不会添加到列表中。在使用Future时,您甚至不需要超时。获取,因为请求最终会成功或失败。

当主机A关闭时,您看到许多异常的实际问题可能仅仅是许多线程仍在等待主机A的响应。您只在启动请求之前检查被阻塞的主机,而不是在任何请求期间。任何仍在等待该主机响应的线程将继续这样做,直到达到超时。

如果要防止这种情况,可以尝试定期检查当前主机是否尚未被阻止。这是一个非常幼稚的解决方案,有点违背了未来的目的,因为它基本上是投票。不过,这应该有助于理解一般问题。

// bad pseudo code 

DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);

while(!future.isDone()) {
    if( blockedHosts.contains(currentHost) ) {
        // host unreachable, don't wait for http timeout
        future.cancel(); 
    }
    thread.sleep(/* */);
}

更好的方法是向所有等待相同主机的数据任务线程发送中断,这样它们就可以中止请求并尝试下一个主机。

 类似资料:
  • 我一直在使用SpringIntegration调用RESTAPI,但是SpringIntegration默认附带的http客户端不支持连接池或可重用性,因此我定制了使用PoolighttpClientConnectionManager 但是现在Spring集成停止在我的类路径中拾取JKS文件,所以我构建了自己的SSL上下文,但是构建这个SSL上下文导致了性能的显著下降 对于100个并发线程, 使用

  • 我有一个包含三个模块的工作簿,总共大约19000行代码。 我添加了多个按钮来运行性能完美的子程序。 我添加了四个新的子程序(都很短,总共可能有200行)来复制/粘贴数据并构建一个图形。 当我添加这些子例程时,所有其他子例程都开始异常缓慢地运行(从5-10秒到几分钟,有些子例程完全冻结)。 我有一个解决方法的想法(每次运行例程时关闭和打开自动计算),但我试图理解为什么这是一个问题。 Excel在待机

  • 我运行Http服务器使用Netty I/O库在四核Linux机器上。使用默认工作线程池大小(在Netty中内部设置为2 x内核数)运行时,性能分析显示吞吐量上限为1k请求/秒,请求速率的进一步增加导致延迟几乎线性增加。 由于最大CPU利用率显示为60%,我根据下面的代码增加了工作线程的数量。然而,性能几乎没有任何变化,CPU仍然限制在60-70%。该进程不受内存、I/O或网络带宽的限制。为什么不通

  • Spring引导在没有完成所有任务的情况下关闭。 MainApplication.java java datacollector.java

  • -然而在那之后的很多讨论,声称这不完全是真的,我认为很难知道应用程序应该做什么特殊的背景操作。

  • 我想通过从CSV文件向服务器发送100个请求来测试10个线程。我想每个线程按顺序发射100个请求,同时允许并行请求。我有我的主要采样器和子采样器的子组件和另一个采样器,我想比较我的结果。这种配置通常会产生7个采样器。问题是,当我尝试绘制吞吐量与线程之间的关系图时,在1个用户中,结果在y轴上显示了100多个事务/秒的值。同样的事情发生在“显示结果表”侦听器(即,对于1个用户,它显示700个样本)如何