微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Zookeeper集群中leader服务器处理一次事务请求的源码解析

请求接收

zookeeper中的ServerCnxn是负责网络通信的主要实现类,该类有两个具体的实现,分别是NIOServerCnxn和NettyServerCnxn,本文以NIOServerCnxn为例入手分析。

我们从NIOServerCnxn这个类看起,正如描述所说:server会为每一个client创建一个该类的实例,负责处理来自该client的请求:

图片

接收的方法代码如下:

图片

关于SelectionKey

zookeeper使用NIO作为传输媒介,SelectionKey作为Channel的句柄,此处仅用于感知事件或操作类型。

readPayload

这里会出现两种情况:NIOServerCnxn已经完成初始化和尚未初始化(没有初始化也可以读出请求内容)。
对于未初始化时的请求,则当作创建回话的请求处理。代码如下:

图片


创建会话的请求处理

对于一个创建会话的请求,服务端会进行如下几个步骤的处理:
    1.反序列化ConnectRequest实体
    2.判断是否是readOnly客户端,如果不是只读客户端,而当前服务器又是只读服务器,则抛出异常
    3.检查客户端携带的zxid,一般情况下,服务端的zxid是一定大于客户端的。所以一旦出现客户端zxid大于服务端的时候,会抛出异常;
    4.确定一个sessionTimeout
    5.如果客户端携带了一个sessionId,重连。否则为客户端创建一个sessionId
整个流程非常清晰,源码如下

图片


关于zxid

ZooKeeper Transaction Id:每次请求对应一个唯一的zxid,如果zxid a < zxid b ,则可以保证a一定发生在b之前。


创建会话和会话管理

创建会话需要进行如下几个步骤:

    1.生成一个sessionId
    2.在sessionTracker中注册会话
    3.激活会话
    4.为客户端生成一个session密码,作为客户端请求服务端集群中不同机器的凭证
源码如下

图片

SessionTracker

上述过程中,重点在于注册会话和激活会话,这个功能由SessionTracker完成,下面我们来具体分析一下。
首先,SessionTracker是一个线程,它继承了Thread

public class SessionTrackerImpl extends Thread implements SessionTracker

run方法我们稍后再说,先记住它本身是一个线程就行了。
第二,SessionTracker内部维护了几个集合,分别是

图片

一个session生成时,它会被分别放到这三个集合中
    sessionById:key=sessionId,value=SessionImpl
    sessionsWithTimeout:key=sessionId,value=timeout
这两个集合很好理解,但两者本身是冗余的,冗余则是为了提高查找效率
    sessionSets:这个集合里存放的是所有临近时间将要过期的session集合们
比如所有t1时间附近即将过期的session集合会被放到同一个sessionSet1中,以key=t1,value=sessionSet1存入sessionSets。
理解了上述三个集合,SessionTracker的工作原理就非常清晰了。

生成session:
首先将生成的session分别放入sessionsById和sessionsWithTimeout集合中,然后执行激活touchSession。

图片


激活session:
首先通过roundToInterval方法协商出一个近似的过期时间,这可以保证所有拥有近似timeout的session被分到同一个桶中。
如果不是首次激活,需要把session从旧桶中移除。
然后再放到新的过期桶中。

synchronized public boolean touchSession(long sessionId, int timeout) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.CLIENT_PING_TRACE_MASK,"SessionTrackerImpl --- Touch session: 0x"+ Long.toHexString(sessionId) + " with timeout " + timeout);
        }
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        if (s.tickTime >= expireTime) {
            // nothing needs to be done
            return true;
        }
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }


session失效:
按照时间索引找到失效桶,然后把里边所有的session做失效处理,并重复执行以上步骤。

图片


创建会话流程结束

如果是创建会话的请求,流程到这里就全部结束了。NIOServerCnxn完成了自身的初始化,并且为客户端创建了会话。
客户端的下一次请求会进入readRequest分支

if (!initialized) {
    readConnectRequest();
} else {
    readRequest();
}


事务请求的接收

下面我们通过一个修改节点的请求为例,说明事务请求的处理流程。

