当前位置: 首页 > 知识库问答 >
问题:

添加JsonArray时,Spark流抛出java.util.并发修改异常

萧焱
2023-03-14

在下面的过程中,我得到了一个错误。我意识到这个错误似乎是抛出的,因为它试图读取分区(rec)中的整个记录,但试图将其分配给字符串(Str=jsonArray.toJSONString();)同时我使用5秒的批处理间隔火花流配置。对这段代码有什么建议吗?请好心帮忙。谢啦

错误在这一行:

 Str=jsonArray.toJSONString();

以下是我的全部功能

MapRowRDD.foreachRDD(rdd ->{
            rdd.foreachPartition(
                    rec-> {
                        while(rec.hasNext()) {
                            JSONObject record = rec.next();
                            i=i+1;
                          if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                  .parse((String) record.get("DATE_TRANSACTION"))
                                  .getTime()-DateUtils.addMinutes(new Date(), -5)
                                  .getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
                              jsonArray.add(record);
                            if(i % v_BATCH_WINDOW == 0)
                            {   
                                try {
                                    Str=jsonArray.toJSONString();
                                    HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                                    out_JSON=Response.getBody();
                                    log.warn("Response : " + out_JSON.toString());
                                }
                                catch(UnirestConfigException e){
                                    System.out.println("UnirestConfigException occured "+ e.toString());
                                    e.printStackTrace();
                                }
                                jsonArray.clear();
                                i=0;
                            }
                          }
                        publishToKafka(record.toString(), outputTopic, props);
                        }
                        Str=jsonArray.toJSONString();
                        if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
                            HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                        }
                        jsonArray.clear();
                        i=0;
                    }   
                    );
        });

共有1个答案

仲孙才捷
2023-03-14

正如您所知,当您通过不同线程同时修改和迭代同一集合时,会发生此异常。jsonArray不是线程安全的,用一些线程安全的集合(如Vector)替换它,看看它是否有效

 类似资料:
  • 问题内容: 问题发生在 包含该行的代码位于 所有这一切都在里面,这里是一个 当我触摸时,它可能会激活,这将创建另一个具有不同属性的属性,这些属性会从屏幕上掉下来并在不到一秒钟的时间内销毁自己。这是我创建粒子效果的方式。我们可以将其称为“粒子” ,就像构造函数中的参数一样。 一切正常,直到我添加另一个main为止。现在,我同时在屏幕上有两个,如果我触摸最新的,它可以正常工作并启动粒子。 但是,如果我

  • 当我使用temp=iterator.next()时,sort方法会导致并发修改错误。你能帮我解决并发修改错误吗。我给出了整个类的代码,但我只是尝试完成sort方法。事先谢谢你的帮助。 我必须对ArrayList中的所有数组进行排序。

  • 问题内容: 我有这段代码,它给了我并发修改异常。即使看不到任何并发修改,我也无法理解为什么继续得到它。 问题答案: 为了避免,你应该这样编写代码: 允许你在迭代期间修改列表,但不能在创建和使用列表之间进行修改。

  • 问题内容: 您能否告诉我在单线程环境中是否有可能发生并发修改异常的方法,我下面发布的以下应用程序由两个线程组成,请告诉我我也可以在单个线程中看到相同的异常..请劝告 是的,我知道,在单线程环境中,此错误可能会出现..如下面的代码所示。 请告知解决该问题的方法是什么..这样就不会出现此错误.. !! 问题答案: 可以在单线程环境中引发A。只要在不应该​​在上下文中修改对象的情况下使用它,就不必在另一

  • 当Maven构建我的项目并运行单元测试时,有时会抛出一个并发修改异常(大约5次中有1次会失败,其他时间会成功构建)。但是当我以单元测试的形式在本地运行测试时,它们都会毫无例外地通过。 在我的pom.xml文件我有Surefire插件配置为: 然而,我得到的stackTrace没有提到是什么导致了并发修改异常。 我注意到所有的测试都通过了构建,但是出于某种原因,Maven重新打印了已经通过但现在有测

  • 我已经解决了如何合并两个XML文件并修改匹配的属性。 我现在正在努力解决如果file1中不存在file2节点,如何添加file2节点(基于属性名) 这是我拥有的xsl文件: 我找到了如何使用XSLT合并两个xml文件,但无法解决如何将提议解决方案应用于我的xsl。有人能帮忙吗?