大家好,开源了今天我们来聊一个比较实用的美团话题,动态可监控的动态线程池实践,全新开源项目(DynamicTp)地址在文章末尾,线程欢迎交流学习。池实 稍微有些Java编程经验的践思小伙伴都知道,Java的开源了精髓在juc包,这是美团大名鼎鼎的Doug Lea老爷 子的杰作,评价一个程序员Java水平怎么样,动态一定程度上看他对juc包下的线程一些技术掌握的怎么样,这也是池实面试中的基本上必问的一些技术点之一。 多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、站群服务器高性能、少bug的代码,同时这些也是Java中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。 上边简单罗列了下juc包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池ThreadPoolExecutor使用经验,不然看起来会有点懵。 如果你对ThreadPoolExecutor不是很熟悉,推荐阅读下面两篇文章 使用ThreadPoolExecutor过程中你是否有以下痛点呢? 如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。服务器托管 如果看过ThreadPoolExecutor的源码,大概可以知道其实它有提供一些set方法,可以在运行时动态去修改相应的值,这些方法有: public void setCorePoolSize(int corePoolSize); public void setMaximumPoolSize(int maximumPoolSize); public void setKeepAliveTime(long time, TimeUnit unit); public void setThreadFactory(ThreadFactory threadFactory); public void setRejectedExecutionHandler(RejectedExecutionHandler handler); 现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。 综上,我们总结出以下的背景 我们基于配置中心对线程池ThreadPoolExecutor做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。 | __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | _ \ / _` | _ ` _ | |/ __| | _ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool :: 参考美团线程池实践 ,对线程池参数动态化管理,增加监控、报警功能。 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可食用。 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,默认支持Nacos、Apollo,同时也提供SPI接口可自定义扩展实现。 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供SPI接口可自定义扩展实现。 内置线程池指标采集功能,支持通过MicroMeter、JsonLog日志输出、Endpoint三种方式,可通过SPI接口自定义扩展实现。 集成管理常用第三方组件的线程池,已集成SpringBoot内置WebServer(Tomcat、Undertow、Jetty)的线程池管理。 配置变更监听模块: 线程池管理模块: 监控模块: 实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的SPI接口扩展其他实现。 通知告警模块: 对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的SPI接口扩展其他实现,通知告警类型如下: apollo应用用接入用此依赖 dynamic-tp-spring-boot-starter-apollo spring-cloud场景下的nacos应用接入用此依赖 dynamic-tp-spring-cloud-starter-nacos 非spring-cloud场景下的nacos应用接入用此依赖 dynamic-tp-spring-boot-starter-nacos 线程池配置 spring: dynamic: tp: enabled: true enabledBanner: true # 是否开启banner打印,默认true enabledCollect: false # 是否开启监控指标采集,默认false collectorType: logging # 监控数据采集器类型(JsonLog | MicroMeter),默认logging logPath: /home/logs # 监控日志数据路径,默认${ user.home}/logs monitorInterval: 5 # 监控时间间隔(报警判断、指标采集),默认5s nacos: # nacos配置,不配置有默认值(规则name-dev.yml这样) dataId: dynamic-tp-demo-dev.yml group: DEFAULT_GROUP apollo: # apollo配置,不配置默认拿apollo配置第一个namespace namespace: dynamic-tp-demo-dev.yml configType: yml # 配置文件类型 platforms: # 通知报警平台配置 - platform: wechat urlKey: 3a7500-c5c3d8b69c # 替换 receivers: test1,test2 # 接受人企微名称 - platform: ding urlKey: f80dad41fcd55438f408dcd6a # 替换 secret: SECb541fa6375db9d21 # 替换,非sign模式可以没有此值 receivers: 17810129805 # 钉钉账号手机号 tomcatTp: # tomcat web server线程池配置 minSpare: 100 max: 400 jettyTp: # jetty web server线程池配置 min: 100 max: 400 undertowTp: # undertow web server线程池配置 ioThreads: 100 workerThreads: 400 executors: # 动态线程池配置 - threadPoolName: dynamic-tp-test-2 corePoolSize: 8 maximumPoolSize: 16 queueCapacity: 2000 queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类 rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 keepAliveTime: 40 allowCoreThreadTimeOut: false threadNamePrefix: test # 线程名前缀 notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警) - type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类 enabled: true threshold: 80 # 报警阈值 platforms: [ding,wechat] # 可选配置,不配置默认拿上层platforms配置的所以平台 interval: 120 # 报警间隔(单位:s) - type: change enabled: true - type: liveness enabled: true threshold: 80 - type: reject enabled: true threshold: 1 代码方式生成,服务启动会自动注册 @Configuration public class DtpConfig { @Bean public DtpExecutor dtpExecutor1() { return DtpCreator.createDynamicFast("dynamic-tp-test-1"); } @Bean public ThreadPoolExecutor dtpExecutor2() { return ThreadPoolBuilder.newBuilder() .threadPoolName("dynamic-tp-test-2") .corePoolSize(6) .maximumPoolSize(12) .keepAliveTime(40) .allowCoreThreadTimeOut(true) .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) .buildDynamic(); } } 代码调用,根据线程池名称获取 public static void main(String[] args) { DtpExecutor executor = DtpRegistry.getExecutor("dynamic-tp-test-2"); executor.execute(() -> System.out.println("test dynamic tp")); } 配置文件配置的参数会覆盖代码生成方式配置的参数。 阻塞队列只有VariableLinkedBlockingQueue类型可以修改capacity,VariableLinkedBlockingQueue参考RabbitMq的实现,该类型功能和LinkedBlockingQueue相似。 启动看到如下日志输出证明接入成功。 | __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | _ \ / _` | _ ` _ | |/ __| | _ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool :: DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-2, corePoolSize=8, maxPoolSize=16, keepAliveTime=40, queueType=VariableLinkedBlockingQueue, queueCapacity=2000, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false) 配置变更会推送通知消息,且会高亮变更的字段。 DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [8 => 6], maxPoolSize: [16 => 12], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [2000 => 4000], keepAliveTime: [40s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false] 触发报警阈值会推送相应报警消息,且会高亮显示相关字段,活性告警、容量告警、拒绝告警。 配置变更会推送通知消息,且会高亮变更的字段。 通过主配置文件collectType属性配置指标采集类型,默认值:logging。 micrometer方式:通过引入micrometer相关依赖采集到相应的平台 (如Prometheus,InfluxDb...)。 logging:指标数据以json格式输出日志到磁盘,地址{ appName}.monitor.log。 2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] { "activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] { "activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] { "activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] { "activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] { "activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8} 暴露EndPoint端点(dynamic-tp),可以通过http方式请求。 [ { "dtp_name": "remoting-call", "core_pool_size": 8, "maximum_pool_size": 16, "queue_type": "SynchronousQueue", "queue_capacity": 0, "queue_size": 0, "fair": false, "queue_remaining_capacity": 0, "active_count": 2, "task_count": 2760, "completed_task_count": 2760, "largest_pool_size": 16, "pool_size": 8, "wait_task_count": 0, "reject_count": 12462, "reject_handler_name": "CallerRunsPolicy" }, { "max_memory": "220 MB", "total_memory": "140 MB", "free_memory": "44 MB", "usable_memory": "125 MB" } ] gitee地址: https://gitee.com/yanhom/dynamic-tp-spring-cloud-starter[3] github地址:https://github.com/lyh200/dynamic-tp-spring-cloud-starter[4]写在前面
背景
简介
架构设计
主要分四大模块使用
maven依赖注意事项
通知报警
监控日志
项目地址(去掉-spring-cloud...)