从ZookeeperServer.processpacket方法看起
首先,还是先反序列化一个RequestHeader对象(注意这里是RequestHeader,而processConnectRequest则是反序列化一个ConnectRequest),
然后会根据请求类型不同走向不同的分支(具体类型在ZooDefs.OpCode有定义,在此不一一列举),这里只分析一个普通的更新事务流程。
请求会进入构造一个普通的Request对象,然后进入submitRequest(Request si)方法。这个方法下边细说,先看processpacket的最后一步:

cnxn.incrOutstandingRequests(h)

这个方法的源码如下:

protected void incrOutstandingRequests(RequestHeader h) {
        if (h.getXid() >= 0) {
            synchronized (this) {
                outstandingRequests++;
            }
            synchronized (this.factory) {
                // check throttling
                if (zkServer.getInProcess() > outstandingLimit) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Throttling recv " + zkServer.getInProcess());
                    }
                    disableRecv();
                    // following lines should not be needed since we are
                    // already reading
                    // } else {
                    // enableRecv();
                }
            }
        }

    }

这个方法的意思是:给“处理中的请求”计数器++,然后判断处理中的请求数量是否达到了配置上限outstandingLimit,
如果超过了配置阈值,则执行disableRecv();顾名思义,拒绝接收新的请求,具体的处理方式是通过SelectionKey关闭对应的Channel。
这里顺便说一下,outstandingLimit参数是针对单台服务器而言,设定太大可能会导致内存溢出。
一个请求被处理完时,会再次检测这个值,如果满足要求,会执行enableRecv();重新打开通道。
具体代码在sendResponse(ReplyHeader h, Record r, String tag)方法的最后:

if (h.getXid() > 0) {
                synchronized(this){
                    outstandingRequests--;
                }
                // check throttling
                synchronized (this.factory) {
                    if (zkServer.getInProcess() < outstandingLimit
                            || outstandingRequests < 1) {
                        sk.selector().wakeup();
                        enableRecv();
                    }
                }
            }


leader服务器的处理责任链

对于不同的Zookeeper服务器角色,请求处理责任链是不一样的,下面以leader服务器为例分析。
leader服务器责任链构造如下:

@Override protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new leader.ToBeAppliedRequestProcessor(
                finalProcessor, getleader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false);
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

在把请求交给责任链之前,leader本身会做一些处理,具体处理代码在ZookeeperServer.submitRequest(Request si)方法中,这个方法一共干了三件事:

    1.校验服务器是否正常启动
    2.激活session
    3.把请求交给责任链的第一环:firstProcessor,即PrepRequestProcessor。
代码如下:

public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    while (!running) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.proce***equest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Dropping packet at server of type " + si.type);
                // if invalid packet drop the packet.
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

下面来依次分析这些Processor。


预处理:PrepRequestProcessor

大部分RequestProcessor的结构和工作方式都很类似,现在这里统一说明一下
它们都会在内部维护一个待处理请求的队列:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

RequestProcessor们本身是一个线程,会不停地从自己的队列里取出待处理的Request,处理,然后交给nextProcessor。
可以看到,proce***equest的方法非常简单,只做了一件事,就是把Request放入队列,然后及时释放线程。

public void proce***equest(Request request) {
        // request.addRQRec(">prep="+zks.outstandingChanges.size());
        submittedRequests.add(request);
    }

回到PrepRequestProcessor,看看它的run方法

从队列里取出一个Request,交给pRequest方法执行

    @Override
    public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }
                pRequest(request);
            }

以setDate请求为例,最终会调用pRequest2Txn方法

pRequest2Txn方法中干了这么几件事:

    1.session校验;
    2.反序列化SetDataRequest,从而得到节点和路径;
    3.校验节点的权限;
    4.校验版本号,版本号+1;
    5.拷贝出一份目标节点的副本,加入到outstandingChanges和outstandingChangesForPath中。

