I Built a Dynamic Thread Pool in My Spare Time, and the Source Code Is on GitHub

2020-11-06 01:33:43 Yin Jihuan

Background

Thread pools are used a lot in day-to-day work. Whenever we need asynchronous execution or batch processing of tasks, we define a thread pool to handle them.

Using thread pools comes with some problems. Here is a brief look at a few I have run into.

Scenario one: I implemented a batch data processing feature. The core thread count of the pool was set low at first, and when I later wanted to adjust it, the only way was to change the value and restart the application.

Scenario two: an application that receives MQ messages and processes them as tasks. The thread pool queue is allowed to buffer a certain number of tasks. When task processing was slow, there was no convenient way to see how many tasks were still waiting. To get something working quickly, I simply started a thread that printed the size of the thread pool queue.

Not long ago I read an article about thread pool practice on Meituan's WeChat official account (https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w). I thought their ideas were very good, but the implementation is not open source, so I took some time to add a dynamic thread pool component to my own open source project, Kitty. It supports Cat monitoring, dynamic changes to core parameters, task backlog alarms, and more. Today I'd like to share how it is implemented.

Project source code: https://github.com/yinjihuan/kitty

Usage

Add the dependency

Add the dependency on the thread pool component. Kitty has not been published yet, so you need to download the source code and install it to your local or private repository yourself.

  
  <dependency>
      <groupId>com.cxytiandi</groupId>
      <artifactId>kitty-spring-cloud-starter-dynamic-thread-pool</artifactId>
  </dependency>

Add the configuration

Then configure the thread pool information in Nacos (this component integrates with Nacos). I recommend creating a separate thread pool configuration file for each application. For example, ours has the dataId kitty-cloud-thread-pool.properties and the group BIZ_GROUP.

The contents are as follows:

  
  kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties
  kitty.threadpools.nacosGroup=BIZ_GROUP
  kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d
  kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00
  kitty.threadpools.owner=Yin Jihuan
  kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor
  kitty.threadpools.executors[0].corePoolSize=4
  kitty.threadpools.executors[0].maximumPoolSize=4
  kitty.threadpools.executors[0].queueCapacity=5
  kitty.threadpools.executors[0].queueCapacityThreshold=5
  kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2
  kitty.threadpools.executors[1].corePoolSize=2
  kitty.threadpools.executors[1].maximumPoolSize=4

nacosDataId, nacosGroup

To listen for configuration changes, the component needs to know which DataId to listen to. Set these to the DataId and group of the current configuration.

accessToken, secret

Credentials for the DingTalk robot, used to send alarms.

owner

The person responsible for this application. It is shown in alarm messages.

threadPoolName

The name of the thread pool. Take note of it, since it is the name you use to fetch the pool in code.

The remaining settings are not covered one by one. They match the parameters of the thread pool itself, and a few others can be understood by reading the source code.

Inject and use

  
  @Autowired
  private DynamicThreadPoolManager dynamicThreadPoolManager;

  // Look up the pool by its configured name; the second argument is the task name
  dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> {
      log.info("Using the thread pool");
      try {
          Thread.sleep(30000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }, "getArticle");

Get the thread pool object through the getThreadPoolExecutor method of DynamicThreadPoolManager, then pass in a Runnable, Callable, and so on. The second parameter is the name of the task. This extra parameter exists because without an identifier there is no way to tell tasks apart.

By default the thread pool component integrates Cat instrumentation, so once a name is set you can view the monitoring data for that task in Cat.
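How a task name could travel with the task can be sketched with plain JDK classes. This is only an illustration under assumptions, not Kitty's actual implementation: NamedTaskExecutor, NamedTask, lastTaskName, and demo are hypothetical names, and the "monitoring" here is just remembering the last name seen.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a pool whose execute() also accepts a task name.
class NamedTaskExecutor extends ThreadPoolExecutor {

    // Wrapper that keeps the name next to the Runnable it belongs to.
    static final class NamedTask implements Runnable {
        final String name;
        final Runnable delegate;

        NamedTask(String name, Runnable delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    // Stand-in for a monitoring system: remembers the most recently started task name.
    final AtomicReference<String> lastTaskName = new AtomicReference<>();

    NamedTaskExecutor(int core, int max) {
        super(core, max, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // Two-argument overload, mirroring the call style shown above.
    public void execute(Runnable task, String taskName) {
        super.execute(new NamedTask(taskName, task));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // With execute(), the hook receives the same object that was submitted.
        if (r instanceof NamedTask) {
            lastTaskName.set(((NamedTask) r).name);
        }
        super.beforeExecute(t, r);
    }

    // Runs one named task and returns the name the hook observed.
    static String demo() {
        NamedTaskExecutor pool = new NamedTaskExecutor(1, 1);
        pool.execute(() -> { }, "getArticle");
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.lastTaskName.get();
    }

    public static void main(String[] args) {
        System.out.println("last task: " + demo());
    }
}
```

Wrapping the Runnable keeps the name attached to the task object itself, so it is still available when a worker thread eventually picks the task up.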

Extended features

Task execution monitoring

In Cat's Transaction report, the thread pool name is displayed as the type.

The task name is displayed in the details.

Dynamic modification of core parameters

Currently, the core parameters that can be modified are corePoolSize, maximumPoolSize, queueCapacity (only when the queue type is LinkedBlockingQueue), rejectedExecutionType, keepAliveTime, and unit.

In practice, corePoolSize, maximumPoolSize, and queueCapacity are the ones changed most often.

To change a value, modify the corresponding configuration directly in Nacos. The client listens for configuration changes and then applies the new values to the corresponding thread pool.
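Changing the sizes of a running pool relies on setters that java.util.concurrent.ThreadPoolExecutor already provides. The sketch below is an illustration rather than Kitty's code; PoolResizer, resize, and demo are hypothetical names. It orders the two calls so that the core size never exceeds the maximum in between, because setMaximumPoolSize rejects a maximum below the current core size, and newer JDKs also reject a core size above the current maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing runtime resizing of a JDK thread pool.
class PoolResizer {

    // Applies new sizes in an order that keeps core <= max at every step.
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore > newMax) {
            throw new IllegalArgumentException("corePoolSize must not exceed maximumPoolSize");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing (or same maximum): raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first, then the ceiling.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    // Resizes a 4/4 pool up and then down, returning the sizes seen after each step.
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8, 16);
        String grown = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
        resize(pool, 2, 2);
        String shrunk = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
        pool.shutdown();
        return grown + "," + shrunk;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```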

Queue capacity alarm

queueCapacityThreshold is the threshold for the queue capacity alarm. If the number of tasks in the queue exceeds queueCapacityThreshold, an alarm is raised.
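The threshold rule can be expressed as a one-line check against the pool's queue. The following is a minimal sketch with hypothetical names (QueueAlarmCheck, overThreshold, demo); how and when Kitty actually runs the check is not shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a queue backlog check.
class QueueAlarmCheck {

    // True when the number of waiting tasks is strictly above the threshold.
    static boolean overThreshold(ThreadPoolExecutor pool, int queueCapacityThreshold) {
        return pool.getQueue().size() > queueCapacityThreshold;
    }

    // One busy worker plus three queued tasks, checked against a threshold of 2.
    static boolean demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(5));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // The first task occupies the only worker; the next three wait in the queue.
        for (int i = 0; i < 4; i++) {
            pool.execute(blocked);
        }
        boolean alarm = overThreshold(pool, 2);
        release.countDown();
        pool.shutdown();
        return alarm;
    }

    public static void main(String[] args) {
        System.out.println("alarm: " + demo());
    }
}
```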

Rejection count alarm

When the queue is full, new tasks are handled according to the rejection policy the user configured. If the AbortPolicy strategy is used, an alarm is also raised, since at that point the consumers are effectively overloaded.
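One way to count rejections is to wrap the configured policy in a handler that increments a counter before delegating. This is a sketch under assumptions, not the component's source: CountingRejectHandler and demo are hypothetical names, and the place where an alarm would be sent is only marked with a comment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: counts rejections, then applies the real policy.
class CountingRejectHandler implements RejectedExecutionHandler {

    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejectCount = new AtomicLong();

    CountingRejectHandler(RejectedExecutionHandler delegate) {
        this.delegate = delegate;
    }

    long getRejectCount() {
        return rejectCount.get();
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejectCount.incrementAndGet();
        // An alarm notification could be triggered here.
        delegate.rejectedExecution(r, executor);
    }

    // Fills a 1-thread pool with a 1-slot queue, then submits one task too many.
    static long demo() {
        CountingRejectHandler handler =
                new CountingRejectHandler(new ThreadPoolExecutor.AbortPolicy());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocked); // runs on the only worker
        pool.execute(blocked); // waits in the queue
        try {
            pool.execute(blocked); // no worker, no queue slot: rejected
        } catch (RejectedExecutionException expected) {
            // AbortPolicy throws after the counter has been incremented.
        }
        release.countDown();
        pool.shutdown();
        return handler.getRejectCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + demo());
    }
}
```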

Thread pool runtime status

The component is wired to Cat underneath, so the thread pool's runtime data is reported to Cat, and we can view it there.

If you want to display it on your own platform, I have exposed the /actuator/thread-pool endpoint so you can pull the data yourself.

  
  {
      threadPools: [{
          threadPoolName: "TestThreadPoolExecutor",
          activeCount: 0,
          keepAliveTime: 0,
          largestPoolSize: 4,
          fair: false,
          queueCapacity: 5,
          queueCapacityThreshold: 2,
          rejectCount: 0,
          waitTaskCount: 0,
          taskCount: 5,
          unit: "MILLISECONDS",
          rejectedExecutionType: "AbortPolicy",
          corePoolSize: 4,
          queueType: "LinkedBlockingQueue",
          completedTaskCount: 5,
          maximumPoolSize: 4
      }, {
          threadPoolName: "TestThreadPoolExecutor2",
          activeCount: 0,
          keepAliveTime: 0,
          largestPoolSize: 0,
          fair: false,
          queueCapacity: 2147483647,
          queueCapacityThreshold: 2147483647,
          rejectCount: 0,
          waitTaskCount: 0,
          taskCount: 0,
          unit: "MILLISECONDS",
          rejectedExecutionType: "AbortPolicy",
          corePoolSize: 2,
          queueType: "LinkedBlockingQueue",
          completedTaskCount: 0,
          maximumPoolSize: 4
      }]
  }

Custom rejection policy

Usually, when we create a thread pool in code, we can supply a custom rejection policy by passing it in when the thread pool object is constructed. Here the creation of the thread pool is encapsulated, so the only option is to configure the name of the rejection policy in Nacos. By default, the four policies that ship with the JDK can be configured: CallerRunsPolicy, AbortPolicy, DiscardPolicy, and DiscardOldestPolicy.

Custom policies are also supported. They are defined the same way as before:

  
  @Slf4j
  public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          // Custom handling of the rejected task goes here
          log.info("Rejection handler invoked...");
      }
  }

For this policy to take effect, the SPI mechanism is used: create a META-INF folder under resources, then a services folder inside it, and then a file named java.util.concurrent.RejectedExecutionHandler whose content is the fully qualified name of the class you defined.
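The SPI lookup described above maps onto java.util.ServiceLoader. The sketch below shows one way a policy name could be resolved, first against the four JDK policies and then against SPI-registered handlers. RejectPolicyResolver, resolve, and demo are hypothetical names, and matching by simple class name is an assumption made for the example; Kitty's actual matching rule may differ.

```java
import java.util.ServiceLoader;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical sketch: resolve a rejection policy by name, with an SPI fallback.
class RejectPolicyResolver {

    static RejectedExecutionHandler resolve(String name) {
        switch (name) {
            case "CallerRunsPolicy":
                return new ThreadPoolExecutor.CallerRunsPolicy();
            case "AbortPolicy":
                return new ThreadPoolExecutor.AbortPolicy();
            case "DiscardPolicy":
                return new ThreadPoolExecutor.DiscardPolicy();
            case "DiscardOldestPolicy":
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            default:
                break;
        }
        // Handlers listed in META-INF/services/java.util.concurrent.RejectedExecutionHandler
        // (one fully qualified class name per line) are discovered here.
        for (RejectedExecutionHandler handler : ServiceLoader.load(RejectedExecutionHandler.class)) {
            if (handler.getClass().getSimpleName().equals(name)) {
                return handler;
            }
        }
        throw new IllegalArgumentException("Unknown rejection policy: " + name);
    }

    // Resolves a built-in policy and returns its simple class name.
    static String demo() {
        return resolve("DiscardOldestPolicy").getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a services file in place, a call such as resolve("MyRejectedExecutionHandler") would return the custom handler, provided the class has a public no-argument constructor, which ServiceLoader requires.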

Custom alarm method

The default alarm method is the integrated DingTalk robot. You can turn it off if you don't want to use it, or send the alarm information to your own monitoring platform instead.

If you have no alarm platform, you can implement a new alarm method in your project, such as SMS.

You only need to implement ThreadPoolAlarmNotify.

  
  /**
   * Custom SMS alarm notification
   *
   * @author Yin Jihuan
   * Personal WeChat: jihuan900
   * WeChat official account: Ape World
   * GitHub: https://github.com/yinjihuan
   * About the author: http://cxytiandi.com/about
   * Date: 2020-05-27 22:26
   */
  @Slf4j
  @Component
  public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify {
      @Override
      public void alarmNotify(AlarmMessage alarmMessage) {
          // Replace the log call with the real SMS sending logic
          log.info(alarmMessage.toString());
      }
  }

Code implementation

I won't go through it in detail. The source code is at https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool, so feel free to read it yourself. It is not complicated.

Creating the thread pool

The thread pools are created from the configuration. The ThreadPoolExecutor is a custom subclass, because it needs Cat instrumentation.

  
  /**
   * Creates the thread pools
   * @param threadPoolProperties
   */
  private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) {
      threadPoolProperties.getExecutors().forEach(executor -> {
          KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor(
                  executor.getCorePoolSize(),
                  executor.getMaximumPoolSize(),
                  executor.getKeepAliveTime(),
                  executor.getUnit(),
                  getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()),
                  new KittyThreadFactory(executor.getThreadPoolName()),
                  getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()),
                  executor.getThreadPoolName());
          // Keep the pool addressable by its configured name
          threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor);
      });
  }

Refreshing the thread pool

First, listen for changes to the Nacos configuration.

  
  /**
   * Listens for configuration changes. spring-cloud-alibaba 2.1.0 does not support listening via @NacosConfigListener.
   */
  public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
      ConfigService configService = nacosConfigProperties.configServiceInstance();
      try {
          configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() {
              @Override
              public void receiveConfigInfo(String configInfo) {
                  // The refresh runs on its own thread, so it has only been started at this point
                  new Thread(() -> refreshThreadPoolExecutor()).start();
                  log.info("Thread pool configuration changed, refresh started");
              }
          });
      } catch (NacosException e) {
          log.error("Failed to register Nacos configuration listener", e);
      }
  }

Next, refresh the thread pool parameters. When the listener event fires, the configuration class has not been refreshed yet, so I wait 1 second to let the configuration refresh complete and then read the values directly from the configuration class.

It works, even though it is a bit of a hack. A better way would be to parse the configInfo passed to receiveConfigInfo, which contains the entire configuration content after the change. Since parsing it back into the properties object is not trivial, I haven't done that yet. I'll change it later.

  
  /**
   * Refreshes the thread pools
   */
  private void refreshThreadPoolExecutor() {
      try {
          // Wait for the configuration refresh to complete
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          // Restore the interrupt flag instead of swallowing it
          Thread.currentThread().interrupt();
      }
      dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
          ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName());
          threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
          threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());
          threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit());
          threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()));
          BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
          // Only the custom resizable queue supports changing its capacity
          if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
              ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(executor.getQueueCapacity());
          }
      });
  }
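As for parsing configInfo directly instead of waiting a second: since the configuration is in .properties format, java.util.Properties can load it from the string. The following is a minimal sketch, not part of Kitty; ConfigInfoParser, parse, intValue, and demo are hypothetical names, and only integer settings of the executors[i] entries are handled.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: read executor settings straight from the pushed config text.
class ConfigInfoParser {

    // Loads the .properties-formatted text delivered to the listener.
    static Properties parse(String configInfo) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configInfo));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads an integer such as kitty.threadpools.executors[0].corePoolSize.
    static int intValue(Properties props, int index, String field, int defaultValue) {
        String key = "kitty.threadpools.executors[" + index + "]." + field;
        String value = props.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    // Parses a two-line config and returns the core pool size of the first executor.
    static int demo() {
        String configInfo =
                "kitty.threadpools.executors[0].corePoolSize=8\n"
              + "kitty.threadpools.executors[0].maximumPoolSize=16\n";
        return intValue(parse(configInfo), 0, "corePoolSize", 1);
    }

    public static void main(String[] args) {
        System.out.println("corePoolSize = " + demo());
    }
}
```

Reading the values from the pushed text removes the dependency on when the configuration class gets refreshed, which is what the one-second sleep works around.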

The other parameters are refreshed with setters that the thread pool already provides. The part to note is refreshing the queue size, which is currently supported only for LinkedBlockingQueue. Because the capacity of a LinkedBlockingQueue cannot be modified, I followed the idea from the Meituan article and wrote a custom queue whose capacity can be changed. It is essentially a copy of the LinkedBlockingQueue code with the capacity made modifiable.
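To illustrate the idea of a queue with an adjustable capacity, here is a deliberately simplified alternative to copying the LinkedBlockingQueue source. It is not the queue used in Kitty: ResizableOfferQueue, setCapacity, and demo are hypothetical names. It extends an unbounded LinkedBlockingQueue and enforces a soft limit only in offer(E), which is the method ThreadPoolExecutor uses to enqueue tasks. put() and the timed offer() bypass the limit, so this is a sketch rather than a general-purpose bounded queue.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an unbounded queue with an adjustable soft capacity on offer().
class ResizableOfferQueue<E> extends LinkedBlockingQueue<E> {

    private volatile int capacity;

    ResizableOfferQueue(int capacity) {
        super(); // the underlying queue itself is unbounded
        this.capacity = capacity;
    }

    // Takes effect for subsequent offers; elements already queued are kept.
    void setCapacity(int capacity) {
        this.capacity = capacity;
    }

    // Offers are serialized so the size check and the insert happen together.
    @Override
    public synchronized boolean offer(E e) {
        return size() < capacity && super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }

    // Shows an offer being refused at capacity 1 and accepted after growing to 2.
    static String demo() {
        ResizableOfferQueue<String> queue = new ResizableOfferQueue<>(1);
        boolean first = queue.offer("a");
        boolean second = queue.offer("b");
        queue.setCapacity(2);
        boolean third = queue.offer("c");
        return first + "," + second + "," + third;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```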

Reporting runtime information to Cat

The code that uploads data to Cat's Heartbeat report is shown below. It relies on the extension capability that Cat provides: registering the extension below is enough for the data to be reported periodically.

  
  public void registerStatusExtension(ThreadPoolProperties prop, KittyThreadPoolExecutor executor) {
      StatusExtensionRegister.getInstance().register(new StatusExtension() {
          @Override
          public String getId() {
              return "thread.pool.info." + prop.getThreadPoolName();
          }

          @Override
          public String getDescription() {
              return "Thread pool monitoring";
          }

          @Override
          public Map<String, String> getProperties() {
              // Snapshot of the pool's runtime metrics, reported with each heartbeat
              AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
              Map<String, String> pool = new HashMap<>();
              pool.put("activeCount", String.valueOf(executor.getActiveCount()));
              pool.put("completedTaskCount", String.valueOf(executor.getCompletedTaskCount()));
              pool.put("largestPoolSize", String.valueOf(executor.getLargestPoolSize()));
              pool.put("taskCount", String.valueOf(executor.getTaskCount()));
              pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
              pool.put("waitTaskCount", String.valueOf(executor.getQueue().size()));
              return pool;
          }
      });
  }

Defining the thread pool endpoint

The thread pool's configuration and runtime status can be exposed through a custom endpoint, so that an external monitoring system can pull the data and process it.

  
  @Endpoint(id = "thread-pool")
  public class ThreadPoolEndpoint {
      @Autowired
      private DynamicThreadPoolManager dynamicThreadPoolManager;

      @Autowired
      private DynamicThreadPoolProperties dynamicThreadPoolProperties;

      @ReadOperation
      public Map<String, Object> threadPools() {
          Map<String, Object> data = new HashMap<>();
          List<Map<String, Object>> threadPools = new ArrayList<>();
          dynamicThreadPoolProperties.getExecutors().forEach(prop -> {
              KittyThreadPoolExecutor executor = dynamicThreadPoolManager.getThreadPoolExecutor(prop.getThreadPoolName());
              AtomicLong rejectCount = dynamicThreadPoolManager.getRejectCount(prop.getThreadPoolName());
              Map<String, Object> pool = new HashMap<>();
              // Start from the configured values, then add the runtime metrics
              Map config = JSONObject.parseObject(JSONObject.toJSONString(prop), Map.class);
              pool.putAll(config);
              pool.put("activeCount", executor.getActiveCount());
              pool.put("completedTaskCount", executor.getCompletedTaskCount());
              pool.put("largestPoolSize", executor.getLargestPoolSize());
              pool.put("taskCount", executor.getTaskCount());
              pool.put("rejectCount", rejectCount == null ? 0 : rejectCount.get());
              pool.put("waitTaskCount", executor.getQueue().size());
              threadPools.add(pool);
          });
          data.put("threadPools", threadPools);
          return data;
      }
  }

Monitoring task execution time in Cat

Originally I put the instrumentation in the execute and submit methods of KittyThreadPoolExecutor. Testing showed a problem: the data did show up in Cat, but the execution time was always 1 millisecond, so it was useless.

The reason is probably obvious: the task is run later on a separate thread, so instrumenting the place where the task is added is meaningless.

I then found another way to implement the instrumentation: the beforeExecute and afterExecute methods that the thread pool provides, which are invoked before and after each task runs.

  
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
      // Runs on the worker thread, right before the task starts
      String threadName = Thread.currentThread().getName();
      Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
      transactionMap.put(threadName, transaction);
      super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);
      // Runs on the same worker thread, right after the task finishes
      String threadName = Thread.currentThread().getName();
      Transaction transaction = transactionMap.get(threadName);
      transaction.setStatus(Message.SUCCESS);
      if (t != null) {
          Cat.logError(t);
          transaction.setStatus(t);
      }
      transaction.complete();
      transactionMap.remove(threadName);
  }
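The same hook-based timing can be tried without Cat. The sketch below uses only JDK classes and is not Kitty's code: TimingExecutor, lastDurationNanos, and demo are hypothetical names. As a variation on the map keyed by thread name used above, it stores the start time in a ThreadLocal, which works because both hooks run on the worker thread that executes the task.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: measure real task run time with the executor hooks.
class TimingExecutor extends ThreadPoolExecutor {

    // Start time of the task currently running on each worker thread.
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    // Duration of the most recently finished task.
    final AtomicLong lastDurationNanos = new AtomicLong();

    TimingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get();
            if (start != null) {
                lastDurationNanos.set(System.nanoTime() - start);
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    // Runs a task that sleeps 50 ms and returns the measured duration in milliseconds.
    static long demo() {
        TimingExecutor pool = new TimingExecutor();
        pool.execute(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(pool.lastDurationNanos.get());
    }

    public static void main(String[] args) {
        System.out.println("task took about " + demo() + " ms");
    }
}
```

Timing inside execute() would only cover handing the task to the pool, which is why the measurement there stayed near 1 millisecond.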

Please read the rest of the code on your own. That is the end of this article. If you found it useful, please share it!

Thank you very much.

Finally, thanks to the Meituan technical team for their article. Although they did not share the source code, the ideas and application scenarios are explained very clearly.

If you're interested, give it a Star: https://github.com/yinjihuan/kitty

About the author: Yin Jihuan, a plain technology enthusiast, author of "Spring Cloud Microservices: Full-Stack Technology and Case Analysis" and "Spring Cloud Microservices: Introduction, Practice and Advanced", and founder of the WeChat official account Ape World. Personal WeChat: jihuan900. Feel free to get in touch.

I have put together a complete set of learning materials. If you're interested, search for "Ape World" on WeChat and reply with the keyword "Learning materials" to get what I have compiled on Spring Cloud, Spring Cloud Alibaba, Sharding-JDBC database and table sharding, the XXL-JOB task scheduling framework, MongoDB, web crawlers, and more.

Copyright notice
This article was written by [Yin Jihuan]. Please include a link to the original when reposting. Thank you.