当前位置: 首页 > 知识库问答 >
问题:

如何在Clojure(/java)中健壮地发出大量并发HTTPS请求

郭恩
2023-03-14
for each input:
    HTTP request A
    HTTP request B
    pass event on with (A.body and B.body)

我尝试了3种方法:

  1. 当一个事件进来时,把它放在一个频道上。许多go例程从通道中取出。每个make请求通过derefing从HTTP请求中“阻止”goblock。这不起作用,因为我认为promise不能很好地处理线程。
  2. 当出现事件时,立即启动future,它“阻止”异步promise。这导致CPU使用率非常高。加上网络资源的匮乏。
  3. 当一个事件出现时,立即为请求A触发HTTP-Kit请求,传入一个发出请求B的回调,传递一个传递事件的回调。这将导致几小时后出现内存不足错误。

这些都工作和处理一段时间的容量。他们最终都会崩溃。最近一次坠机,大约12小时后:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending
 tasks!
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
        Managed Threads: 3
        Active Threads: 1
        Active Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
        Pending Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
Pool thread stack traces:
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
java.lang.OutOfMemoryError: Java heap space
        at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
        at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
        at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
        at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
        at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
        at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
        at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
        at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
        at org.httpkit.client.HttpClient.run(HttpClient.java:375)
        at java.lang.Thread.run(Thread.java:745)
Mar 10, 2016 4:56:34 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 5:00:43 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 4:58:25 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space

YourKit探查器告诉我,我通过org.httpkit.client.handler通过java.util.concurrent.futureTask有大约2GB的char[],这表明对旧处理程序(即请求)的引用在某种程度上被保留了。尝试使用回调的全部原因是为了避免这种情况(尽管它们可能会被闭包所困扰)

共有1个答案

皇甫波峻
2023-03-14
  1. 每秒发出50个HTTP请求(每个请求可能需要200ms),这意味着在任何给定时间都可能有100个请求在运行)听起来是不是负担过重?

这在现代硬件上绝对不过分。

您可以结合core.async管道和HTTP-Kit的回调来实现这一点。您实际上并不需要为每个请求创建go例程(尽管这应该没有坏处),因为您可以从http-kit回调中使用异步put!

下面是一个小程序的示例,它执行与您所描述的类似的操作。它从通道中读取“events”--在本例中,每个events都是ID“1”--并在HTTP服务上查找这些ID。它接受第一次调用的响应,查找JSON键“next”并将其作为步骤2的URL排队。最后,当此查找完成时,它会将一个事件添加到out通道中,go例程将监视该通道以报告统计信息。

(ns concur-req.core
  (require [clojure.core.async :as async]
           [cheshire.core :refer [decode]]
           [org.httpkit.client :as http]))

(defn url-of
  [id]
  ;; this service responds within 100-200ms
  (str "http://localhost:28080/" id ".json"))

(defn retrieve-json-async
  [url c]
  (http/get url nil
            (fn [{body :body status :status :as resp}]
              (if (= 200 status)
                (async/put! c (decode body true))
                (println "ERROR:" resp))
              (async/close! c))))

(defn run [parallelism stop-chan]
  (let [;; allocate half of the parallelism to each step
        step1-n    (int (max (/ parallelism 2) 1))
        step2-n    step1-n
        ;; buffer to take ids, transform them into urls
        step1-chan (async/chan step1-n (map url-of))
        ;; buffer for result of pulling urls from step1, xform by extracting :next url
        step2-chan (async/chan step2-n (map :next))
        ;; buffer to count completed results
        out-chan   (async/chan 1 (map (constantly 1)))
        ;; for delivering the final result
        final-chan (async/chan)
        start-time (System/currentTimeMillis)]

    ;; process URLs from step1 and put the result in step2
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
    ;; process URLs from step2 and put the result in out
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)

    ;; keep the input channel full until stop-chan is closed.
    (async/go-loop []
      (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
        (if (= c stop-chan)
          (async/close! step1-chan)
          (recur))))

    ;; count messages on out-chan until the pipeline is closed, printing
    ;; status message every second
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
      (let [[v c] (async/alts! [status-timer out-chan])]
        (cond (= c status-timer)
              (do (println subt "records...")
                  (recur (async/timeout 1000) 0 (+ subt accu)))

              (nil? v)
              (async/>! final-chan (+ subt accu))

              :else
              (recur status-timer (+ v subt) accu))))

    ;; block until done, then emit final report.
    (let [final-total (async/<!! final-chan)
          elapsed-ms  (- (System/currentTimeMillis) start-time)
          elapsed-s   (/ elapsed-ms 1000.0)]
      (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
                     final-total parallelism elapsed-s
                     (int (/ final-total elapsed-s)))))))

(defn run-for
  [seconds parallelism]
  (let [stop-chan (async/chan)]
    (future
      (Thread/sleep (* seconds 1000))
      (async/close! stop-chan))
    (run parallelism stop-chan)))

(do
  ;; Warm up the connection pool, avoid somaxconn problems...
  (doseq [p (map #(* 20 (inc %)) (range 25))]
    (run-for 1 p))
  (run-for (* 60 60 6) 500))

为了测试这一点,我设置了一个HTTP服务,它只在100-200ms之间的随机Hibernate时间后响应。然后我在我的Macbook Pro上运行了这个程序6个小时。

在并行度设置为500的情况下,平均每秒完成1155个事务(每秒完成2310个HTTP请求)。我确信,通过一些调优(特别是通过将HTTP服务移到不同的机器上),这个值可能会高得多。JVM内存在前30分钟内爬行到1.5GB,然后保持这个大小。我使用的是Oracle的64位1.8JVM。

 类似资料:
  • 我正在尝试发出HTTPS post请求。 请说明是什么导致了错误

  • 问题内容: 我想从Java代码登录到应用程序。这是我的代码… 但我无法登录,它只返回登录页面。 如果有人可以,请帮助我了解我在做什么错。 问题答案: 错误 :-( www-form 中间有多余的空格) 正确

  • 我对斯威夫特还是个新手。我正在尝试制作一个HTTPS POST到HTTPS url的特定头。注意HTTPS而不是HTTP。我如何完成这件事?提前道谢。

  • 问题内容: 我目前正在尝试使用Go进行一些实验。这是我正在尝试做的事情: 我有一个REST API服务正在运行,我想在尽可能多的Goroutine中反复查询特定的URL,以查看这些响应的性能如何(通过查看我的REST API服务器日志)。在退出程序之前,我想发送总计100万个HTTP请求-在计算机允许的范围内同时执行。 我知道有一些工具可以做到这一点,但是我主要对如何使用goroutines在Go

  • 我真的很惊讶,因为看起来太低级了,失去了http代理的简单性,发出http请求似乎应该放在核心库集中(我意识到这是非常主观的)。 删除HTTP代理的理由是什么?它不符合Clojure的核心哲学吗?对于用于基本HTTP请求的最佳库有什么建议?

  • 问题内容: 如何使用jQuery明确提出AJAX HTTPS GET请求?我正在尝试执行以下操作。在https页面上,我有一行代码,但是出现以下错误 如果相对资源来自https页面,为什么AJAX调用会尝试使用HTTP协议访问页面?如果$ .get(url)方法默认执行此操作,那么我如何使用jQuery进行显式的HTTPS GET请求?在http://forum.jquery.com/topic/