case OpCode.setData:
    zks.sessionTracker.checkSession(request.sessionId, request.getowner());
    SetDataRequest setDataRequest = (SetDataRequest)record; if(deserialize)
      ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
    path = setDataRequest.getPath();
    nodeRecord = getRecordForPath(path);
    checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,request.authInfo);
    version = setDataRequest.getVersion(); int currentVersion = nodeRecord.stat.getVersion(); if (version != -1 && version != currentVersion) { throw new KeeperException.BadVersionException(path);
    }
    version = currentVersion + 1;
    request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
    nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
    nodeRecord.stat.setVersion(version);
    addChangeRecord(nodeRecord); break;

上述几件是做完以后,作为一个setData类型的请求,PrepRequestProcessor的职责就算完成了(注意此时并没有对目标节点进行真正的修改),接下来的工作会交给ProposalRequestProcessor 处理。


下面我们分析一下为什么需要把待更改的节点放到outstandingChanges和outstandingChangesForPath中,这需要从事务和回滚说起。

事务的预处理
先看一下预处理阶段如何处理事务:
    1.首先取出统一批次的事务;
    2.检查事务涉及修改哪些节点,把这些节点备份出来,回滚时使用(HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest););
    3.将事务中的Request取出来依次执行预处理,当某一个Request预处理失败时,后边的Request都不处理了;
    4.如果预处理失败,将之前执行预处理成功的Request进行回滚。
源码如下:

图片


事务回滚
从备份的节点中找到事务修改的节点们,进行还原。步骤如下:
    1.取得所有已改变的节点集合的最后一个,向前遍历(这个集合里可能会存在非待会滚的事务修改的节点);
    2.如果待修改节点的事务编号等于待会滚事务的编号,将该节点直接从集合中移除;
    3.遇到第一个非本事务修改的节点时,跳出遍历;
    4.如果待修改节点的集合不为空(说明有其他修改节点的请求),并且备份的节点事务编号>待修改节点的事务编号,使用备份进行还原。
源码如下:

图片

为了满足以上第4条,需要在备份的时候备份到最新的节点事务编号。
保证的方式很简单,直接从outstandingChanges里取最后一个就可以了(这就是为什么一个普通修改节点的请求,也会把待修改的节点放到outstandingChanges中)。
最后来看一下备份的代码

/**
     * Grab current pending change records for each op in a multi-op.
     *
     * This is used inside MultiOp error code path to rollback in the event
     * of a Failed multi-op.
     *
     * @param multiRequest
     */ HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
    	HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>(); for(Op op: multiRequest) {
    		String path = op.getPath(); try {
    			ChangeRecord cr = getRecordForPath(path); if (cr != null) {
    				pendingChangeRecords.put(path, cr);
    			}
    		} catch (KeeperException.NoNodeException e) { // ignore this one }
    	} return pendingChangeRecords;
    }


ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
        ChangeRecord lastChange = null; synchronized (zks.outstandingChanges) {
            lastChange = zks.outstandingChangesForPath.get(path); /*
            for (int i = 0; i < zks.outstandingChanges.size(); i++) {
                ChangeRecord c = zks.outstandingChanges.get(i);
                if (c.path.equals(path)) {
                    lastChange = c;
                }
            }
            */ if (lastChange == null) {
                Datanode n = zks.getZKDatabase().getNode(path); if (n != null) {
                    Long acl;
                    Set<String> children; synchronized(n) {
                        acl = n.acl;
                        children = n.getChildren();
                    }
                    lastChange = new ChangeRecord(-1, path, n.stat,
                        children != null ? children.size() : 0,
                            zks.getZKDatabase().convertLong(acl));
                }
            }
        } if (lastChange == null || lastChange.stat == null) { throw new KeeperException.NoNodeException(path);
        } return lastChange;
    }


ProposalRequestProcessor

ProposalRequestProcessor的功能相对简单。
它会先将Request交给下一个Processor,然后在做自己的事,包括
    1.将请求封装成一个Proposal(提案),发送给所有的Follower;
    2.调用SyncRequestProcessor记录日志和快照。
