个人知识:我从javacodegeeks那里读到:“……SimpleAsynctaskeExecutor对于玩具项目来说是可以的,但对于任何比它更大的项目,它都有点风险,因为它不限制并发线程,也不重用线程。所以为了安全起见,我们还将添加一个任务执行器bean……”来自baeldung的一个非常简单的例子是如何添加我们自己的任务执行器。但我可以找到任何解释后果的指南,以及一些值得应用的案例。
个人愿望:我正在努力为我们的微服务提供一个企业架构,并将其发布在Kafka主题上。“不限制并发线程和不重用线程导致的风险”这句话似乎是合理的,主要是针对我基于日志的案例。
我在本地桌面上成功运行了下面的代码,但我想知道我是否正确地提供了自定义任务执行器。
我的问题:这个配置是否考虑到我已经在使用kafkattpla(即默认情况下至少对于产生/发送消息来说是同步的、单例的和线程安全的),是否真的朝着正确的方向重用线程并避免在使用SimpleAsyncTaskExecitor时意外创建线程?
生产者配置
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafkaMsgExecutor-");
executor.initialize();
return executor;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
}
制作人
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Async
public void send(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(final SendResult<String, String> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
出于演示目的:
@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Autowired
private Producer p;
@Override
public void run(String... strings) throws Exception {
p.send("test", " qualquer messagem demonstrativa");
}
}
这是SimpleAsyncTaskExecutor的默认实现
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
每个任务都会创建新线程,用Java创建线程并不便宜:(参考)
线程对象使用大量内存,在大型应用程序中,分配和取消分配许多线程对象会产生大量内存管理开销。
=
这就是为什么建议您使用线程池实现,线程创建开销仍然存在,但由于线程被重用而不是创建,因此大大减少了线程创建开销。
配置ThreadPoolTaskExecutor
时,应根据应用程序负载正确定义两个值得注意的参数:
>
这是池中的最大线程数。
私有int queue容量=整数。MAX_VALUE;
这是排队的最大任务数。当队列已满时,默认值可能会导致OutOfMemory异常。
使用默认值(Integer.MAX_value
)可能会导致服务器资源不足/崩溃。
您可以通过增加最大池大小setMaxPoolSize()
的数量来提高吞吐量,要减少加载增加时的预热,请将core poolsize设置为更高的值setCorePoolSize()
(当加载增加时,将初始化maxPoolSize-corePoolSize
之间不同的任何线程数)
在我以前的网站上,我曾经使用cookie来显示一个预主页,只有在第一次访问时。这很有效(例如,请参见此处),但是现在使用cookie并不流行,所以我想尽量避免使用cookie。 现在,我的新网站项目几乎总是通过javascript启动pre-home(显示一个modalbox),所以我不需要在服务器端执行任何操作。我正在考虑使用HTML5的localStorage而不是Cookie,如果浏览器没有
脚本内容如下,没有写Shebang行 执行 ./xxx.sh,得到输出结果 y 执行 bash ./xxx.sh,得到输出结果 y 执行 zsh ./xxx.sh,得到输出结果 x zsh数组下标是从1开始的,所以zsh的结果应该是输出y。 MacOS默认的Shell是zsh,但是执行./xxx.sh结果却不是y
我试图在登录过程中添加用户ip验证。如果用户的ip地址不在数据库中,应用程序应该拒绝身份验证。 问题:根据下面的设置,Auth.AuthenticationProvider()并没有替换默认的DaoAuthenticationProvider,而是将UserIpAuthenticationProvider添加为列表中的第一个AuthenticationProvider。 在用户名/密码组合不正确的
本文向大家介绍你认为table的作用和优缺点是什么呢?相关面试题,主要包含被问及你认为table的作用和优缺点是什么呢?时的应答技巧和注意事项,需要的朋友参考一下 用于表格数据的展示,并且很符合人们的直观认知。 在 + 布局流行之前,普遍使用 进行布局。曾经的神器 Dreamweaver 的可视化拖拽也是基于 布局。 布局的好处在于样式好控制,特别是居中、对齐。缺点在于会多处非常多的 DOM 节点
本文向大家介绍可替换元素和不可替换元素有什么不同的特点?相关面试题,主要包含被问及可替换元素和不可替换元素有什么不同的特点?时的应答技巧和注意事项,需要的朋友参考一下 可替换元素的内容由元素的某些属性的值决定 不可替换元素的内容由子节点的内容决定
我正在AWS EMR上学习火花。在这个过程中,我试图理解执行者数量(--num-executors)和执行者核心(--executor-cores)之间的区别。谁能告诉我这里吗? 同样,当我试图提交以下作业时,我得到了错误: