Based on ZooKeeper 3.7.0.
The QuorumPeer class logically represents a complete node of a ZooKeeper ensemble: it implements the full ZAB protocol and the message-processing flow for whichever role the node currently plays. Its execution logic makes heavy use of threads and BlockingQueue:
1) Different threads handle the key logic of the different ZAB phases.
- A BlockingQueue acts as the message bus between them, decoupling the threads while preserving message order (see the sketch below).
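A minimal, self-contained sketch of that pattern, with invented names (this is not ZooKeeper code; the thread names only hint at FastLeaderElection's WorkerSender/WorkerReceiver): one thread puts messages onto a LinkedBlockingQueue while another takes them off, so the two stages never call each other directly and FIFO order is preserved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageBusSketch {
    // The queue is the "message bus": producer and consumer never call each
    // other directly, and FIFO order is preserved per queue.
    private static final BlockingQueue<String> bus = new LinkedBlockingQueue<>(1024);

    public static void main(String[] args) throws InterruptedException {
        Thread sender = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    bus.put("notification-" + i); // blocks if the bus is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "worker-sender");

        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String msg = bus.take(); // blocks until a message arrives
                    System.out.println("processed " + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker-receiver");

        sender.start();
        receiver.start();
        sender.join();
        receiver.join();
    }
}
```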
According to the official ZooKeeper Dockerfile (zookeeper-docker-3.7.0), the container starts with:
CMD ["zkServer.sh", "start-foreground"]
So, starting from the zookeeper-3.7.0 source, zkServer.sh ultimately launches:
org.apache.zookeeper.server.quorum.QuorumPeerMain
Let's see what QuorumPeerMain does (a simplified sketch of the state loop follows the call trace):
main(String[] args)
QuorumPeerMain.initializeAndRun(args)
runFromConfig(config) // cluster mode
QuorumPeer.start()
loadDataBase() // load the ZK in-memory database from disk
startLeaderElection() // kick off leader election...
thread.start()
QuorumPeer.join()
ZooKeeperServerMain.main(args) // standalone mode
// no leader election etc.; starts directly
// start the peer thread and set up the send/receive threads needed by the TCP election port
QuorumPeer.start()
startLeaderElection()
createElectionAlgorithm
fle = FastLeaderElection(this, qcm)
fle.start()
messenger.start()
wsThread.start() // election message sender thread
wrThread.start() // election message receiver thread *recvqueue
thread.start()
// the peer's main loop: transitions among LOOKING, FOLLOWING and LEADING
// and carries out the work of each state
QuorumPeer.run()
while (running)
case looking:
setCurrentVote(makeLEStrategy().lookForLeader()) // completes fast leader election *recvqueue
sendNotifications()
case following:
follower.followLeader()
case leading:
leader.lead()
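A heavily abridged, self-contained model of that run loop, with placeholder methods standing in for FastLeaderElection, Follower and Leader (the real loop also handles OBSERVING, read-only mode, shutdown and exceptions):

```java
// Simplified model of the QuorumPeer state loop; only the shape of the loop
// is meant to match the real code.
public class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING }

    private volatile boolean running = true;
    private ServerState state = ServerState.LOOKING;

    void runLoop() {
        while (running) {
            switch (state) {
            case LOOKING:
                // block until an election round converges, then adopt the result
                state = lookForLeader();
                break;
            case FOLLOWING:
                // returns only when the connection to the leader is lost;
                // the peer then drops back to LOOKING
                followLeader();
                state = ServerState.LOOKING;
                break;
            case LEADING:
                // returns only when the quorum is lost or shutdown is requested
                lead();
                state = ServerState.LOOKING;
                break;
            }
        }
    }

    ServerState lookForLeader() { return ServerState.FOLLOWING; } // placeholder
    void followLeader() { running = false; }                      // placeholder
    void lead() { running = false; }                              // placeholder
}
```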
All changes are initiated by the leader and carried out cooperatively by the quorum; this is communication between the nodes of the system.
The core class here is SyncRequestProcessor, whose flush path persists the batched transactions to disk via **zks.getZKDatabase().commit()** (a simplified sketch follows the Javadoc below).
For a follower, SyncRequestProcessor.nextProcessor = SendAckRequestProcessor (wired up by FollowerZooKeeperServer), whose flush() sends the ACK back to the leader.
For the leader, SyncRequestProcessor.nextProcessor = AckRequestProcessor (wired up by LeaderZooKeeperServer), which acknowledges to the leader itself via leader.processAck().
/**
* SyncRequestProcessor
*
* This RequestProcessor logs requests to disk. It batches the requests to do
* the io efficiently. The request is not passed to the next RequestProcessor
* until its log has been synced to disk.
*
* SyncRequestProcessor is used in 3 different cases
* 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
* send ack back to itself.
* 2. Follower - Sync request to disk and forward request to
* SendAckRequestProcessor which send the packets to leader.
* SendAckRequestProcessor is flushable which allow us to force
* push packets to leader.
* 3. Observer - Sync committed request to disk (received as INFORM packet).
* It never send ack back to the leader, so the nextProcessor will
* be null. This change the semantic of txnlog on the observer
* since it only contains committed txns.
*/
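A rough sketch of the batching idea, assuming simplified TxnLog/NextProcessor interfaces (the real class additionally triggers snapshots, rolls the log, and handles flush failures): requests are appended to the log, collected into a batch, fsynced once per batch via commit(), and only then handed to the next processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the SyncRequestProcessor pattern.
class SyncLoopSketch {
    interface TxnLog { void append(String req); void commit(); /* fsync */ }
    interface NextProcessor { void process(String req); }

    private final BlockingQueue<String> queued = new LinkedBlockingQueue<>();
    private final List<String> toFlush = new ArrayList<>();
    private final TxnLog log;
    private final NextProcessor next;

    SyncLoopSketch(TxnLog log, NextProcessor next) { this.log = log; this.next = next; }

    void run() throws InterruptedException {
        while (true) {
            // block only when there is nothing pending; otherwise poll, so that
            // an empty queue triggers a flush of the current batch
            String req = toFlush.isEmpty() ? queued.take() : queued.poll();
            if (req == null) {        // queue drained: flush the batch now
                flush();
                continue;
            }
            log.append(req);          // write to the txn log (not yet durable)
            toFlush.add(req);
            if (toFlush.size() >= 1000) {
                flush();              // bound the batch size
            }
        }
    }

    private void flush() {
        if (toFlush.isEmpty()) return;
        log.commit();                 // a single fsync covers the whole batch
        for (String req : toFlush) {
            next.process(req);        // ack only after the log is durable
        }
        toFlush.clear();
    }
}
```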
QuorumPeer
startLeaderElection()
createElectionAlgorithm()
QuorumCnxManager qcm = createCnxnManager()
/**
* QuorumCnxManager
*
* This class implements a connection manager for leader election using TCP. It
* maintains one connection for every pair of servers. The tricky part is to
* guarantee that there is exactly one connection for every pair of servers that
* are operating correctly and that can communicate over the network.
*/
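The "exactly one connection per pair" guarantee comes from a tie-break on server ids: if an incoming connection is from a peer with a smaller id, it is dropped and the higher-id side dials back. A simplified sketch (connectOne/startWorkers are placeholders for the real outgoing-connect and worker-thread logic):

```java
import java.io.IOException;
import java.net.Socket;

// Sketch of the tie-break QuorumCnxManager uses: for any pair of servers,
// only the connection initiated by the higher-id side survives.
class CnxTieBreakSketch {
    private final long mySid;

    CnxTieBreakSketch(long mySid) { this.mySid = mySid; }

    void receiveConnection(long remoteSid, Socket sock) throws IOException {
        if (remoteSid < mySid) {
            sock.close();                  // reject the lower-id initiator
            connectOne(remoteSid);         // open the connection in the other direction
        } else {
            startWorkers(remoteSid, sock); // keep it: start send/recv worker threads
        }
    }

    // placeholders for the real outgoing-connect and worker-thread logic
    void connectOne(long sid) {}
    void startWorkers(long sid, Socket sock) {}
}
```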
# follower
followLeader()
leaderServer = findLeader()
connectToLeader(leaderServer.addr, leaderServer.hostname)
learner.connectToLeader()
new LeaderConnector(address, socket, latch)
sock.connect()
while(true){
...
}
# leader
lead()
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor()
cnxAcceptor.start()
cnxAcceptor.run()
// all followers
serverSockets.forEach(serverSocket ->
executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)))
LearnerCnxAcceptorHandler.run()
acceptConnections()
LearnerHandler.run()
...
while(true){
...
}
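The acceptor side of lead() boils down to a common pattern: one accept loop per bound server socket, each submitted to an executor, and one dedicated handler per accepted learner connection. A generic sketch of that pattern, not the real LearnerCnxAcceptor (which also handles TLS, retries and shutdown):

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One accept loop per bound ServerSocket, each on the executor, plus one
// handler per accepted learner connection (like LearnerHandler.run()).
class AcceptorSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void start(List<ServerSocket> serverSockets) {
        serverSockets.forEach(ss -> executor.submit(() -> acceptLoop(ss)));
    }

    private void acceptLoop(ServerSocket ss) {
        while (!ss.isClosed()) {
            try {
                Socket learner = ss.accept();
                executor.submit(() -> handleLearner(learner));
            } catch (IOException e) {
                return; // socket closed or fatal error: stop accepting
            }
        }
    }

    private void handleLearner(Socket learner) {
        // sync the learner, then loop forwarding proposals/commits and reading acks
    }
}
```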
The relationship between sessions, socket connections, and the node's main classes (a sketch of the selector + worker-pool pattern follows the trace):
main(String[] args)
QuorumPeerMain.initializeAndRun(args)
runFromConfig(config) // cluster mode
cnxnFactory = ServerCnxnFactory.createFactory()
NIOServerCnxnFactory or NettyServerCnxnFactory
cnxnFactory.startup(zkServer)
NIOServerCnxnFactory.startup()
start()
# requests handled by a worker thread pool
workerPool = new WorkerService
# selector-based I/O
selectorThreads.start()
run()
handleIO(key)
workerPool.schedule(workRequest)
(ExecutorService)worker.execute(scheduledWorkRequest)
IOWorkRequest.doWork()
(NIOServerCnxn)cnxn.doIO(key)
readPayload()
# session management
readConnectRequest()
zkServer.processConnectRequest(this, incomingBuffer)
createSession(cnxn, passwd, sessionTimeout)
sessionId = sessionTracker.createSession(timeout)
cnxn.setSessionId(sessionId)
# request processing
readRequest()
zkServer.processPacket(this, incomingBuffer)
submitRequest(si)
enqueueRequest(si)
RequestThrottler.submitRequest()
RequestThrottler.run()
zks.submitRequestNow(request)
firstProcessor.processRequest(si)
handleWrite(k)
acceptThread.start()
expirerThread.start()
ZooKeeperServer.startup()
startupWithServerState(State state)
startSessionTracker()
setupRequestProcessors()
quorumPeer.setCnxnFactory(cnxnFactory)
...
QuorumPeer.start()
...
QuorumPeer.join()
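The selectorThreads + workerPool split above is the classic reactor pattern: the selector thread only detects readiness and hands the actual read/decode work to a pool. A bare-bones sketch assuming a single selector thread, no payload framing and no session handling:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Bare-bones reactor: the selector thread detects readiness; the read/decode
// work runs on the pool, mirroring selectorThread -> workerPool.schedule()
// -> IOWorkRequest.doWork() -> cnxn.doIO() in the trace above.
public class ReactorSketch {
    public static void main(String[] args) throws IOException {
        ExecutorService workerPool = Executors.newFixedThreadPool(4);
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(2181));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    if (client == null) continue;
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    key.interestOps(0); // avoid re-selecting while the worker runs
                    workerPool.execute(() -> doIO(key));
                }
            }
        }
    }

    static void doIO(SelectionKey key) {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(4096);
        try {
            int n = ch.read(buf);
            if (n < 0) { ch.close(); return; }
            // ... decode the request, submit to the request-processor pipeline ...
            key.interestOps(SelectionKey.OP_READ); // re-arm read interest
            key.selector().wakeup();
        } catch (IOException e) {
            try { ch.close(); } catch (IOException ignored) { }
        }
    }
}
```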
For the details of this logic, see the SyncRequestProcessor section above.
The key is to be clear about what the snapshot and the txn log mean for recovering the system.
I originally wanted to capture the traffic with tcpdump and parse it myself, but ZooKeeper already ships the relevant tools.
// for the concrete data format definitions, see:
FileTxnLog
zookeeper.jute
Command line for parsing the transaction log:
./zkTxnLogToolkit.sh --dump /zoo1/log/version-2/log.200000001
Command line for parsing a snapshot:
./zkSnapShotToolkit.sh -d /zoo1/version-2/snapshot.800000008
Watch mechanism
Implementation of ephemeral nodes and sequential nodes
Lock implementation (a recipe sketch follows below)
In-memory data structures
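For the lock topic above, the standard recipe combines ephemeral and sequential nodes; a minimal sketch against the public ZooKeeper client API, assuming the parent /lock node already exists. For brevity it watches the whole children list (which causes a herd effect); production recipes watch only the immediate predecessor node.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Minimal lock sketch: create an ephemeral sequential child under /lock;
// whoever owns the smallest sequence number holds the lock. Ephemeral means
// the lock is released automatically if the session dies.
public class LockSketch {
    private final ZooKeeper zk;
    private String myNode;

    LockSketch(ZooKeeper zk) { this.zk = zk; }

    void lock() throws KeeperException, InterruptedException {
        // assumes the parent znode /lock already exists
        myNode = zk.create("/lock/node-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            CountDownLatch changed = new CountDownLatch(1);
            // crude strategy: watch the whole children list (herd effect)
            List<String> children = zk.getChildren("/lock",
                    (WatchedEvent e) -> changed.countDown());
            Collections.sort(children);
            if (myNode.endsWith(children.get(0))) {
                return;          // smallest sequence number: we hold the lock
            }
            changed.await();     // wait for the children to change, then re-check
        }
    }

    void unlock() throws KeeperException, InterruptedException {
        zk.delete(myNode, -1);   // or simply close the session
    }
}
```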
-
Implementation of BlockingQueue
```java
/**
 * Removes a node from head of queue.
 */
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    // head always points at the first node; the dequeued node becomes the new
    // head and its item is set to null
    first.item = null;
    return x;
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
```
For a detailed explanation of the two condition variables and the while-loop guard, see:
Operating Systems: Three Easy Pieces-->Condition Variables
```java
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();

try {
    /*
     * Note that count is used in wait guard even though it is
     * not protected by lock. This works because count can
     * only decrease at this point (all other puts are shut
     * out by lock), and we (or some other waiting put) are
     * signalled if it ever changes from capacity. Similarly
     * for all other uses of count in other wait guards.
     */
    while (count.get() == capacity) {
        notFull.await();
    }
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
        notFull.signal();
} finally {
    putLock.unlock();
}
```
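To make the while-guard point concrete, here is a tiny bounded buffer using a single lock and two conditions (unlike LinkedBlockingQueue, which uses separate putLock/takeLock and therefore needs an atomic count): with Mesa-style signalling, a woken thread must re-check its predicate, because the state may have changed between the signal and re-acquiring the lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Tiny bounded buffer illustrating the two-condition + while-guard pattern.
class BoundedBufferSketch<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBufferSketch(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) { // while, not if: re-check after wakeup
                notFull.await();
            }
            items.addLast(e);
            notEmpty.signal();                 // wake one consumer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {          // spurious wakeups / Mesa semantics
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal();                  // wake one producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```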
-
join / interrupt; the relationship between Java threads and OS threads
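A minimal demo of those semantics: on HotSpot each java.lang.Thread is backed by one OS thread; interrupt() does not kill the worker but wakes its blocking call with InterruptedException (or just sets the interrupt flag), and join() blocks the caller until the worker actually terminates.

```java
// interrupt() only requests cancellation; the worker decides how to exit,
// and join() waits for that exit.
public class JoinInterruptSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000); // blocking call: interrupt wakes it up here
                } catch (InterruptedException e) {
                    // restore the flag so the loop condition sees it and exits cleanly
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("worker exiting");
        }, "worker");

        worker.start();
        Thread.sleep(100);   // let the worker reach sleep()
        worker.interrupt();  // request cancellation
        worker.join();       // block until the worker terminates
        System.out.println("worker joined");
    }
}
```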
-
java socket / nio / selector / netty
-
What happens when this.logStream = null is set?