读流程
客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel
的事件轮询、事件处理是在NioEventLoop
的run
方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()
。
从processSelectedKeys()
一直追踪下去,可以看到OP_READ
处理逻辑分支:
(资料图片)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你会比较奇怪:为什么OP_READ
和OP_ACCEPT
都会走这个分支?
OP_ACCEPT
是NioServerSocketChannel
处理的事件,而OP_READ
是NioSocketChannel
处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe
类型也不一样,一个是:NioMessageUnsafe
,另一个是:NioSocketChannelUnsafe
。NioServerSocketChannel
负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
类型实例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:调用java api
,从channel
中读取字节数据到ByteBuf
缓存中;pipeline.fireChannelRead(byteBuf)
:触发pipeline
的channelRead
事件,并将带有读入数据的ByteBuf
通过参数传入;pipeline.fireChannelReadComplete()
:触发pipeline
的channelReadComplete
事件;事件传播
调用pipeline
的fireChannelRead()
就可触发channelRead
事件在handler
之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext
中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一个是在哪个handler
上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler
上触发channelRead
事件。由于pipeline
中的handler
是被包装成HandlerContext
放入的,所以,可以通过handler()
方法找到真正的handler
对象进行触发。
比如pipeline
的fireChannelRead()
就是触发head
的channelRead
事件,如果处理完成需要把事件继续传播给下一个handler
,就需要调用ctx.fireChannelRead(msg)
方法即可,该方法中通过next
属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)
这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)
运行完成后会调用pipeline.fireChannelReadComplete()
方法,触发channelReadComplete
事件,执行机制和channelRead
事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之间的区别了,避免误用。
Pipeline线程模型
上面分析的都是常规模式,没有给handler
指定额外线程情况下channelRead
和channelReadComplete
传播机制,大致如下图:
先触发channelRead
事件,按照pipeline
中顺序依次触发,当所有handler
都触发完后,再触发channelReadComplete
事件,按照pipeline
中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel
注册的NioEventLoop
。
我们来看下static void invokeChannelRead()
这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在执行next.invokeChannelRead(m)
方法前有个executor.inEventLoop()
判断,判断当前执行线程是不是就是handler
执行所需的线程。执行handler
方法是不能随便线程都可以去执行的,必须使用handler
内部指定的executor
线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor
线程执行器中执行,如果当前线程和handler
执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor
的任务队列中让其执行。
executor
线程执行器是通过next.executor()
方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext
中executor
有值则直接返回;否则返回channel
注册的NioEventLoop
作为线程执行器。
在添加handler
时可以指定一个EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,这样,再把handler
包装成HandlerContext
过程中会从这个EventGroup
根据chooser
选取策略获得一个EventLoop
赋值给executor
。
所以,从上面分析,默认情况下handler
都是在channel
注册的NioEventLoop
线程中执行的,除非在addLast
添加handloer
时特别指定。
下面我们通过一个案例分析下pipeline
线程模型,如下,给handler02
添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
这时,channelRead
和channelReadComplete
事件触发流程见下图:
channelRead
事件执行流程说明:
handler01
的channelRead
事件,本身当前线程和handler01
是同一个线程,所以,直接调用handler#channelRead()
方法;handler01#channelRead()
方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()
方法,但是handler02
执行线程并不是默认的channel
的注册线程,而是额外设置的biz
线程,需要将调用包装成一个任务提交到biz
线程的任务队列taskQueue
中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue
中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()
方法执行完成后,需要执行handler03#channelRead()
,它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()
的调用封装成一个任务提交到register eventLoop的taskQueue
中,待其内部线程提取执行;下面再来看下channelReadComplete
事件执行流程:
a1
将任务提交给taskQueue
任务队列后直接返回了,而不是等其执行完成再返回;a1
返回后,从源码分析来看,会立即触发channelReadComplete
事件,涉及到线程切换,同理b1
这里也是将handler02#channelReadComplete()
调用封装成任务放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;这样,biz eventLoop
线程执行器taskQueue
中就有两个任务,会按照顺序依次执行:先执行channelRead()
调用,再执行channelReadComplete()
调用;执行a3、b3
时同理;总结
从上面可以看出,Pipeline
中handler
可以在不同线程间切换得到关键是:taskQueue
。还要一点非常重要:handler
线程池执行器默认使用的channel
注册的NioEventLoop
这个,NioEventLoop
采用的是单线程工作模式,同时还需要处理selector.select()
事件轮询,所以,handler
里肯定不能有耗时、特别是IO
阻塞等操作,不然卡在handler
中,selector#select()
执行不到,无法及时接收到客户端传送过来的数据。
关键词:
客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮2023-03-27
1、周代礼制完整地讲应称之为礼乐制度,分礼和乐两个部分。礼的部分主要对人的身份进行划分和社会规范,最终形成等级制度。2、乐的部分主要是2023-03-27
1、北京市第三十五中学(BeijingNo 35HighSchool)是一所由北京市西城区教委主管的全日制公立完全2023-03-27
3月27日电,中国电子技术标准研究院、江西省工业和信息化厅、江西省宜春市人民政府在宜春市召开锂离子电池产业链供应链合作对2023-03-27
近日,个人所得税开始退税啦!许多网友在网络平台晒出了自己的退税金额,少则几十元,多的上万元。在这个退税的高峰期,诈骗分子也开始蠢蠢欲2023-03-27
位于浙江省平湖市独山港镇的粮食码头丁字坝,每年1到4月,大批“捞金人”从这里出海,穿梭于距离海岸线3到8公里的海域,捕捞2023-03-27
1、今天听了这个公司的介绍,本科实习1500员,年薪才二万五千元。2、吃住都不管,研究生是三千多,如果签约的话要签五年呢2023-03-27
【疾病描述】:有些元素在日常生活中对皮肤非常刺激,对人体健康有害。特别是很多患者不能及时关注生活细节,导致白癜风爆发,严重影响健康。【2023-03-27
近日,甘肃省农业大学附属中学师生、兰州工业学院马克思主义学院师生、安宁区银滩路街道社区禁毒专干走进甘肃省第二强制隔离戒毒2023-03-27
近日,西安市科技局发布了第四批秦创原“三器”示范平台立项名单,共认定秦创原“三器”示范平台34家,西安经开区共有7家单位2023-03-27
1、Windows10系统游戏闪退问题,要么是游戏自身的bug或者设置问题,要么是显卡驱动程序bug或设置问题。2、先到显卡驱动程序中把显卡设置恢复默2023-03-27
3月24日,上交所针对山东黄金定增募资不超99亿元下发审核问询函。根据申报材料及公开资料,公司本次向特定对象发行股票拟募2023-03-27
1、师资方面嘛,临床类的课程都是附属医院临床一线的老师来上课的,一般都是教授、主任。2、住宿要看学校分到哪栋宿舍,有4人间、有6人间,有82023-03-27
天津发布重点群体创业推进行动工作方案八大举措促进高质量充分就业,2023-03-26
证券时报·数据宝统计,北上资金延续上周净流入态势,上周净流入109 51亿元,其中深股通净流入60 68亿元,沪股通净流入48 83亿元。值得注意的2023-03-26
3月26日,海南省第53期小客车保有量调控增量指标配置结果出炉,本次总中签率达95%。2023-03-26
普利制药(SZ300630,收盘价:27 27元)3月26日晚间发布公告称,海南普利制药股份有限公司于近日收到了国家药品监督管理局签发的碘帕醇注射液的2023-03-26
1、在上古卷轴5中结婚有两个方法,第一种方法比较简单,做任务就可以,具体是到烈风谷玛拉神庙的婚姻任务,需要帮助3对情侣就可以可以了,具体2023-03-26
本文目录一览:1、魔界复仇最终版2 2隐藏英雄谁厉害2、魔界复仇2 2隐藏英雄密码3、急求~魔界复仇2023-03-26
斗鱼王者荣耀公会赛正在如火如荼地进行当中,经历了将近一个月的争夺,目前已经进入到季后赛阶段了。总共32队伍层层筛选,目前只剩下8支队伍征2023-03-26
“放大亚运电竞效应”为啥会被“喷”成材率太低或是主因,江苏6年来无本科院校开设电竞专业虽然电竞比赛红红火火,但成材率过低始终为人诟病新2023-03-26
不算劳斯莱斯一些特殊的车型,目前市面上在售的(量产车),最贵的是幻影,官方指导价:688~800 8万元。不过世界上公认的劳斯莱斯最贵的一款车2023-03-26
1、冷水浴的好处如下:33601。2、增强心血管功能,冷水能使血液流向心脏,使心跳加快,加速体内血液流动。3、冷水刺激,2023-03-26
1、镜子里的自己和别人看到的不一样,自己看镜子里的自己会美化。2、现代镜按材质可分为铝镜和银镜两种。3、银镜镜面光亮,水银密度高,容易与2023-03-26
1、阴虚,阴不制阳,阳相对亢盛而致虚火炽盛的病理变化,可见到烦躁易怒,两颧潮红,性欲亢进等症。2、阴虚火旺是指脏腑阴分2023-03-25