代码如下:

    public void proce***equest(Request request) throws RequestProcessorException {
        // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = " + request.sessionId);
        // request.addRQRec(">prop");


        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't
         * contain the handler. In this case, we add it to syncHandler, and
         * call proce***equest on the next processor.
         */

        if(request instanceof LearnerSyncRequest){
            zks.getleader().processSync((LearnerSyncRequest)request);
        } else {
                nextProcessor.proce***equest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    zks.getleader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                syncProcessor.proce***equest(request);
            }
        }
    }

SyncRequestProcessor

SyncRequestProcessor和PrepRequestProcessor类似,也是线程+队列的处理方式,这里就不赘述了。
为了便于理解,在介绍这个类之前先简单补充一下:


Zookeeper的数据存储方式
Zookeeper在启动时会把数据加载到内存数据库中,并通过2种方式向磁盘刷写数据:
    1.当某一事物提交时,会记录一条log,写入到对应的log文件中。(log文件是多个,并且以该log中记录的第一条事务id命名,比如第1条log和第99条log都会记录在log.1文件中,第100条log会记录在log.100文件中);
    2.当1重复进行一定次数后,服务器会把内存数据库的数据全量dump到磁盘上,这一过程称为快照(snapshot);
    3.服务器最多保存snapCount条未进行快照的log,每当接到请求时,logCount++,如果logCount>(snapCount / 2 + randRoll),进行快照。这说明当log累积超过配置数量的一半时,才有可能进行快照;
    4.随机数randRoll保证了所有server不会在同一时间进行快照操作。


BlockingQueue的take()和poll()的区别
使用take()函数,如果队列中没有数据,则线程wait释放cpu,而poll()则不会等待,直接返回null


SyncRequestProcessor的工作流程
线程的run方法简要流程如下:
    1.生成一个随机数;
    2.如果待写入队列(toFlush)是空的(说明当前写请求较少,线程比较闲),尝试从待处理队列中获取一个Request(如果没有获取到,说明线程太闲了,直接wait挂起);
    3.如果待写入队列不为空,但是待处理队列是空,则处理写入;
    4.如果待写入队列是空的,待处理队列不是空的(成功获取到了一个Request),并且是写请求,记录一条log(注意,此时只是appeng,还没有commit);

zks.getZKDatabase().append(si)

    5.如果满足了快照条件,记录快照,同时清空等待快照的log数量
    6.如果是读请求,直接放给下一个Processor处理;
    7.把Request加入到toFlush队列,如果toFlush.size() > 1000,执行flish。
代码如下:

图片

flush

正式提交的flush方法流程:
    1.如果队列为空,返回;
    2.将之前append的log正式commit(没错,只是log);
    3.从队列中移除Request,并交给下一个Processor处理(CommitRequestProcessor)。
代码如下:

    private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;

        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            nextProcessor.proce***equest(i);
        }
        if (nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }


CommitProcessor

在ProposalRequestProcessor中,广播的Proposal被Follower服务器收到后,会给leader服务器一个反馈(ack)。
CommitProcessor中有两个队列,分别是leader服务器自己从上游Processor传来的Request,和Follower服务器发来的通过提案的Request。

    /**
     * Requests that we are holding until the commit comes in.
     */ LinkedList<Request> queuedRequests = new LinkedList<Request>(); /**
     * Requests that have been committed.
     */ LinkedList<Request> committedRequests = new LinkedList<Request>();

CommitProcessor的功能就是将这两个队列中的Request进行配对(相当于投票通过的Request),然后交给下一个Processor处理。

图片


leader.ToBeAppliedRequestProcessor

Request走到这里,说明leader服务器和Follower服务器已经接受了这个提案,但是并没有正式生效。
ToBeAppliedRequestProcessor是提案正式生效前的最后一个Processor,如果不是,构造函数会抛出异常。

        /**
         * This request processor simply maintains the toBeApplied list. For
         * this to work next must be a FinalRequestProcessor and
         * FinalRequestProcessor.proce***equest MUST process the request
         * synchronously!
         * 
         * @param next
         *                a reference to the FinalRequestProcessor
         */ ToBeAppliedRequestProcessor(RequestProcessor next,
                ConcurrentLinkedQueue<Proposal> toBeApplied) { if (!(next instanceof FinalRequestProcessor)) { throw new RuntimeException(ToBeAppliedRequestProcessor.class
                        .getName()
                        + " must be connected to " + FinalRequestProcessor.class.getName()
                        + " not " + next.getClass().getName());
            } this.toBeApplied = toBeApplied; this.next = next;
        }

