当前位置: 首页 > 知识库问答 >
问题:

storm redis spout元组丢失无一例外

南门刚捷
2023-03-14

我有storm topology(1个worker)设置,其中spout(java)从redis出列(使用blpop)事件并传输到bolts。但是有一个观察到的情况是,当队列超过200万,并且在storm Nimbus/Supervisor/Zookeeper/Worker日志中没有发现警告/异常时,一些事件没有被bolt接收(在clojure中,6-spout线程、50-bolt线程)。

(ns event-processor
  (:import [backtype.storm StormSubmitter LocalCluster]
           java.util.UUID
           storm_jedis.RedisQueueSpout
           )
  (:use [backtype.storm clojure config])
  (:require [clojure.tools.logging :as log])
  (:require [clj-redis.client :as redis])
  (:import (redis.clients.jedis Jedis JedisPool JedisPoolConfig))
  (:gen-class))

(defmacro process-event [tuple]
    (log/info "processing event")
    )

(defbolt execute-ls-closure ["word"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]

    (bolt
     (execute [tuple]
       (let [
        timestart (. System currentTimeMillis)
        tuple-message (.get (get tuple "message") 0)
        string-to-emit (process-event tuple)
        ]
        (emit-bolt! collector [string-to-emit] :anchor tuple)
        (ack! collector tuple)
        )))))

(defn mk-topology []

  (topology
   ;{"1" (spout-spec sentence-spout)
   {"1" (spout-spec redis-spout :p 6)
                     }
   {"3" (bolt-spec {"1" :shuffle }
                   execute-ls-closure
                   :p 50)
                   }))

(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)
    ))

(defn submit-topology! [name]
  (StormSubmitter/submitTopology
   name
   {TOPOLOGY-DEBUG true
    TOPOLOGY-WORKERS 1}
   (mk-topology)))

(defn -main
  ([]
   (run-local!))
  ([name]
   (submit-topology! name)))

暂时还没有答案

 类似资料:
  • 我有一个拓扑结构如下所示: 注意,Bolt2和Bolt3从Bolt1和Bolt4接收元组。所有bolt都是运行python脚本的ShellBolts,而spout是运行从RabbitMQ读取的python脚本的ShellSpout。除了Bolt4之外,所有的工作都是预期的。如果我在RabbitMQ中一次添加一条消息,它就会全部工作并且干净利落地完成。如果我在Bolt4上挂起消息时对消息进行排队,则

  • 早上好,我试图在我们的代码中找到一个错误,关于当xml没有被格式化时使用JAXB解组XML。我已经做了很多调试和测试,但仍然找不到错误。 xml的解释部分如下所示: 列表最多可包含50.000<代码> 为此,我们有以下代码。解组器的创建被移动到一个接口,以区别我们希望通过jaxb解组的元素,包括子元素(如元素)和那些(如

  • 问题内容: 我今天犯了升级Eclipse的错误,现在无法启动新的Android项目。 我收到消息Proguard.cfg(找不到文件)。 我似乎在哪里找不到这东西?是否有可能摆脱它我在这个项目中不需要混淆… 谢谢 问题答案: 如果您确实不需要Proguard来混淆发行版,则可以从项目根文件夹的default.properties文件中删除以下行: proguard.config = proguar

  • 我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?

  • 问题内容: 我正在尝试使用maven依赖项创建一个JFX11包含自定义的jar。根据我所做的研究,似乎最好的方法是通过Maven Shade插件。但是,当我运行它时,出现以下错误: 错误:缺少JavaFX运行时组件,并且是运行此应用程序所必需的 我不明白为什么会这样。我在搞什么 有一个更好的方法吗?我还尝试了使用相同消息的Maven程序集插件。 pom文件供参考 问题答案: 这个答案解释了为什么在

  • 问题内容: 我想在我的网站上使用,但得到以下信息: 我试过打印。输出以下内容: 谁能帮助我找到或建议替代方案? 问题答案: 从文档中: 页面(如果有的话)的地址,该页面将用户代理引至当前页面。这是由用户代理设置的。并非所有的用户代理都将设置此功能,有些用户代理提供了将HTTP_REFERER修改为功能的功能。简而言之,它不能真正被信任。