情况简介
spring项目,controller异步调用service的方法,产生大量并发。
具体业务:
前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。
处理方式:
controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。
本文主要知识点:
多线程同时(异步)调用方法后,开启新线程,并限制线程数量。
代码如下:
@Service public class LgtsAsyncServiceImpl { /** logger日志. */ public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class); private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻译的队列 private final AtomicInteger threadCnt = new AtomicInteger(0);// 当前翻译中的线程数 private final Vector<String> existsKey = new Vector<>();// 保存已入队列的数据 private final int maxThreadCnt = 2;// 允许同时执行的翻译线程数 private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻译条数 private static final String translationFrom = "zh"; @Async public void saveAsync(Lgts t) { if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) { return; } offer(t); save(); return; } private boolean offer(Lgts t) { String key = t.getGco() + "-" + t.getCode(); if (!existsKey.contains(key)) { existsKey.add(key); boolean result = que.offer(t); // LOGGER.trace("待翻译文字[" + t.getGco() + ":" + t.getCode() + "]加入队列结果[" + result // + "],队列中数据总个数:" + que.size()); return result; } return false; } @Autowired private LgtsService lgtsService; private void save() { int cnt = threadCnt.incrementAndGet();// 当前线程数+1 if (cnt > maxThreadCnt) { // 已启动的线程大于设置的最大线程数直接丢弃 threadCnt.decrementAndGet();// +1的线程数再-回去 return; } GwallUser user = UserUtils.getUser(); Thread thr = new Thread() { public void run() { long sleepTime = 30000l; UserUtils.setUser(user); boolean continueFlag = true; int maxContinueCnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁 int continueCnt = 0;// 连续休眠次数 while (continueFlag) {// 队列不为空时执行 if (Objects.isNull(que.peek())) { try { if (continueCnt > maxContinueCnt) { // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。 continueFlag = false; continue; } // 队列为空,准备休眠 Thread.sleep(sleepTime); continueCnt++; continue; } catch (InterruptedException e) { // 休眠失败,无需处理 e.printStackTrace(); } } continueCnt = 0;// 重置连续休眠次数为0 List<Lgts> params = new ArrayList<>(); int totalCnt = que.size(); que.drainTo(params, NUM_OF_EVERY_TIME); StringBuilder utf8q = new StringBuilder(); String code = ""; List<Lgts> needRemove = new ArrayList<>(); for (Lgts lgts : params) { if (StringUtils.isAnyBlank(code)) { code = lgts.getCode(); } // 移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去 String key = lgts.getGco() + "-" + lgts.getCode(); existsKey.remove(key); if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻译的目标语言与当前列表中的第一个不一致 offer(lgts);// 重新将待翻译的语言放回队列 needRemove.add(lgts); continue; } utf8q.append(lgts.getGco()).append("\n"); } params.removeAll(needRemove); LOGGER.debug("队列中共" + totalCnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code); String to = "en"; if (StringUtils.isAnyBlank(utf8q, to)) { LOGGER.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。"); continue; } Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to); if (Objects.isNull(result) || result.isEmpty()) {// 把没有获取到翻译结果的重新放回队列 for (Lgts lgts : params) { offer(lgts); } LOGGER.debug("本次翻译结果为空。"); continue; } int sucessCnt = 0, ignoreCnt = 0; for (Lgts lgts : params) { lgts.setBdcode(to); String gna = result.get(lgts.getGco()); if (StringUtils.isAnyBlank(gna)) { offer(lgts);// 重新将待翻译的语言放回队列 continue; } lgts.setStat(1); lgts.setGna(gna); int saveResult = lgtsService.saveIgnore(lgts); if (0 == saveResult) { ignoreCnt++; } else { sucessCnt++; } } LOGGER.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucessCnt + ",已存在并忽略个数:" + ignoreCnt); } threadCnt.decrementAndGet();// 运行中的线程数-1 distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改 } /** * 如果是最后一个线程,清空队列和existsKey中的数据 */ private void distory() { if (0 == threadCnt.get()) { // 最后一个线程退出时,执行清理操作 existsKey.clear(); que.clear(); } } }; thr.setDaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁 thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999)); thr.start();// 启动插入线程 } /** * 百度翻译 * * @param utf8q * 待翻译的字符串,需要utf8格式的 * @param from * 百度翻译语言列表中的代码 * 参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList * @param to * 百度翻译语言列表中的代码 * 参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList * @return 翻译结果 */ private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) { Map<String, String> result = new HashMap<>(); String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate"; if (StringUtils.isAnyBlank(baiduurlStr)) { LOGGER.warn("百度翻译API接口URL相关参数为空!"); return result; } Map<String, String> params = buildParams(utf8q, from, to); if (params.isEmpty()) { return result; } String sendUrl = getUrlWithQueryString(baiduurlStr, params); try { HttpClient httpClient = new HttpClient(); httpClient.setMethod("GET"); String remoteResult = httpClient.pub(sendUrl, ""); result = convertRemote(remoteResult); } catch (Exception e) { LOGGER.info("百度翻译API返回结果异常!", e); } return result; } private Map<String, String> convertRemote(String remoteResult) { Map<String, String> result = new HashMap<>(); if (StringUtils.isBlank(remoteResult)) { return result; } JSONObject jsonObject = JSONObject.parseObject(remoteResult); JSONArray trans_result = jsonObject.getJSONArray("trans_result"); if (Objects.isNull(trans_result) || trans_result.isEmpty()) { return result; } for (Object object : trans_result) { JSONObject trans = (JSONObject) object; result.put(trans.getString("src"), trans.getString("dst")); } return result; } private Map<String, String> buildParams(String utf8q, String from, String to) { if (StringUtils.isBlank(from)) { from = "auto"; } Map<String, String> params = new HashMap<String, String>(); String skStr = "sk"; String appidStr = "appid"; if (StringUtils.isAnyBlank(skStr, appidStr)) { LOGGER.warn("百度翻译API接口相关参数为空!"); return params; } params.put("q", utf8q); params.put("from", from); params.put("to", to); params.put("appid", appidStr); // 随机数 String salt = String.valueOf(System.currentTimeMillis()); params.put("salt", salt); // 签名 String src = appidStr + utf8q + salt + skStr; // 加密前的原文 params.put("sign", MD5Util.md5Encrypt(src).toLowerCase()); return params; } public static String getUrlWithQueryString(String url, Map<String, String> params) { if (params == null) { return url; } StringBuilder builder = new StringBuilder(url); if (url.contains("?")) { builder.append("&"); } else { builder.append("?"); } int i = 0; for (String key : params.keySet()) { String value = params.get(key); if (value == null) { // 过滤空的key continue; } if (i != 0) { builder.append('&'); } builder.append(key); builder.append('='); builder.append(encode(value)); i++; } return builder.toString(); } /** * 对输入的字符串进行URL编码, 即转换为%20这种形式 * * @param input * 原文 * @return URL编码. 如果编码失败, 则返回原文 */ public static String encode(String input) { if (input == null) { return ""; } try { return URLEncoder.encode(input, "utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return input; } }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对小牛知识库的支持。
线程中使用 java.lang.Runnable 如果用户在代码中通过 java.lang.Runnable 新启动了线程或者采用了线程池去异步地处理一些业务,那么需要将 SOFATracer 日志上下文从父线程传递到子线程中去,SOFATracer 提供的 com.alipay.common.tracer.core.async.SofaTracerRunnable 默认完成了此操作,大家可以按照
问题内容: 基本上我需要在更多线程中运行〜数百个计算。我只想在paralell中运行一些并行线程,例如5个线程和5个计算。 我正在使用spring框架,@Async选项是自然选择。我不需要全功能的JMS队列,这对我来说有点麻烦。 有任何想法吗 ?谢谢 问题答案: 你检查了吗?你可以定义一个线程池,其中包含最大数量的线程来执行任务。 如果要与结合使用,请在spring-config中使用它:
本文向大家介绍详解Servlet 3.0/3.1 中的异步处理,包括了详解Servlet 3.0/3.1 中的异步处理的使用技巧和注意事项,需要的朋友参考一下 在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由某一个线程从头到尾负责处理。如果一个请求需要进行IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将
我目前正在处理一批数据,这些数据来自一个拥有数百万行的大型SQL数据库。 它在处理器中执行一些处理,包括通过带有连接的大型sql查询对从Reader检索到的行进行分组。 编写器将结果写入另一个表。 问题是此Batch存在性能问题,因为Sql选择查询需要大量时间并且步骤不会在多线程中执行。 因此,我希望在多标题中运行它们,但问题是,这些步骤通过计算具有相同类型的所有行的总数来对行进行分组。 因此,如
我正在尝试配置多线程步骤。我已经遵循了Spring批处理留档。 我的工作配置: 当我执行任务时,我得到以下异常。 我能够在没有油门限制参数的情况下执行作业。我想知道为什么我不能添加。任务执行器中的并发限制属性是什么。 谢谢,西瓦普拉卡什
问题内容: 我发现在Python 3.4中,用于多处理/线程的库很少:多处理vs线程与asyncio。 但是我不知道使用哪个,或者是“推荐的”。他们做的是同一件事还是不同?如果是这样,则将哪一个用于什么?我想编写一个在计算机上使用多核的程序。但是我不知道我应该学习哪个图书馆。 问题答案: 它们旨在(略有)不同的目的和/或要求。CPython(典型的主线Python实现)仍然具有全局解释器锁,因此多