|
FastFailure从字面意思上就能猜出来,其实就是快速失败,什么是快速失败呢,就是broker感觉自己不行(繁忙)的时候,接下来的一些请求就会直接返回失败,如果有用过dubbo的小伙伴,可能会知道dubbo服务提供者端有个快速失败机制,就是线程池满的时候,再来调用请求就会直接拒绝掉,为啥会有这个机制呢?其实是为了保障broker或者是服务的稳定性与可用性,不然broker或者是服务非常繁忙的时候,处理请求很慢,还哗哗的一堆堆请求不停的打过来,容易将broker或者是服务干宕机。
我们上面说过,当broker感觉自己不行的时候会有快速失败的机制,那什么时候会感觉到不行呢,其实就是OSPageCacheBusy,我们都知道commitlog 是单线程顺序追加写的,要想实现单线程顺序追加写,就得在追加写之前获取这个锁,这个OSPageCacheBusy就是按照某个线程持有锁的时间算出来的,当一次写入持有锁时间1s以上,RocketMQ就会认为OSPageCacheBusy,这个时候就会开启FastFailure机制,将来的请求给快速拒绝掉。
现在想想,什么时候会造成追加写入commitlog很慢?
下面是我总结出来的2个原因:
- 内存爆了的时候,jvm进行垃圾回收,stop the world ,然后会造成整个写入过程慢。
- commitlog这个使用的mmap内存文件映射,内存文件映射你可以理解为将文件映射到内存中,文件中的每一个字节都在内存中有对应,然后你操作内存就相当于操作文件,也就是说你往mmap写入的话,其实就是往操作系统的vfs层的pagecache里面写入;但是我们都知道你操作的内存是一层虚拟内存,会将内存划分为4k一个个的内存页,虽然你做了mmap内存映射,实际上操作系统可能并没有为你某个位置分配一个真实的物理内存页,这个时候你往里面写入数据的话,操作系统发现写入的那个虚拟内存页没有对应的物理内存页,这个时候就会请求调页,给你分配一个内存页,如果你内存不足了,或者是正在swap交换内存,这个时候写入os pagecache就会缓慢,造成波动(这块内容涉及到操作系统的一些知识);
在BrokerController这个类实例化的时候会创建许多组件,其中一个BrokerFastFailure 组件;
- 启动清理过期请求线程;
- 判断任务是否需要快速失败;
- 清理超时请求;
启动BrokerController 的时候,就会启动BrokerFastFailure 这个组件,我们一起来看下它的启动方法:
// 启动清理过期请求线程public void start {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable {@Overridepublic void run {// 默认开启if (brokerController.getBrokerConfig.isBrokerFastFailureEnable) {// 清理过期请求cleanExpiredRequest;}}}, 1000, 10, TimeUnit.MILLISECONDS);}可以看到创建一个定时任务,然后每10ms,也就是10毫秒执行一次,判断如果开启这个快速失败的话,就执行cleanExpiredRequest方法进行清理,这个默认是开启的;
private void cleanExpiredRequest {// 如果说存储组件处于os pagecache busy状态,pagecache是一个读写高并发的情况,导致pagecache来不及处理while (this.brokerController.getMessageStore.isOSPageCacheBusy) {try {// 判断是否繁忙if (!this.brokerController.getSendThreadPoolQueue.isEmpty) {// 获取taskfinal Runnable runnable = this.brokerController.getSendThreadPoolQueue.poll(0, TimeUnit.SECONDS);if (null == runnable) {break;}final RequestTask rt = castRunnable(runnable);// 快速失败,把请求直接返回一个响应,system busy,broker busyrt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis - rt.getCreateTimestamp, this.brokerController.getSendThreadPoolQueue.size));} else {break;}} catch (Throwable ignored) {}}// 清理超时请求// 发送消息请求队列cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInSendQueue);// 拉取消息请求队列cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInPullQueue);// 心跳队列cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInHeartbeatQueue);// 结束事物请求队列cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue, this.brokerController.getBrokerConfig.getWaitTimeMillsInTransactionQueue);}可以看到上面有个while循环,如果是os pagecache一直繁忙的话,就会一直执行这个循环里面的代码,如果sendThreadPoolQueue(这个就是发送消息的队列,当broker收到消息生产者发送消息的请求后,会交给线程池,然后线程处理不过来的就放到这个队列中排队等待处理,这个是线程池的一个知识)不是空的话,就会从这个队列里面取出任务,然后直接返回SYSTEM_BUSY系统繁忙,[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d这串错误信息最好有个印象,最起码看到它能知道是啥原因。
我们看下判断os pagecache繁忙的代码:
// 核心是基于mappedfile来读写数据,基于os pagecache内存区域来映射一块磁盘文件// 写入也是写到os pagecache里去,读取也是从os pagecache里读取// 可能会出现一个问题,os pagecache,broker才采取了一个机制,读写分离,写入到堆外缓存里去// transient pool,我们可以往这个里面去写入,定时触发一个commit,从transient pool里提交到os pagecache里去// 读取的时候还是去读取pagecache里,写入和读取分离开来解决一个os pagecahce busy@Overridepublic boolean isOSPageCacheBusy {// 获取到commitlog里面的起始时间long begin = this.getCommitLog.getBeginTimeInLock;// 把commitlog起始时间和当前时间做一个差值difflong diff = this.systemClock.now - begin;// 如果说时间差值是小于10 000 000,同时这个差值大于了os pagecache busy时间阈值return diff < 10000000&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills;}其实就是在锁内的时间超多1s,当前时间减去获取锁的时间(这个获取锁的时间是某个写入消息线程获取锁的开始时间),就会任务是os pagecache 繁忙,当这个锁被释放,就会正常,会走一堆清理队列中超时任务;
3、清理超时请求我们可以看到清理的队列有SendThreadPoolQueue,超时时间是200ms,发送消息请求就会被放到这个队列中;PullThreadPoolQueue,超时时间是5000ms,拉取消息请求会在这个队列中;HeartbeatThreadPoolQueue,超时时间是31s,心跳请求放会被放到这个队列中;EndTransactionThreadPoolQueue,超时时间是3s,提交事务,回滚事务的请求就会被放到这个队列中。
// 清理超时请求// 发送消息请求队列cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInSendQueue);// 拉取消息请求队列cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInPullQueue);// 心跳队列cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue,this.brokerController.getBrokerConfig.getWaitTimeMillsInHeartbeatQueue);// 结束事物请求队列cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue, this.brokerController.getBrokerConfig.getWaitTimeMillsInTransactionQueue);我们来看下具体怎么清理的
void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) {while (true) {try {if (!blockingQueue.isEmpty) {final Runnable runnable = blockingQueue.peek;if (null == runnable) {break;}final RequestTask rt = castRunnable(runnable);if (rt == null || rt.isStopRun) {break;}// 当前时间-任务创建时间戳final long behind = System.currentTimeMillis - rt.getCreateTimestamp;// 判断是否大于指定时间if (behind >= maxWaitTimeMillsInQueue) {if (blockingQueue.remove(runnable)) {// 设置状态rt.setStopRun(true);// 系统繁忙rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size));}} else {break;}} else {break;}} catch (Throwable ignored) {}}}本文主要介绍了一下RocketMQ broker 里面的FastFailure机制是啥,以及造成这个原因,然后从源码的角度看了一下这个FastFailure 的实现。
来源:http://www.yidianzixun.com/article/0kWcpPiB
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|