ZooKeeper source code, part 5: request processing (1)

wxchong 2025-04-27 16:46:50

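Outline

1. Server request processing chains

(1) The Leader's request processing chain

I. PrepRequestProcessor: the request pre-processor

II. ProposalRequestProcessor: the transaction proposal processor

III. SyncRequestProcessor: the transaction log processor

IV. AckRequestProcessor: the proposal acknowledgment processor

V. CommitProcessor: the transaction commit processor

VI. ToBeAppliedRequestProcessor

VII. FinalRequestProcessor

(2) The Follower's request processing chain

I. FollowerRequestProcessor: the request forwarding processor

II. SendAckRequestProcessor: the proposal acknowledgment processor

2. How the server handles a session-creation request

(1) Receiving the request

(2) Creating the session

(3) Pre-processing the request

(4) Processing the transaction

(5) Applying the transaction and responding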

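1. Server request processing chains

(1) The Leader's request processing chain

(2) The Follower's request processing chain


(1) The Leader's request processing chain

I. PrepRequestProcessor: the request pre-processor

II. ProposalRequestProcessor: the transaction proposal processor

III. SyncRequestProcessor: the transaction log processor

IV. AckRequestProcessor: the proposal acknowledgment processor

V. CommitProcessor: the transaction commit processor

VI. ToBeAppliedRequestProcessor

VII. FinalRequestProcessor
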

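Before a client can coordinate with the zk ensemble, a session between the client and the server side must first be established through the Leader server. Once the session has been created, the zk server can accept request operations from that client.


The Leader server is the core of a zk cluster. Its main jobs are:

Job 1: process transactional requests and guarantee the order in which the cluster processes transactions

Job 2: act as the scheduler for the servers in the cluster


The zk server handles every client request with the chain-of-responsibility pattern. The request processing chain is initialized when the server starts. The Leader server's chain (illustrated by a figure in the original post) consists mainly of 7 request processors.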
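
The chain-of-responsibility idea can be sketched in a few lines. This is a minimal illustration, not ZooKeeper's code: the Processor interface, the NamedProcessor class, and the String request are hypothetical stand-ins, and the side branch through SyncRequestProcessor and AckRequestProcessor is left out. It only shows that the chain is built back to front, so that each processor can receive its successor in its constructor.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainSketch {
    // Hypothetical, simplified stand-in for ZooKeeper's RequestProcessor interface.
    interface Processor {
        void process(String request, List<String> trace);
    }

    // Records its own name, then hands the request to the next link, if any.
    static final class NamedProcessor implements Processor {
        private final String name;
        private final Processor next; // null marks the end of the chain

        NamedProcessor(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    // Build the chain from the last processor to the first one.
    static Processor buildChain() {
        Processor fin = new NamedProcessor("Final", null);
        Processor toBeApplied = new NamedProcessor("ToBeApplied", fin);
        Processor commit = new NamedProcessor("Commit", toBeApplied);
        Processor proposal = new NamedProcessor("Proposal", commit);
        return new NamedProcessor("Prep", proposal);
    }

    // Sends one request through the chain and returns the visiting order.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        buildChain().process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /demo"));
    }
}
```

Building from the tail forward mirrors the order of the constructor calls in setupRequestProcessors(), where FinalRequestProcessor is created first.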

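I. PrepRequestProcessor: the request pre-processor

In zk, a transactional request is a request that changes server state. Transactional requests include creating a node, updating a node, deleting a node, and creating a session.


PrepRequestProcessor is the Leader server's request pre-processor (Prepare). It determines whether the current client request is transactional, and for transactional requests it performs a series of pre-processing steps: creating the request's transaction header and transaction body, checking the session, checking ACLs, and so on.


PrepRequestProcessor implements the RequestProcessor interface and extends zk's ZooKeeperCriticalThread class. It also has a nextProcessor field of type RequestProcessor, which points to the next request processor in the chain.


When the Leader server starts to process a request, it calls PrepRequestProcessor's processRequest() method, which adds the request to the submittedRequests queue. Once the pre-processor thread has started, it keeps taking requests from the submittedRequests queue and hands each one to PrepRequestProcessor's pRequest() method for pre-processing. pRequest() uses the request type to decide whether the request is transactional. If it is, pRequest2Txn() is called to pre-process it. The request is then passed to the processor that nextProcessor points to for the next step.


PrepRequestProcessor is itself a thread: a request is first added to a queue and then processed by that thread.


PrepRequestProcessor's nextProcessor points to ProposalRequestProcessor.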

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
    }
    ...
}

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
    CommitProcessor commitProcessor;
    PrepRequestProcessor prepRequestProcessor;
    ...
    // Initialize the request processing chain
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();// Start the request pre-processor thread
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        setupContainerManager();
    }
    ...
}

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    RequestProcessor nextProcessor;
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
    
    public void processRequest(Request request) {
        // Add the request to the queue
        submittedRequests.add(request);
    }
    ...
    @Override
    public void run() {
        while (true) {
            Request request = submittedRequests.take();
            ...
            pRequest(request);
        }
    }
    
    protected void pRequest(Request request) throws RequestProcessorException {
        request.setHdr(null);
        request.setTxn(null);
        switch (request.type) {
            ...
            case OpCode.create:
                CreateRequest createRequest = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request,createRequest, true);
                break;
            case OpCode.delete:
                ...
        }
        ...
        request.zxid = zks.getZxid();
        // Hand the request to the next processor
        nextProcessor.processRequest(request);
    }
    
    // This method is dedicated to pre-processing transactional requests
    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
        // Set the request's transaction header and transaction body
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
        ...
    }
    ...
}
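
The "enqueue first, let the processor's own thread do the work" pattern described above can be sketched as follows. This is a simplified illustration under stated assumptions: the class name QueuedProcessorSketch, the String requests, and the STOP marker are hypothetical, and the real pRequest() logic is replaced by a one-line placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueuedProcessorSketch extends Thread {
    // Hypothetical shutdown marker, compared by identity like a poison pill.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    // Called by producer threads: it only enqueues and returns immediately.
    void processRequest(String request) {
        submittedRequests.add(request);
    }

    void shutdown() {
        submittedRequests.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submittedRequests.take(); // blocks until a request arrives
                if (request == STOP) {
                    break;
                }
                handled.add("prepared:" + request); // placeholder for the real pre-processing
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits two requests, stops the thread, and returns what it handled.
    static List<String> demo() {
        QueuedProcessorSketch p = new QueuedProcessorSketch();
        p.start();
        p.processRequest("create");
        p.processRequest("delete");
        p.shutdown();
        try {
            p.join(); // after join() the worker's writes to 'handled' are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(p.handled);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains a FIFO queue, requests are pre-processed in the order in which they were submitted, no matter how many threads call processRequest().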

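Two entry points trigger a call to PrepRequestProcessor's processRequest() method.


The first is a transactional request that a Learner forwards to the Leader. When the continuously running LearnerHandler thread sees a request sent from a Learner to the Leader, it calls LeaderZooKeeperServer.submitLearnerRequest(), which triggers the call.


The second is a transactional request that the zk server receives directly from a client. ZooKeeperServer's processPacket() method is called first to handle the data read from the socket, then ZooKeeperServer's submitRequest() method submits the request, and finally the processRequest() method of ZooKeeperServer's firstProcessor is called. Once firstProcessor's processRequest() has run, the request enters PrepRequestProcessor.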

// First entry point
public class Leader {
    ...
    void lead() throws IOException, InterruptedException {
        ...
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        ...
    }
    
    class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
        ...
        @Override
        public void run() {
            while (!stop) {
                Socket s = ss.accept();
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay);
                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                fh.start();
                ...
            }
            ...
        }
    }
    ...
}

public class LearnerHandler extends ZooKeeperThread {
    ...
    @Override
    public void run() {
        ...
        while (true) {
            ...
            case Leader.REQUEST:
                ...
                // Call LeaderZooKeeperServer's submitLearnerRequest method
                leader.zk.submitLearnerRequest(si);
            ...
        }
        ...
    }
    ...
}

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
    PrepRequestProcessor prepRequestProcessor;
    ...
    @Override
    protected void setupRequestProcessors() {
        ...
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        ...
    }
    
    public void submitLearnerRequest(Request request) {
        prepRequestProcessor.processRequest(request);
    }
    ...
}

// Second entry point
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    ...
    class SelectorThread extends AbstractSelectThread {
        @Override
        public void run() {
            ...
            while (!stopped) {
                select();
                ...
            }
            ...
        }
        
        private void select() {
            selector.select();
            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
          
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);
                ...
                if (key.isReadable() || key.isWritable()) {
                    // The server reads data from the client (reads a request) or writes data to the client (sends a response)
                    handleIO(key);
                }
                ...
            }
        }
        
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            cnxn.disableSelectable();
            key.interestOps(0);
            // Activate the connection: add it to the connection expiry queue
            touchCnxn(cnxn);
            // Handle the request on the worker thread pool
            workerPool.schedule(workRequest);
        }
        ...
    }
    
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final NIOServerCnxn cnxn;
        
        public void doWork() throws InterruptedException {
            ...
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);
                ...
            }
        }
        ...
    }
}

public class WorkerService {
    ...
    public void schedule(WorkRequest workRequest, long id) {
        ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
        int size = workers.size();
        if (size > 0) {
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } else {
            scheduledWorkRequest.run();
        }
    }
    
    private class ScheduledWorkRequest implements Runnable {
        private final WorkRequest workRequest;
        
        ScheduledWorkRequest(WorkRequest workRequest) {
            this.workRequest = workRequest;
        }
        
        @Override
        public void run() {
            ...
            workRequest.doWork();
        }
    }
    ...
}

public class NIOServerCnxn extends ServerCnxn {
    private final ZooKeeperServer zkServer;
    
    void doIO(SelectionKey k) throws InterruptedException {
        ...
        if (k.isReadable()) {
            ...
            readPayload();
        }
    }
    
    private void readPayload() throws IOException, InterruptedException {
        ...
        readRequest();
    }
    
    private void readRequest() throws IOException {
        // Process the input stream
        zkServer.processPacket(this, incomingBuffer);
    }
    ...
}

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    ...
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");
        incomingBuffer = incomingBuffer.slice();
        ...
        Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
        submitRequest(si);
        ...
    }
    
    public void submitRequest(Request si) {
        ...
        // Activate the session
        touch(si.cnxn);
        // Once firstProcessor.processRequest() has run, the request enters PrepRequestProcessor
        firstProcessor.processRequest(si);
        ...
    }
    ...
}

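II. ProposalRequestProcessor: the transaction proposal processor

ProposalRequestProcessor is the Leader server's transaction proposal (voting) processor. It is the processor that follows PrepRequestProcessor, and its main job is to handle transactional requests, which includes creating proposals and starting votes.


For a non-transactional request: it passes the request straight to CommitProcessor and does nothing else.


For a transactional request: besides passing the request to CommitProcessor, it creates the corresponding Proposal and sends it to all Followers to start a cluster-wide vote on the transaction. It also hands the request to SyncRequestProcessor so that it is recorded in the transaction log.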
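
The branching between the two kinds of request can be sketched as below. It is a simplified model, not the real class: Req, its txnHeader field, and the three lists are hypothetical names that stand in for the request header check and for the CommitProcessor, the proposal broadcast, and the SyncRequestProcessor queue.

```java
import java.util.ArrayList;
import java.util.List;

final class ProposalBranchSketch {
    // Hypothetical request: a non-null txnHeader marks a transactional request.
    static final class Req {
        final String op;
        final String txnHeader; // null for non-transactional requests

        Req(String op, String txnHeader) {
            this.op = op;
            this.txnHeader = txnHeader;
        }
    }

    final List<String> commitFlow = new ArrayList<>();   // requests handed to the commit stage
    final List<String> proposalFlow = new ArrayList<>(); // proposals broadcast to followers
    final List<String> syncFlow = new ArrayList<>();     // requests queued for the transaction log

    void processRequest(Req r) {
        commitFlow.add(r.op);          // every request goes to the commit stage
        if (r.txnHeader != null) {     // only transactional requests continue below
            proposalFlow.add(r.op);
            syncFlow.add(r.op);
        }
    }

    public static void main(String[] args) {
        ProposalBranchSketch p = new ProposalBranchSketch();
        p.processRequest(new Req("getData", null));
        p.processRequest(new Req("create", "hdr"));
        System.out.println(p.commitFlow + " " + p.proposalFlow + " " + p.syncFlow);
    }
}
```

Note that the transactional request is handed to the commit stage before it is proposed, which is what lets that stage remember the arrival order of requests.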


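A proposal means the following: when handling a transactional request, zk first starts a voting round on the server side. The vote tells the machines in the zk ensemble to process the transactional request, which prevents a problem on a single machine from leaving transactions inconsistent.


ProposalRequestProcessor has three sub-flows: the Commit flow, the Proposal flow, and the Sync flow.


Flow 1: the Commit flow

When the Proposal flow completes, the data on the zk servers has not changed yet. Completing the Proposal flow only means that the zk server side is allowed to execute the transactional request; the actual data change happens in the Commit flow. The main job of the Commit flow is to carry out the request, and it is implemented by CommitProcessor.


Flow 2: the Proposal flow

When handling a transactional request, zk may modify data only after obtaining votes from more than half of the machines in the cluster. The main work of the Proposal flow is voting and tallying the votes.


Flow 3: the Sync flow

The Sync flow is implemented by SyncRequestProcessor.


ProposalRequestProcessor is not a thread. Its nextProcessor is CommitProcessor, and it also calls SyncRequestProcessor's processRequest() method.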

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
    ...
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        // Build ProposalRequestProcessor; its next processor is CommitProcessor
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();// Initialize ProposalRequestProcessor
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        setupContainerManager();
    }
    ...
}

// ProposalRequestProcessor's nextProcessor is CommitProcessor
public class ProposalRequestProcessor implements RequestProcessor {
    LeaderZooKeeperServer zks;
    RequestProcessor nextProcessor;// nextProcessor is actually CommitProcessor
    SyncRequestProcessor syncProcessor;// Transaction log processor; its next processor is AckRequestProcessor
    
    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        // Create the transaction log processor; its next processor is AckRequestProcessor
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }
    
    // Initialize ProposalRequestProcessor
    public void initialize() {
        syncProcessor.start();// Start the transaction log processor's thread
    }
    
    public void processRequest(Request request) throws RequestProcessorException {
        if (request instanceof LearnerSyncRequest) {
            // Handle a Learner's sync request
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
            // Commit flow: nextProcessor is actually CommitProcessor
            nextProcessor.processRequest(request);
            if (request.getHdr() != null) {
                // Proposal flow
                zks.getLeader().propose(request);
                // Sync flow: add the request to a queue for the transaction log processor thread to handle
                syncProcessor.processRequest(request);
            }
        }
    }
    ...
}

public class Leader {
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    public Proposal propose(Request request) throws XidRolloverException {
        ...
        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        // Create the Proposal
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
       
        synchronized(this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
            if (request.getHdr().getType() == OpCode.reconfig) {
                self.setLastSeenQuorumVerifier(request.qv, true);                       
            }
            if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            lastProposed = p.packet.getZxid();
            // Put the Proposal being sent into the outstandingProposals map
            outstandingProposals.put(lastProposed, p);
            // Send the Proposal, which really means handing it to each LearnerHandler
            sendPacket(pp);
        }
        return p;
    }
    
    void sendPacket(QuorumPacket qp) {
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                // The LearnerHandler puts the proposal into its send queue
                f.queuePacket(qp);
            }
        }
    }
    ...
}

public class LearnerHandler extends ZooKeeperThread {
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
    ...
    void queuePacket(QuorumPacket p) {
        queuedPackets.add(p);
    }
    
    @Override
    public void run() {
        ...
        // Start a thread that sends packets, such as Proposals
        startSendingPackets();
        ...
    }
    
    protected void startSendingPackets() {
        if (!sendingThreadStarted) {
            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                    sendPackets();
                }
            }.start();
            sendingThreadStarted = true;
        } else {
            LOG.error("Attempting to start sending thread after it already started");
        }
    }
    
    private void sendPackets() throws InterruptedException {
        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
        while (true) {
            QuorumPacket p;
            p = queuedPackets.poll();
            if (p == null) {
                bufferedOutput.flush();
                p = queuedPackets.take();
            }
            if (p == proposalOfDeath) {
                break;
            }
            if (p.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (p.getType() == Leader.PROPOSAL) {
                syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
            }
            oa.writeRecord(p, "packet");
        }
    }
    ...
}

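III. SyncRequestProcessor: the transaction log processor

SyncRequestProcessor is the transaction log processor. It records transactional requests in the transaction log file and also triggers zk data snapshots.


SyncRequestProcessor is also a thread: a request is first added to a queue and then processed by the thread. Its nextProcessor is AckRequestProcessor.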

// SyncRequestProcessor, the transaction log processor; its next processor is AckRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    private final ZooKeeperServer zks;
    private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    private final RequestProcessor nextProcessor;// AckRequestProcessor
    private Thread snapInProcess = null;
    volatile private boolean running;
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    private final Random r = new Random();
    private static int snapCount = ZooKeeperServer.getSnapCount();
    private final Request requestOfDeath = Request.requestOfDeath;
    
    public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }
    
    public void processRequest(Request request) {
        // Add the request to the queue
        queuedRequests.add(request);
    }
    
    @Override
    public void run() {
        try {
            int logCount = 0;
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
    }
    
    private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {
        if (toFlush.isEmpty()) {
            return;
        }
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                nextProcessor.processRequest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }
    
    public void shutdown() {
        queuedRequests.add(requestOfDeath);
        if (running) {
            this.join();
        }
        if (!toFlush.isEmpty()) {
            flush(toFlush);
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }
}
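
The run() loop above batches appended requests in toFlush and commits them together, either when the input queue runs dry or when the batch grows past a threshold (1000 in the listing). A single-threaded sketch of that group-commit idea follows. The class and field names are hypothetical, the disk write is replaced by a counter, and log rolling and snapshots are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class GroupCommitSketch {
    private final int maxBatch;
    private final List<String> toFlush = new ArrayList<>();

    final List<String> acked = new ArrayList<>(); // requests forwarded after being made durable
    int commits = 0;                              // number of simulated disk commits

    GroupCommitSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // Drains the incoming queue, flushing when the batch is too large or the queue is empty.
    void drain(Queue<String> queued) {
        String request;
        while ((request = queued.poll()) != null) {
            toFlush.add(request);            // stands in for appending to the transaction log
            if (toFlush.size() > maxBatch) {
                flush();
            }
        }
        flush();                             // nothing left to read: flush what is pending
    }

    // One commit covers the whole batch; only then are the requests passed on.
    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++;
        acked.addAll(toFlush);
        toFlush.clear();
    }

    public static void main(String[] args) {
        GroupCommitSketch s = new GroupCommitSketch(2);
        s.drain(new ArrayDeque<>(Arrays.asList("r1", "r2", "r3", "r4", "r5")));
        System.out.println(s.commits + " commits for " + s.acked.size() + " requests");
    }
}
```

The point of the design is that several requests share one expensive commit, while no request is passed to the next processor before the commit that covers it.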

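IV. AckRequestProcessor: the proposal acknowledgment processor

SyncRequestProcessor's nextProcessor is AckRequestProcessor, a processor that exists only on the Leader.


After SyncRequestProcessor has finished logging a transaction, AckRequestProcessor calls Leader's processAck() method to add the Leader's own ACK to the Proposal. In other words, the Leader's SID is added to the Proposal's vote collector, and Leader's tryToCommit() method is then called to check whether the proposal has ACKs from more than half of the voters and, if so, to commit it.


Likewise, when the Leader receives a Follower's ACK for the Proposal, Leader's processAck() method adds the Follower's ACK to the proposal: the Follower's SID is added to the Proposal's vote collector, and tryToCommit() is called to check for a majority of ACKs and attempt the commit.


AckRequestProcessor is not a thread, and it has no nextProcessor field.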
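
The vote collector can be pictured as a set of server IDs plus a quorum test. The sketch below assumes a plain simple-majority rule and a single configuration; the class AckTrackerSketch and its methods are hypothetical and ignore the multiple quorum verifiers that the real Proposal can carry during a reconfiguration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class AckTrackerSketch {
    private final Set<Long> votingMembers;
    private final Set<Long> acks = new HashSet<>();

    AckTrackerSketch(Set<Long> votingMembers) {
        this.votingMembers = new HashSet<>(votingMembers);
    }

    // Records an ACK; ACKs from servers that are not voting members are ignored.
    boolean addAck(long sid) {
        if (!votingMembers.contains(sid)) {
            return false;
        }
        acks.add(sid);
        return true;
    }

    // Assumed rule: strictly more than half of the voting members have acknowledged.
    boolean hasQuorum() {
        return acks.size() > votingMembers.size() / 2;
    }

    public static void main(String[] args) {
        AckTrackerSketch t = new AckTrackerSketch(new HashSet<>(Arrays.asList(1L, 2L, 3L)));
        t.addAck(1L); // the leader's own ACK, added after its log write
        t.addAck(2L); // one follower's ACK
        System.out.println(t.hasQuorum());
    }
}
```

In a three-server ensemble the Leader's own ACK plus one Follower's ACK is already a majority, which is why the Leader's ACK after its local log write matters.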

// SyncRequestProcessor's nextProcessor is AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {
    Leader leader;
    
    AckRequestProcessor(Leader leader) {
        this.leader = leader;
    }
    
    //Forward the request as an ACK to the leader
    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if (self != null) {
            // The Leader also takes part in the Proposal vote and ACKs it
            // Add the Leader's SID to the Proposal's vote collector, then commit only if the collector holds a majority of ACKs
            leader.processAck(self.getId(), request.zxid, null);
        } else {
            LOG.error("Null QuorumPeer");
        }
    }
}

public class LearnerHandler extends ZooKeeperThread {
    ...
    @Override
    public void run() {
        ...
        while (true) {
            ...
            switch (qp.getType()) {
                case Leader.ACK:
                    ...
                    // When the Leader receives a Follower's ACK for a Proposal,
                    // it adds the Follower's SID to that Proposal's vote collector
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                ...
            }
        }
        ...
    }
    ...
}

public class Leader {
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    public Proposal propose(Request request) throws XidRolloverException {
        ...
        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        // Create the Proposal
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
       
        synchronized(this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
            if (request.getHdr().getType() == OpCode.reconfig) {
                self.setLastSeenQuorumVerifier(request.qv, true);                       
            }
            if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            lastProposed = p.packet.getZxid();
            // Put the Proposal being sent into the outstandingProposals map
            outstandingProposals.put(lastProposed, p);
            // Send the Proposal, which really means handing it to each LearnerHandler
            sendPacket(pp);
        }
        return p;
    }
    
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
        ...
        // Check the request's ZXID: it must be greater than lastCommitted, the ZXID of the last committed request
        if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
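        // Add the acknowledging server's SID (Leader or Follower) to the Proposal's vote collector
        p.addAck(sid);
        // Try to commit, i.e. check whether the Proposal's vote collector holds ACKs from a majority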
        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
        ...
    }
    
    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       
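        // If the outstanding-proposals map still holds this proposal's predecessor, the predecessor has not been committed yet, so return false
        // zxid - 1 is used because only transactional requests are assigned a zxid, so the previous transaction must have zxid - 1
        if (outstandingProposals.containsKey(zxid - 1)) return false;
        
        //getting a quorum from all necessary configurations.
        // Whether the Proposal's vote collector has reached a majority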
        if (!p.hasAllQuorums()) {
            return false;                 
        }
        ...
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        ...
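        // Once the proposal passes, lastCommitted in the Leader is immediately set to the newly committed ZXID
        commit(zxid);// Broadcast the commit message to the Followers
        inform(p);// Send the commit message to the Observers
        ...
        // Call CommitProcessor's commit method to commit the request
        zk.commitProcessor.commit(p.request);// Have the Leader itself apply the commit
        // The code below handles sync requests issued by Learners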
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                sendSync(r);
            }               
        } 
        return true;   
    }
    
    // Broadcast the commit message
    public void commit(long zxid) {
        synchronized(this) {
            // Mark lastCommitted as the most recently committed ZXID
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp);
    }
    
    void sendPacket(QuorumPacket qp) {
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                // Call LearnerHandler's queuePacket method to add the packet to the send queue
                f.queuePacket(qp);
            }
        }
    }
    
    public void inform(Proposal proposal) {
        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
        sendObserverPacket(qp);
    }
    
    ...
    static public class Proposal extends SyncedLearnerTracker {
        public QuorumPacket packet;
        public Request request;
        ...
    }
}

public class SyncedLearnerTracker {
    protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
    ...
    // Add to the vote collector
    public boolean addAck(Long sid) {
        boolean change = false;
        for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
                qvAckset.getAckset().add(sid);
                change = true;
            }
        }
        return change;
    }
    
    // Check whether the vote collector has reached a majority
    public boolean hasAllQuorums() {
        for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
                return false;
        }
        return true;
    }
    ...
}

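V. CommitProcessor: the transaction commit processor

ProposalRequestProcessor's nextProcessor is CommitProcessor, the transaction commit processor.


CommitProcessor passes a non-transactional request on to its nextProcessor. For a transactional request, it blocks and waits until the Proposal can be committed.


CommitProcessor has a LinkedBlockingQueue named queuedRequests. A call to CommitProcessor's processRequest() method adds the request to this queue, and the CommitProcessor thread takes requests from queuedRequests and processes them. In addition, nextPending and the committedRequests queue are used to make sure requests are processed in order.


CommitProcessor is also a thread: a request is first added to a queue and then processed by the thread. Its nextProcessor is ToBeAppliedRequestProcessor.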

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    // Request queue
    protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();
    // The next request waiting to be committed
    protected final AtomicReference<Request> nextPending = new AtomicReference<Request>();
    // Number of requests currently being processed
    protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
    ...
    
    @Override
    public void processRequest(Request request) {
        if (stopped) {
            return;
        }
        queuedRequests.add(request);
        if (!isWaitingForCommit()) {
            wakeup();// Wake up
        }
    }
    
    private boolean isProcessingRequest() {
        return numRequestsProcessing.get() != 0;
    }
    
    private boolean isWaitingForCommit() {
        return nextPending.get() != null;
    }
    
    private boolean isProcessingCommit() {
        return currentlyCommitting.get() != null;
    }
    
    synchronized private void wakeup() {
        notifyAll();// Wake up the blocked thread
    }
    
    @Override
    public void run() {
        Request request;
        while (!stopped) {
            synchronized(this) {
                while (!stopped && ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && (committedRequests.isEmpty() || isProcessingRequest()))) {
                    wait();// Block and wait
                }
            }
            while (!stopped && !isWaitingForCommit() && !isProcessingCommit() && (request = queuedRequests.poll()) != null) {
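                if (needCommit(request)) {// A transactional request that needs to be committed
                    nextPending.set(request);// Set the next request to commit
                } else {// A non-transactional request goes to the next processor
                    sendToNextProcessor(request);
                }
            }
            processCommitted();// Handle commits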
        }
    }
    
    protected void processCommitted() {
        Request request;
        if (!stopped && !isProcessingRequest() && (committedRequests.peek() != null)) {
            if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
                return;
            }
            request = committedRequests.poll();
            Request pending = nextPending.get();
            if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {
                pending.setHdr(request.getHdr());
                pending.setTxn(request.getTxn());
                pending.zxid = request.zxid;
                currentlyCommitting.set(pending);
                nextPending.set(null);
                sendToNextProcessor(pending);
            } else {
                currentlyCommitting.set(request);
                sendToNextProcessor(request);
            }
        }      
    }
    
    public void commit(Request request) {
        committedRequests.add(request);
        if (!isProcessingCommit()) {//CommitProcessor is not committing a request right now
            wakeup();//wake the CommitProcessor thread
        }
    }
    
    private void sendToNextProcessor(Request request) {
        numRequestsProcessing.incrementAndGet();
        workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
    }
    
    private class CommitWorkRequest extends WorkerService.WorkRequest {
        private final Request request;
        CommitWorkRequest(Request request) {
            this.request = request;
        }
        ...
        public void doWork() throws RequestProcessorException {
            try {
                nextProcessor.processRequest(request);
            } finally {
                currentlyCommitting.compareAndSet(request, null);
                if (numRequestsProcessing.decrementAndGet() == 0) {
                    if (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {
                        wakeup();
                    }
                }
            }
        }
    }
    ...
}
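
The interplay of queuedRequests, committedRequests and nextPending in the listing above can be modelled without any threads. The following is a minimal sketch under simplifying assumptions: `CommitGateSketch` and `Req` are hypothetical names, requests are matched by name rather than by sessionId and cxid, and the worker pool is replaced by a plain list. It only illustrates that requests queued behind a pending write are held back until that write is committed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-threaded model of the gating in run()/processCommitted().
// CommitGateSketch and Req are illustrative names, not ZooKeeper classes.
class CommitGateSketch {
    static final class Req {
        final String name;
        final boolean needCommit; // true for transaction (write) requests
        Req(String name, boolean needCommit) {
            this.name = name;
            this.needCommit = needCommit;
        }
    }

    private final Queue<Req> queuedRequests = new ArrayDeque<>();
    private final Queue<Req> committedRequests = new ArrayDeque<>();
    private Req nextPending; // the write currently waiting for its commit
    final List<String> applied = new ArrayList<>(); // order handed downstream

    void processRequest(Req request) {
        queuedRequests.add(request);
        drain();
    }

    void commit(Req request) {
        committedRequests.add(request);
        drain();
    }

    private void drain() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // Pass reads through until a write is met, then stop and wait.
            while (nextPending == null && !queuedRequests.isEmpty()) {
                Req request = queuedRequests.poll();
                if (request.needCommit) {
                    nextPending = request;
                } else {
                    applied.add(request.name);
                }
                progressed = true;
            }
            // A committed request releases the matching pending write.
            if (!committedRequests.isEmpty()) {
                Req committed = committedRequests.poll();
                if (nextPending != null && nextPending.name.equals(committed.name)) {
                    nextPending = null;
                }
                applied.add(committed.name);
                progressed = true;
            }
        }
    }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        Req write = new Req("write-1", true);
        gate.processRequest(new Req("read-1", false));
        gate.processRequest(write);
        gate.processRequest(new Req("read-2", false));
        System.out.println(gate.applied); // [read-1]
        gate.commit(write);
        System.out.println(gate.applied); // [read-1, write-1, read-2]
    }
}
```

read-2 arrives before the commit but is only released after write-1, which is the per-queue ordering the real processor enforces with its wait()/notifyAll() loop.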

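How to understand the guarantee that transaction requests are processed in order:

While ProposalRequestProcessor handles transaction requests that are already queued in order, it first calls CommitProcessor's processRequest() method to add each request to the request queue, so the requests in queuedRequests are in order. It then generates a Proposal, sends it to the Followers, and collects their ACK responses. Only when more than half of the servers have sent an ACK does it call CommitProcessor's commit() method, at which point the request that can now be committed is added to CommitProcessor's committedRequests queue.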


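Could network conditions cause the requests in CommitProcessor's committedRequests queue to end up out of order?

The fundamental reason transaction requests can be processed in order is:

The whole Proposal broadcast runs over TCP, which delivers data in FIFO order on each connection, so the order in which messages are sent and received during the broadcast is easy to preserve. In other words, the broadcast is sent by a single primary process, the Leader, over FIFO TCP connections, so the Proposal and Commit requests that each Follower receives enter its queues in order.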


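Transaction requests issued concurrently by clients are always enqueued in a single order when they reach the Leader. The Leader then broadcasts them in that order. Multiple transaction requests broadcast in order by the single Leader also arrive at any given Follower in order. So the Proposals a Follower receives enter its queue in broadcast order, and the Follower sends its ACK responses back to the Leader in that same order.

So the ACK responses the Leader receives from any one Follower arrive in broadcast order. Even if the Leader receives Follower2's ACK for transaction 2 before Follower1's ACK for transaction 1, when it counts votes toward a majority it will find that transaction 1 reaches a majority first, so transaction 1 keeps its place in the order.

In addition, the Leader's processAck() method first makes sure that the ZXID of the request to be committed is greater than that of the last committed request. The Leader's tryToCommit() method also checks that the previous transaction has been committed before it proceeds. And the Follower handles the Proposal and Commit requests it receives strictly in order: if the transaction ID the Follower is asked to commit is not the head element of pendingTxns, the process exits. Combined with queuedRequests + committedRequests + nextPending in CommitProcessor, this guarantees that transaction requests are processed in order.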

public class Leader {
    ...
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
        ...
        //The request's ZXID must be greater than lastCommitted, the ZXID of the last committed request
        if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
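        //Add the SID of the server that sent this ACK to the Proposal's vote tracker
        p.addAck(sid);
        //Try to commit, i.e. check whether the Proposal's vote tracker holds ACKs from more than half of the servers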
        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
        ...
    }
    
    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       
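        //If the previous Proposal is still in the outstanding map, it has not been committed yet, so return false
        //zxid - 1 works because only transaction requests are assigned a zxid, so the previous transaction must be zxid - 1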
        if (outstandingProposals.containsKey(zxid - 1)) return false;
        //getting a quorum from all necessary configurations.
        //Has the Proposal's vote tracker reached a majority?
        if (!p.hasAllQuorums()) {
            return false;                 
        }
        ...
        zk.commitProcessor.commit(p.request);
        ...      
    }
    ...
}
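
The two guards in processAck() and tryToCommit() can be exercised in isolation. The sketch below is a simplified model, not the Leader's actual implementation: `LeaderAckSketch` is a hypothetical class, a proposal is reduced to a set of acknowledging server IDs, and a majority is computed as more than half of a fixed cluster size. It shows that a later zxid that already has a majority still waits for its predecessor.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative model of the ordering guards; not a ZooKeeper class.
class LeaderAckSketch {
    private final int clusterSize;
    // zxid -> SIDs that have acknowledged the proposal
    private final Map<Long, Set<Long>> outstandingProposals = new TreeMap<>();
    private long lastCommitted = 0;
    final List<Long> commitOrder = new ArrayList<>();

    LeaderAckSketch(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    void propose(long zxid) {
        outstandingProposals.put(zxid, new HashSet<>());
    }

    // Returns true if this ACK caused the proposal to be committed.
    boolean processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false; // already committed
        }
        Set<Long> acks = outstandingProposals.get(zxid);
        if (acks == null) {
            return false; // unknown proposal
        }
        acks.add(sid);
        if (outstandingProposals.containsKey(zxid - 1)) {
            return false; // predecessor not committed yet
        }
        if (acks.size() <= clusterSize / 2) {
            return false; // no majority yet
        }
        outstandingProposals.remove(zxid);
        lastCommitted = zxid;
        commitOrder.add(zxid);
        return true;
    }

    public static void main(String[] args) {
        LeaderAckSketch leader = new LeaderAckSketch(3);
        leader.propose(1);
        leader.propose(2);
        leader.processAck(1, 2);
        leader.processAck(2, 2); // majority for zxid 2, but zxid 1 is still outstanding
        leader.processAck(1, 1);
        leader.processAck(2, 1); // zxid 1 commits
        leader.processAck(3, 2); // the next ACK for zxid 2 now commits it
        System.out.println(leader.commitOrder); // [1, 2]
    }
}
```

In this model a blocked proposal is only re-examined when another ACK for it arrives; how the real Leader handles that case is outside what the excerpt above shows.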

public class Follower extends Learner{
    ...
    void followLeader() throws InterruptedException {
        ...
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
        ...
    }
    
    protected void processPacket(QuorumPacket qp) throws Exception {
        switch (qp.getType()) {
            case Leader.PING:            
                ping(qp);            
                break;
            case Leader.PROPOSAL:
                //Handle a Proposal vote request initiated by the Leader
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                lastQueued = hdr.getZxid();
                ...
                fzk.logRequest(hdr, txn);
                break;
            case Leader.COMMIT:
                //Handle the Leader's request to commit a Proposal
                fzk.commit(qp.getZxid());
                break;
            ...
        }
    }
}

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
    ...
    
    //Put the received vote request into the pendingTxns queue
    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        syncProcessor.processRequest(request);
    }
    
    //When a COMMIT message is received, eventually this method is called,
    //which matches up the zxid from the COMMIT with (hopefully) the head of 
    //the pendingTxns queue and hands it to the commitProcessor to commit.
    //@param zxid - must correspond to the head of pendingTxns if it exists
    public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            //If the transaction ID to commit is not the head element of pendingTxns, exit the process
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        Request request = pendingTxns.remove();
        commitProcessor.commit(request);
    }
    ...
}
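
The Follower-side check is small enough to model directly. In the sketch below, `FollowerCommitSketch` is a hypothetical class, a request is reduced to its zxid, and an out-of-order COMMIT throws an IllegalStateException instead of calling System.exit(12) so that the behaviour can be observed. The helpers also spell out the mask used in logRequest(): a zxid is a 64-bit value whose high 32 bits are the epoch and whose low 32 bits are a counter, so `(zxid & 0xffffffffL) != 0` tests for a non-zero counter.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative model of pendingTxns handling; not a ZooKeeper class.
class FollowerCommitSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();

    static long epochOf(long zxid) {
        return zxid >>> 32; // high 32 bits
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL; // low 32 bits
    }

    // Mirrors logRequest(): only zxids with a non-zero counter are queued.
    void logRequest(long zxid) {
        if (counterOf(zxid) != 0) {
            pendingTxns.add(zxid);
        }
    }

    // Returns false when nothing is pending, true when the head was committed.
    // Assumption: throws instead of exiting the JVM on an out-of-order COMMIT.
    boolean commit(long zxid) {
        Long head = pendingTxns.peek();
        if (head == null) {
            return false;
        }
        if (head != zxid) {
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(head));
        }
        pendingTxns.remove();
        return true;
    }

    public static void main(String[] args) {
        long first = (5L << 32) | 1;
        long second = (5L << 32) | 2;
        FollowerCommitSketch follower = new FollowerCommitSketch();
        follower.logRequest(first);
        follower.logRequest(second);
        System.out.println(epochOf(second) + " / " + counterOf(second)); // 5 / 2
        System.out.println(follower.commit(first));  // true
        System.out.println(follower.commit(second)); // true
    }
}
```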


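VI. ToBeAppliedRequestProcessor

The Leader has a toBeApplied queue that holds the Proposals that can be committed. ToBeAppliedRequestProcessor hands requests that CommitProcessor has already processed to the next processor, and then removes the request from the Leader's toBeApplied queue.

ToBeAppliedRequestProcessor is not a thread; its next is FinalRequestProcessor.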

public class Leader {
    private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
    ...
    static class ToBeAppliedRequestProcessor implements RequestProcessor {
        private final RequestProcessor next;
        private final Leader leader;
        
        ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
            this.leader = leader;
            this.next = next;
        }
        ...
        public void processRequest(Request request) throws RequestProcessorException {
            next.processRequest(request);
            if (request.getHdr() != null) {
                long zxid = request.getHdr().getZxid();
                Iterator<Proposal> iter = leader.toBeApplied.iterator();
                if (iter.hasNext()) {
                    Proposal p = iter.next();
                    if (p.request != null && p.request.zxid == zxid) {
                        iter.remove();
                        return;
                    }
                }
            }
        }
        ...
    }
    ...
}
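
The "delegate first, then remove the head" pattern of processRequest() can be sketched as follows. This is a minimal illustration under stated assumptions: `ToBeAppliedSketch` and its `Processor` interface are hypothetical, and both requests and Proposals are reduced to a bare zxid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative model; Processor and ToBeAppliedSketch are not ZooKeeper types.
class ToBeAppliedSketch {
    interface Processor {
        void process(long zxid);
    }

    // Wraps 'next' so that the head of 'toBeApplied' is dropped once applied.
    static Processor wrap(Queue<Long> toBeApplied, Processor next) {
        return zxid -> {
            next.process(zxid); // the next processor runs first
            Long head = toBeApplied.peek();
            if (head != null && head == zxid) {
                toBeApplied.poll(); // only a matching head is removed
            }
        };
    }

    public static void main(String[] args) {
        Queue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
        toBeApplied.add(1L);
        toBeApplied.add(2L);
        List<Long> applied = new ArrayList<>();
        Processor processor = wrap(toBeApplied, applied::add);
        processor.process(1L);
        System.out.println(applied + " " + toBeApplied); // [1] [2]
    }
}
```

As in the listing above, only the head of the queue is inspected, so a request whose zxid does not match the head leaves the queue unchanged.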

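VII. FinalRequestProcessor

FinalRequestProcessor does the wrap-up work before a response is returned to the client, which includes applying the transaction request to the in-memory database and creating the client response.


FinalRequestProcessor is not a thread, and it has no nextProcessor field.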

public class FinalRequestProcessor implements RequestProcessor {
    ...
    public void processRequest(Request request) {
        ...
        ProcessTxnResult rc = null;
        synchronized (zks.outstandingChanges) {
            // Need to process local session requests
            rc = zks.processTxn(request);
            if (request.getHdr() != null) {
                TxnHeader hdr = request.getHdr();
                Record txn = request.getTxn();
                long zxid = hdr.getZxid();
            
                while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove();
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
            }
            // do not add non quorum packets to the queue.
            if (request.isQuorum()) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
        ...
        //Create the response and send it to the client
        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
        cnxn.sendResponse(hdr, rsp, "response");
        if (request.type == OpCode.closeSession) {
            cnxn.sendCloseSession();
        }
    }
    ...
}
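
The clean-up loop over outstandingChanges in the listing above is easy to misread, in particular the identity comparison against outstandingChangesForPath. The following sketch isolates it; `OutstandingChangesSketch` and its `ChangeRecord` are simplified, hypothetical stand-ins that keep only a zxid and a path.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the outstandingChanges clean-up; not ZooKeeper code.
class OutstandingChangesSketch {
    static final class ChangeRecord {
        final long zxid;
        final String path;
        ChangeRecord(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    // Drops every record up to and including 'zxid'. The per-path entry is
    // removed only if it still points at the record being dropped, so a newer
    // pending change for the same path survives.
    static void prune(Deque<ChangeRecord> changes, Map<String, ChangeRecord> byPath, long zxid) {
        while (!changes.isEmpty() && changes.peek().zxid <= zxid) {
            ChangeRecord cr = changes.remove();
            if (byPath.get(cr.path) == cr) {
                byPath.remove(cr.path);
            }
        }
    }

    public static void main(String[] args) {
        Deque<ChangeRecord> changes = new ArrayDeque<>();
        Map<String, ChangeRecord> byPath = new HashMap<>();
        ChangeRecord older = new ChangeRecord(1, "/a");
        ChangeRecord newer = new ChangeRecord(2, "/a");
        changes.add(older);
        changes.add(newer);
        byPath.put("/a", newer); // the map tracks the latest pending change per path
        prune(changes, byPath, 1);
        System.out.println(changes.size() + " " + byPath.containsKey("/a")); // 1 true
        prune(changes, byPath, 2);
        System.out.println(changes.size() + " " + byPath.containsKey("/a")); // 0 false
    }
}
```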

Summary:

The nextProcessor of PrepRequestProcessor is ProposalRequestProcessor;
The nextProcessor of ProposalRequestProcessor is CommitProcessor;
The nextProcessor of CommitProcessor is ToBeAppliedRequestProcessor;
The next of ToBeAppliedRequestProcessor is FinalRequestProcessor;
FinalRequestProcessor has no nextProcessor field;

ProposalRequestProcessor calls into SyncRequestProcessor;
The nextProcessor of SyncRequestProcessor is AckRequestProcessor;
AckRequestProcessor has no nextProcessor field;

PrepRequestProcessor is a thread;
ProposalRequestProcessor is not a thread;
CommitProcessor is a thread;
ToBeAppliedRequestProcessor is not a thread;
FinalRequestProcessor is not a thread;

SyncRequestProcessor is a thread;
AckRequestProcessor is not a thread;
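
The distinction between processors that are threads and processors that are not comes down to whether processRequest() does the work itself or only enqueues it. A minimal sketch, assuming hypothetical names (`ProcessorChainSketch`, `Processor`, `QueuedProcessor`) and using strings as requests:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only; none of these types are ZooKeeper classes.
class ProcessorChainSketch {
    interface Processor {
        void processRequest(String request);
    }

    // Thread-backed stage: processRequest() only enqueues, run() does the work.
    static final class QueuedProcessor extends Thread implements Processor {
        // A unique instance compared by identity, like a "request of death".
        private static final String SHUTDOWN = new String("shutdown");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final Processor next;

        QueuedProcessor(Processor next) {
            this.next = next;
        }

        @Override
        public void processRequest(String request) {
            queue.add(request);
        }

        void shutdown() {
            queue.add(SHUTDOWN);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String request = queue.take();
                    if (request == SHUTDOWN) {
                        break;
                    }
                    next.processRequest(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        Processor finalStage = request -> log.add("final:" + request);
        // Non-thread stage: runs on the caller's thread and calls next directly.
        Processor passThrough = request -> finalStage.processRequest(request + "+checked");
        QueuedProcessor first = new QueuedProcessor(passThrough);
        first.start();
        first.processRequest("create");
        first.processRequest("getData");
        first.shutdown();
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [final:create+checked, final:getData+checked]
    }
}
```

A single consumer thread draining a FIFO queue keeps the requests of that stage in arrival order, which is the property the thread-backed processors in the summary rely on.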


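(2) Request processing chain of the Follower server

I. FollowerRequestProcessor, the request forwarding processor

II. SendAckRequestProcessor, the vote feedback processor


The main jobs of a Follower server are:

I. Handle non-transaction requests and forward transaction requests to the Leader server

II. Vote on the Proposals for transaction requests

III. Vote in Leader elections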


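The request processing chain of the Follower server is: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor, plus a SyncRequestProcessor -> SendAckRequestProcessor branch that handles the Proposals sent by the Leader.

The first processor on the Leader server is LeaderRequestProcessor, while the first processor on the Follower server is FollowerRequestProcessor. Because a Follower does not initiate Proposals or count the votes for transaction requests, it has no ProposalRequestProcessor.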


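I. FollowerRequestProcessor, the request forwarding processor

FollowerRequestProcessor's main job is to identify whether the current request is a transaction request. If it is, the Follower forwards it to the Leader server. FollowerRequestProcessor forwards the request by calling Learner's request() method, which writes the request data to the output stream leaderOs to send it to the Leader. The leaderOs output stream is initialized when the Follower establishes its connection to the Leader.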

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
    }
    ...
}

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    ...
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }
    
    public Follower getFollower(){
        return self.follower;
    }
    ...
}

public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    FollowerZooKeeperServer zks;
    RequestProcessor nextProcessor;
    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    boolean finished = false;
    
    public FollowerRequestProcessor(FollowerZooKeeperServer zks, RequestProcessor nextProcessor) {
        super("FollowerRequestProcessor:" + zks.getServerId(), zks.getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
    }
    
    @Override
    public void run() {
        while (!finished) {
            Request request = queuedRequests.take();
            if (request == Request.requestOfDeath) {
                break;
            }
            nextProcessor.processRequest(request);
            //Every request has just been handed to nextProcessor; a transaction request is additionally
            //forwarded to the Leader via zks.getFollower().request(request)
            switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
            }
        }
    }
    
    public void processRequest(Request request) {
        if (!finished) {
            Request upgradeRequest = null;
            upgradeRequest = zks.checkUpgradeSession(request);
            if (upgradeRequest != null) {
                queuedRequests.add(upgradeRequest);
            }
            queuedRequests.add(request);
        }
    }
    ...
}
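
The switch in run() amounts to a yes/no decision per request type. The sketch below restates that decision as a pure function. `ForwardDecisionSketch` and its `Op` enum are hypothetical and cover only a subset of the op codes in the listing above; the read-only entries are added for contrast and are assumptions.

```java
// Illustrative restatement of the forwarding decision; not ZooKeeper code.
class ForwardDecisionSketch {
    enum Op {
        GET_DATA, EXISTS,                                    // read-only, for contrast
        SYNC, CREATE, DELETE, SET_DATA, SET_ACL, MULTI, CHECK,
        CREATE_SESSION, CLOSE_SESSION
    }

    // Returns true when the Follower should forward the request to the Leader.
    static boolean forwardToLeader(Op op, boolean localSession) {
        switch (op) {
            case SYNC:
            case CREATE:
            case DELETE:
            case SET_DATA:
            case SET_ACL:
            case MULTI:
            case CHECK:
                return true;
            case CREATE_SESSION:
            case CLOSE_SESSION:
                // Local sessions are not forwarded to the Leader.
                return !localSession;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardToLeader(Op.CREATE, false));         // true
        System.out.println(forwardToLeader(Op.GET_DATA, false));       // false
        System.out.println(forwardToLeader(Op.CREATE_SESSION, true));  // false
    }
}
```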

public class Learner {
    protected BufferedOutputStream bufferedOutput;
    protected Socket sock;
    protected InputArchive leaderIs;
    protected OutputArchive leaderOs;
    ...
    //send a request packet to the leader
    void request(Request request) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId);
        oa.writeInt(request.cxid);
        oa.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            int len = request.request.remaining();
            byte b[] = new byte[len];
            request.request.get(b);
            request.request.rewind();
            oa.write(b);
        }
        oa.close();
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
        //Send the request by writing to the output stream leaderOs
        writePacket(qp, true);
    }
    
    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
        synchronized (leaderOs) {
            if (pp != null) {
                leaderOs.writeRecord(pp, "packet");
            }
            if (flush) {
                bufferedOutput.flush();
            }
        }
    }
    
    //The output stream leaderOs is initialized when the connection to the Leader is established
    protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
        this.sock = createSocket();
        int initLimitTime = self.tickTime * self.initLimit;
        int remainingInitLimitTime = initLimitTime;
        long startNanoTime = nanoTime();
        
        for (int tries = 0; tries < 5; tries++) {
            remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
            sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
            if (self.isSslQuorum())  {
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        }
        Thread.sleep(1000);
        self.authLearner.authenticate(sock, hostname);
        //Initialize the input stream leaderIs
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        //Initialize the output stream leaderOs
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
    
    //Create a blocking-I/O (BIO) socket
    private Socket createSocket() throws X509Exception, IOException {
        Socket sock;
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = new Socket();
        }
        sock.setSoTimeout(self.tickTime * self.initLimit);
        return sock;
    }
    ...
}
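
The payload that Learner.request() builds has a fixed 16-byte prefix (sessionId, cxid, type) followed by the raw request bytes. The sketch below reproduces just that byte layout and reads it back. `ForwardedRequestSketch` is a hypothetical helper, the QuorumPacket wrapper and authInfo are left out, and the sample values are made up.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Illustrative encoder for the forwarded-request payload; not ZooKeeper code.
class ForwardedRequestSketch {
    static byte[] encode(long sessionId, int cxid, int type, ByteBuffer payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeLong(sessionId); // 8 bytes, big-endian
            out.writeInt(cxid);       // 4 bytes
            out.writeInt(type);       // 4 bytes
            if (payload != null) {
                // Work on a duplicate so the caller's position is untouched.
                ByteBuffer copy = payload.duplicate();
                copy.rewind();
                byte[] body = new byte[copy.remaining()];
                copy.get(body);
                out.write(body);
            }
            out.close();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = encode(0x1234L, 7, 1, ByteBuffer.wrap(new byte[] {10, 20, 30}));
        ByteBuffer in = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        System.out.println(bytes.length);  // 19
        System.out.println(in.getLong());  // 4660
        System.out.println(in.getInt());   // 7
        System.out.println(in.getInt());   // 1
    }
}
```

The original method rewinds the request buffer before and after copying; the sketch uses duplicate() instead, which is a substitution made here to avoid mutating the caller's buffer.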

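II. SendAckRequestProcessor, the vote feedback processor

The Leader's request processing chain has a vote feedback processor called AckRequestProcessor. Once SyncRequestProcessor has written the transaction log, AckRequestProcessor reports the Leader's own ACK for the Proposal.


The Follower's request processing chain has a corresponding vote feedback processor called SendAckRequestProcessor. Once SyncRequestProcessor has written the transaction log, SendAckRequestProcessor reports the Follower's ACK for the Proposal by sending a message to the Leader.


In the Follower's request processing chain, SyncRequestProcessor starts a thread: it first adds the request to a queue, and the thread then processes it. The nextProcessor of SyncRequestProcessor is SendAckRequestProcessor. SendAckRequestProcessor is not a thread.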

public class SendAckRequestProcessor implements RequestProcessor, Flushable {
    Learner learner;
    
    SendAckRequestProcessor(Learner peer) {
        this.learner = peer;
    }
    
    public void processRequest(Request si) {
        if (si.type != OpCode.sync) {
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
            learner.writePacket(qp, false);//send the ACK response to the Leader
            ...
        }
    }
    ...
}
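
Note that the ACK is written with flush set to false, so it first lands in the buffered output stream rather than on the socket. The sketch below shows the effect with plain java.io classes. It is an assumption-laden illustration: `AckFlushSketch` is hypothetical, a ByteArrayOutputStream stands in for the socket, an ACK is reduced to its 8-byte zxid, and the point at which the real processor flushes is elided in the listing above and not modelled here.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative model of buffered ACK writes; not ZooKeeper code.
class AckFlushSketch {
    // Stands in for the socket's output stream.
    private final ByteArrayOutputStream wire = new ByteArrayOutputStream();
    private final BufferedOutputStream bufferedOutput = new BufferedOutputStream(wire);
    private final DataOutputStream leaderOs = new DataOutputStream(bufferedOutput);

    // Same shape as writePacket(pp, flush): write, then optionally flush.
    void writeAck(long zxid, boolean flush) {
        try {
            synchronized (leaderOs) {
                leaderOs.writeLong(zxid);
                if (flush) {
                    bufferedOutput.flush();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void flush() {
        try {
            synchronized (leaderOs) {
                bufferedOutput.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int bytesOnWire() {
        return wire.size();
    }

    public static void main(String[] args) {
        AckFlushSketch sketch = new AckFlushSketch();
        sketch.writeAck(1L, false);
        sketch.writeAck(2L, false);
        System.out.println(sketch.bytesOnWire()); // 0, both ACKs are still buffered
        sketch.flush();
        System.out.println(sketch.bytesOnWire()); // 16
    }
}
```

Deferring the flush lets several small ACK packets share one write to the socket.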

public class Follower extends Learner {
    ...
    void followLeader() throws InterruptedException {
        ...
        QuorumServer leaderServer = findLeader();            
        connectToLeader(leaderServer.addr, leaderServer.hostname);
      
        long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
        syncWithLeader(newEpochZxid);                
      
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);//read a packet sent by the Leader from the input stream leaderIs
            processPacket(qp);//handle the packet from the Leader, which includes Proposal vote requests
        }
        ...
    }
    
    protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
            case Leader.PING:            
                ping(qp);            
                break;
            //Respond to the Proposal vote initiated by the Leader
            case Leader.PROPOSAL:         
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                ...
                //The request now enters SyncRequestProcessor's queue;
                //once the SyncRequestProcessor thread has processed it,
                //SendAckRequestProcessor sends the ACK response
                fzk.logRequest(hdr, txn);
                break;
            ...
        }
    }
    ...
}

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    ...
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }
    
    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        //Call SyncRequestProcessor's processRequest method to handle the Proposal and, in turn, the vote response
        syncProcessor.processRequest(request);
    }
    ...
}

public class Learner {
    protected BufferedOutputStream bufferedOutput;
    protected Socket sock;
    protected InputArchive leaderIs;
    protected OutputArchive leaderOs;
    ...
    void readPacket(QuorumPacket pp) throws IOException {
        synchronized (leaderIs) {
            //Read a packet sent by the Leader from the input stream
            leaderIs.readRecord(pp, "packet");
        }
    }
    
    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
        synchronized (leaderOs) {
            if (pp != null) {
                //Write the response to the output stream to send it to the Leader
                leaderOs.writeRecord(pp, "packet");
            }
            if (flush) {
                bufferedOutput.flush();
            }
        }
    }
    
    //The output stream leaderOs is initialized when the connection to the Leader is established
    protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
        this.sock = createSocket();
        int initLimitTime = self.tickTime * self.initLimit;
        int remainingInitLimitTime = initLimitTime;
        long startNanoTime = nanoTime();
        
        for (int tries = 0; tries < 5; tries++) {
            remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
            sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
            if (self.isSslQuorum())  {
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        }
        Thread.sleep(1000);
        self.authLearner.authenticate(sock, hostname);
        //Initialize the input stream leaderIs
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        //Initialize the output stream leaderOs
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
    ...
}
