首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

zookeeper原理解析-服务器端处理流程(3)

zookeeper原理解析-服务器端处理流程(3)

3)CommitProcessor
   这个处理器逻辑还是有点小复杂的, leader和learner都需要用到这个处理器
3.1) 对于非事务性的操作(查询,exist等)直接回把请求转到下一个处理器处理
3.2) leader 对于事务性操作(create, setData等)请求,CommitProcessor线程任务会hold在这里,leader中ProposalRequestProcessor处理器会将请求提案发送给所有的followers, followers响应leader,然后leader中LearnerHandler会调processAck处理响应,当超过半数的时候将调CommitProcessor.commit()方法提交请求, 紧接着CommitProcessor将请求传递到下一个处理器处理
3.2) learner对于事务性操作(create, setData等)请求CommitProcessor线程任务会hold在这里, FollowerRequestProcessor或者ObserverRequestProcessor调CommitProcessor将请求提交队列之后会立刻向leader发送事务操作提案,Follower接收到leader的commit消息或者Observer接收到leader的inform消息它们会向CommitProcessor提交请求,紧接着CommitProcessor将请求传递到下一个处理器处理

伪代码:
CommitProcessor{
   run() {
       1. toProcess需要交予下一个Processor的,先都交给下一个
       2. nextPending请求时对于事务操作的,有一个不为空一直循环直到有commit过来
       3. queuedRequests.size() == 0&& committedRequests.size() > 0 follower observer接收commit ,加入到toProcess集合中去
       4. nextPending != null&&  committedRequests.size() > 0  leader发起投票请求,并接收follower反馈的, 加入到toProcess集合中去
       5. nextPending == null 前面循环
       6.如果是请求reqeust是事务操作赋给nextPending对象
       7.如果不是加入到toProcess集合中去

       //这里主要通过nextPending对象控制请求响应的顺序
    }

    commit(Request){
       将request添加到committedRequests队列中去
    }

    processRequest(Request) {
       由上游处理器调用,将request对象添加到queuedRequests请求队列中
   }
}

4)ToBeAppliedRequestProcessor
         这个处理器的逻辑比较简单


1)   将请求转发给一下个处理器,必须是FinalRequestProcessor
2)   其实leader在走到这个处理器之前会在CommitProcessor中hod一会等到follower反馈在到这,follower反馈后leader的LearnerHandler的processAck会将请求加入toBeApplied集合,所以在这里对于事务请求一定会在toBeApplied中有对应的移除调,如果没有ConcurrentLinkedQueue直接会抛NoSuchElementException异常

5)FinalRequestProcessor
   这个处理器是最后一个处理器,真正去执行事务操作更改dataree的数据。
1)   调底层修改数据zks.processTxn(hdr, txn)
2)   将请求加入到committedLog集合中
3)   构建请求的响应,响应客户端

伪代码:
FinalRequestProcessor{
      processRequest(Request request) {
           //zks.outstandingChanges这个玩意起什么作用,一直没弄清楚
           1.事务头不为空,是事务类操作 {
                zks.processTxn(hdr,txn) //zkServer处理事务操作
            }

           如果是closeSesion,无需生成响应

           根据请求类型(request.type)生成响应,并调NioServerCnxn.sendResponse写入chanel通道
      }
}


6) SyncRequestProcessor
这个处理器用来将请求记录到txLog文件中,通过批量刷盘的方式来提升io的性能,这里请求只有被写入到本地磁盘后,才会被传递到下一个处理器
下面看一下伪代码:
SyncRequestProcessor线程 {
    run() {
        //flush的时间点:1. queuedRequests为空 2.toFlush.size() > 1000
        //生成新的snapshot
        调zks.getZKDatabase().append(si)添加一条事务日志
           1)成功:是事务类操作有事务头, 根据规则判断是否需要生一个新的snapshot,加入到toFlush的集合中   
           2)失败:没有事务头TxnHeader, 优化直接调下一个Processor
    }

    flush() {
        zks.getZKDatabase().commit();同步到本snapshot
        然后循环调下一个Processor
    }

    processRequest() {
       加入到阻塞队列queuedRequests中, 让同步线程自己处理
    }
}

7) AckRequestProcessor
被ProposalRequestProcessor调用, leader自己做一次投票的成功响应
      

8) SendAckRequestProcessor
         对于leader投票请求的发送响应
返回列表