ToBeAppliedRequestProcessor的工作也非常简单:在正式生效之前,leader还需要最后校验一遍Request的顺序,防止顺序错乱。

        public void proce***equest(Request request) throws RequestProcessorException {
            // request.addRQRec(">tobe");
            next.proce***equest(request);
            Proposal p = toBeApplied.peek();
            if (p != null && p.request != null
                    && p.request.zxid == request.zxid) {
                toBeApplied.remove();
            }
        }

之后,请求就会走到最后一个Processor,正式生效。


FinalRequestProcessor

FinalRequestProcessor是责任链的最后一个Processor,结构很简单,没有队列和内部线程,只有一个长长的proce***equest方法
下面就来进行分段分析。
先看这段同步代码块:
    1.从头开始遍历outstandingChanges队列,移除所有事务id小于当前请求的事务id(这些是已经处理过的过期的ChangeRecord);
    2.从Request中获取请求头和事务,处理事务(下边详细说);
    3.如果是一个事务请求,执行addCommittedProposal(添加一个已提交的提案)方法(下边详细说)。

        synchronized (zks.outstandingChanges) { while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0); if (cr.zxid < request.zxid) {
                    LOG.warn("Zxid outstanding " + cr.zxid
                            + " is less than current " + request.zxid);
                } if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            } if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;

               rc = zks.processtxn(hdr, txn);
            } // do not add non quorum packets to the queue. if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }


DataTree.processtxn
zks.processtxn(hdr, txn);最终会调用到DataTree.processtxn()方法,过程很简单:
    1.更改节点;
    2.更新最新事务id。
这里就不贴代码


addCommittedProposal
这个方法主要是向committedLog里插入一条操作日志。
committedLog是一个存在于内存中的事务日志队列:

protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();

存入过程很简单,这里就不分析了,需要说明一点的是:如果超过的队列限制,会移除第一个,然后追加在最后。
具体代码如下:

public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        try {
            wl.lock();
            if (committedLog.size() > commitLogCount) {
                committedLog.removeFirst();
                minCommittedLog = committedLog.getFirst().packet.getZxid();
            }
            if (committedLog.size() == 0) {
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            try {
                request.hdr.serialize(boa, "hdr");
                if (request.txn != null) {
                    request.txn.serialize(boa, "txn");
                }
                baos.close();
            } catch (IOException e) {
                LOG.error("This really should be impossible", e);
            }
            QuorumPacket pp = new QuorumPacket(leader.PROPOSAL, request.zxid,
                    baos.toByteArray(), null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }


SyncRequestProcessor的log和FinalRequestProcessor的log的区别在哪里
这是一个很容易弄混的问题,在这里特别说明一下:
SyncRequestProcessor的log是一个FileTxnSnapLog,这个log用于定期向磁盘刷写数据。
这个log队列记录的事务编号一般来说会大于快照记录的事务编号,所以在服务器启动时,需要从快照和这个log两部分读取恢复数据。
FinalRequestProcessor的log是一个LinkedList<Proposal>,它只存在于内存中,不会向磁盘刷写数据。并且它有容量限制,每次插入最新的,移除最老的(被移除的就真的被移除了,并不会像FileTxnSnapLog一样写入到磁盘上)。
它的作用是当一个事务被通过时,Zookeeper集群里的其他Follower学习时,从这里获取数据。


返回响应
最后,服务器会更新一下处理时间,状态,然后通过ServerCnxn.sendResponse方法将响应发送给请求方。
至此,一次事务请求的处理流程结束。

        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());

        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                    request.createTime, System.currentTimeMillis());

        try {
            cnxn.sendResponse(hdr, rsp, "response");
            if (closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIxmsG",e);
        }


版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