bitnami-docker-zookeeper's Introduction

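Source code

Based on ZooKeeper 3.7.0

Class relationships

Execution logic

The QuorumPeer class is a logically complete representation of one node in a ZooKeeper cluster: it implements the full ZAB protocol together with the message-processing flow for whichever role the node currently holds. The execution design relies heavily on threads and BlockingQueue:

1) Different threads handle the key logic of the different ZAB phases.

2) A BlockingQueue serves as the message bus between them, decoupling the threads while preserving message order.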
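The two points above can be illustrated with a minimal sketch, assuming one producer thread and one consumer thread standing in for two protocol stages. The class name, the message strings, and the stop sentinel are hypothetical and not taken from ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: two threads decoupled by a BlockingQueue "bus".
final class QueueBusSketch {
    // Sentinel that tells the consumer to stop; not part of ZooKeeper.
    private static final String POISON = "__STOP__";

    // Runs a producer and a consumer thread and returns what the consumer saw, in order.
    static List<String> relay(List<String> messages) {
        BlockingQueue<String> bus = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = bus.take();          // blocks until a message arrives
                    if (msg.equals(POISON)) {
                        return;
                    }
                    received.add(msg);                // only this thread writes the list
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        Thread producer = new Thread(() -> {
            try {
                for (String msg : messages) {
                    bus.put(msg);                     // FIFO: insertion order is kept
                }
                bus.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();                          // join makes 'received' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(relay(Arrays.asList("notification-1", "notification-2", "notification-3")));
    }
}
```

Neither thread calls the other directly; the queue is the only shared object, and its FIFO behavior is what keeps the messages in order.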

How does the cluster start?

Working from the Dockerfile on the official ZooKeeper site (zookeeper-docker 3.7.0):

CMD ["zkServer.sh", "start-foreground"]

Next, look at zkServer.sh in the zookeeper-3.7.0 source, which launches this main class:

org.apache.zookeeper.server.quorum.QuorumPeerMain

What does QuorumPeerMain do?

main(String[] args)
	QuorumPeerMain.initializeAndRun(args)
		runFromConfig(config) // cluster mode
			QuorumPeer.start()
				loadDataBase()	// initialize the in-memory ZK data from disk
				startLeaderElection() // start leader election...
				thread.start()
			QuorumPeer.join()
		ZooKeeperServerMain.main(args) // standalone mode
			// no leader election or similar steps; starts directly
		

Leader election logic

// Start the threads: set up the send and receive threads for the election TCP port
QuorumPeer.start()
	startLeaderElection()
		createElectionAlgorithm
			fle = FastLeaderElection(this, qcm)
			fle.start()
				messenger.start()
					wsThread.start() // election message sender thread
					wrThread.start() // election message receiver thread *recvqueue
	thread.start()
	
// Node main loop: moves between the looking, following, and leading states
// and does the work that belongs to each state
QuorumPeer.run()
	while (running)
		case looking:
			setCurrentVote(makeLEStrategy().lookForLeader()) // runs fast leader election *recvqueue
				sendNotifications()
		case following:
			follower.followLeader()
		case leading:
			leader.lead()
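The loop above can be pictured as a small state machine. The following is a toy model under stated assumptions, not ZooKeeper code: the election result is supplied by the caller instead of being computed by lookForLeader(), and leaving the FOLLOWING or LEADING role is modeled as an immediate return to LOOKING. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the LOOKING / FOLLOWING / LEADING loop.
final class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    private final long myId;
    private State state = State.LOOKING;
    private final List<State> trace = new ArrayList<>();

    PeerLoopSketch(long myId) {
        this.myId = myId;
    }

    // Stand-in for the election: the elected id decides this peer's role.
    private State stateFor(long electedLeaderId) {
        return electedLeaderId == myId ? State.LEADING : State.FOLLOWING;
    }

    // Each entry of electedLeaders is the outcome of one election round.
    List<State> run(long[] electedLeaders) {
        int round = 0;
        boolean running = true;
        while (running) {
            trace.add(state);
            switch (state) {
                case LOOKING:
                    if (round == electedLeaders.length) {
                        running = false;              // no more rounds to simulate
                        break;
                    }
                    state = stateFor(electedLeaders[round++]);
                    break;
                case FOLLOWING:
                case LEADING:
                    // The real followLeader()/lead() calls block until the role is lost;
                    // this toy simply falls back to LOOKING.
                    state = State.LOOKING;
                    break;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        // Peer 1 first follows peer 2, then wins the next round itself.
        System.out.println(new PeerLoopSketch(1).run(new long[] {2, 1}));
    }
}
```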

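How is data synchronized after the election?

How is data written to disk?

Every change is initiated by the leader and completed cooperatively by the quorum, so it is part of the communication between the system's nodes.

The core class is SyncRequestProcessor, which flushes the buffered transaction log to disk by calling **zks.getZKDatabase().commit()**.

On a follower, SyncRequestProcessor.nextProcessor is SendAckRequestProcessor (configured in FollowerZooKeeperServer), which acknowledges to the leader and uses flush to push the ACK out.

On the leader, SyncRequestProcessor.nextProcessor is AckRequestProcessor (configured for LeaderZooKeeperServer), which acknowledges to the leader itself through leader.processAck.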

/**
 * SyncRequestProcessor
 * 
 * This RequestProcessor logs requests to disk. It batches the requests to do
 * the io efficiently. The request is not passed to the next RequestProcessor
 * until its log has been synced to disk.
 *
 * SyncRequestProcessor is used in 3 different cases
 * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
 *             send ack back to itself.
 * 2. Follower - Sync request to disk and forward request to
 *             SendAckRequestProcessor which send the packets to leader.
 *             SendAckRequestProcessor is flushable which allow us to force
 *             push packets to leader.
 * 3. Observer - Sync committed request to disk (received as INFORM packet).
 *             It never send ack back to the leader, so the nextProcessor will
 *             be null. This change the semantic of txnlog on the observer
 *             since it only contains committed txns.
 */
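The "batch, sync, then forward" behavior described in the comment can be sketched as follows. This is a single-threaded toy under stated assumptions: the batch limit, the Consumer standing in for nextProcessor, and the sync counter are all hypothetical, and no real disk I/O takes place.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of batching before a sync; not the ZooKeeper class.
final class BatchingSyncSketch {
    private final int maxBatchSize;
    private final Consumer<String> next;              // stand-in for nextProcessor
    private final List<String> toFlush = new ArrayList<>();
    private int syncCount = 0;                        // number of simulated disk syncs

    BatchingSyncSketch(int maxBatchSize, Consumer<String> next) {
        this.maxBatchSize = maxBatchSize;
        this.next = next;
    }

    // Drains the queue the way a single processor thread would.
    void drain(Queue<String> queued) {
        String request;
        while ((request = queued.poll()) != null) {
            toFlush.add(request);                     // "append" to the pending batch
            if (toFlush.size() >= maxBatchSize) {
                flush();
            }
        }
        flush();                                      // queue is idle: sync what is pending
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        syncCount++;                                  // one simulated sync covers the batch
        for (String r : toFlush) {
            next.accept(r);                           // forwarded only after the sync
        }
        toFlush.clear();
    }

    int syncCount() {
        return syncCount;
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        BatchingSyncSketch p = new BatchingSyncSketch(2, forwarded::add);
        Queue<String> q = new ArrayDeque<>();
        for (int i = 1; i <= 5; i++) {
            q.add("txn-" + i);
        }
        p.drain(q);
        System.out.println(forwarded + " synced in " + p.syncCount() + " batches");
    }
}
```

With five requests and a batch limit of two, the sketch performs three syncs instead of five while still forwarding every request in its original order.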

How is communication managed?

Leader election communication management

QuorumPeer
	startLeaderElection()
		createElectionAlgorithm()
			QuorumCnxManager qcm = createCnxnManager()

/**
 * QuorumCnxManager
 * 
 * This class implements a connection manager for leader election using TCP. It
 * maintains one connection for every pair of servers. The tricky part is to
 * guarantee that there is exactly one connection for every pair of servers that
 * are operating correctly and that can communicate over the network.
 */
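One way to end up with exactly one connection per pair is to break ties on server id. My reading of QuorumCnxManager in 3.7.0 is that it keeps only the connection opened by the server with the larger id, while the smaller-id side drops its own attempt. Treat that reading, and every name in the sketch below, as an assumption rather than the library's API.

```java
// Hypothetical reduction of an id-based tie-break; names are illustrative only.
final class PairConnectionSketch {
    // A connection opened by initiatorSid toward acceptorSid survives only
    // if the initiator has the larger server id.
    static boolean keepConnection(long initiatorSid, long acceptorSid) {
        return initiatorSid > acceptorSid;
    }

    // Counts how many of the two possible directions survive for a pair.
    static int survivingConnections(long a, long b) {
        int kept = 0;
        if (keepConnection(a, b)) {
            kept++;
        }
        if (keepConnection(b, a)) {
            kept++;
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] sids = {1, 2, 3};
        for (long a : sids) {
            for (long b : sids) {
                if (a < b) {
                    System.out.println(a + " <-> " + b + ": "
                            + survivingConnections(a, b) + " connection");
                }
            }
        }
    }
}
```

Because server ids are distinct, exactly one direction wins for every pair, even when both sides dial each other at the same time.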

Peer-to-peer communication management

# follower
followLeader()
	leaderServer = findLeader()
	connectToLeader(leaderServer.addr, leaderServer.hostname)
		learner.connectToLeader()
			new LeaderConnector(address, socket, latch)
				sock.connect()
	while(true){
	 ...
	}
# leader
lead()
	// Start thread that waits for connection requests from
	// new followers.
	cnxAcceptor = new LearnerCnxAcceptor()
	cnxAcceptor.start()
	cnxAcceptor.run()
		// all followers
		serverSockets.forEach(serverSocket ->
			executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)))
		LearnerCnxAcceptorHandler.run()
			acceptConnections()
				LearnerHandler.run()
					...
					while(true){
					 ...
					}

Client-server communication management

Relationships among sessions, socket connections, and the node's main classes:

main(String[] args)
	QuorumPeerMain.initializeAndRun(args)
		runFromConfig(config) // cluster mode
		  cnxnFactory = ServerCnxnFactory.createFactory()
		  	NIOServerCnxnFactory or NettyServerCnxnFactory
		  cnxnFactory.startup(zkServer)
		  	NIOServerCnxnFactory.startup()
		  		start()
		  			# worker thread pool handles the requests
		  			workerPool = new WorkerService
		  			# selector-based I/O
		  			selectorThreads.start()
		  				run()
		  					handleIO(key)
		  						workerPool.schedule(workRequest)
		  							(ExecutorService)worker.execute(scheduledWorkRequest)
		  								IOWorkRequest.doWork()
		  									(NIOServerCnxn)cnxn.doIO(key)
		  										readPayload()
		  											# session management
		  											readConnectRequest()
		  												zkServer.processConnectRequest(this, incomingBuffer)
		  													createSession(cnxn, passwd, sessionTimeout)
		  														sessionId = sessionTracker.createSession(timeout)
		  														cnxn.setSessionId(sessionId)
		  											# request handling
		  											readRequest()
		  												zkServer.processPacket(this, incomingBuffer)
		  													submitRequest(si)
		  														enqueueRequest(si)
		  															RequestThrottler.submitRequest()
		  															RequestThrottler.run()
		  																zks.submitRequestNow(request)
		  																	firstProcessor.processRequest(si)
		  										handleWrite(k)

		  			acceptThread.start()
		  			expirerThread.start()
		  		ZooKeeperServer.startup()
		  			startupWithServerState(State state)
		  				startSessionTracker()
		  				setupRequestProcessors()
		  quorumPeer.setCnxnFactory(cnxnFactory)
		  ...
			QuorumPeer.start()
			...
			QuorumPeer.join()

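How are sessions managed, and how do they switch nodes automatically?

How are snapshots taken?

See SyncRequestProcessor for the logic.

The key is to be clear about the roles the snapshot and the txn log each play in recovering the system.

Parsing txn logs and snapshots

I originally planned to capture traffic with tcpdump and parse it myself, but ZooKeeper already provides tools for this.

// For the exact data format definitions, see
FileTxnLog
zookeeper.jute

Command line for parsing a transaction log: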

./zkTxnLogToolkit.sh --dump /zoo1/log/version-2/log.200000001

Command line for parsing a snapshot:

./zkSnapShotToolkit.sh -d /zoo1/version-2/snapshot.800000008
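The suffix in both file names is a zxid written in hexadecimal, and a zxid packs the epoch into its high 32 bits and a counter into its low 32 bits. Below is a minimal sketch for decoding the two names used above; the class and method names are hypothetical and not part of ZooKeeper.

```java
// Hypothetical helper for reading the zxid out of a log or snapshot file name.
final class ZxidFileNameSketch {
    // Parses the hex suffix after the last '.' in names such as "log.200000001".
    static long zxidOf(String fileName) {
        String hex = fileName.substring(fileName.lastIndexOf('.') + 1);
        return Long.parseUnsignedLong(hex, 16);
    }

    // High 32 bits: the epoch.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // Low 32 bits: the counter within that epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"log.200000001", "snapshot.800000008"}) {
            long zxid = zxidOf(name);
            System.out.println(name + " -> epoch " + epochOf(zxid)
                    + ", counter " + counterOf(zxid));
        }
    }
}
```

For log.200000001 this yields epoch 2 and counter 1, and for snapshot.800000008 epoch 8 and counter 8.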

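Data change requests?

Internal feature implementation

Watch mechanism

Ephemeral and sequential node implementation

Lock implementation

In-memory data structures

Related technical topics

  • Implementation of BlockingQueue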

        /**
         * Removes a node from head of queue.
         */
        private E dequeue() {
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            // head is always a dummy node whose item is null: the node being
            // dequeued becomes the new head and its item is cleared
            first.item = null;
            return x;
        }
        
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    

    For a detailed explanation of the two condition variables and the while check, see:

    Operating Systems: Three Easy Pieces, chapter on Condition Variables

         private final Condition notEmpty = takeLock.newCondition();
         private final Condition notFull = putLock.newCondition();
         
         try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
    
  • join / interrupt, and the relationship between Java threads and operating system threads

  • java socket / nio / selector / netty

  • What happens on this.logStream = null?
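Returning to the BlockingQueue item in the list above: the two condition variables and the while check can be seen in isolation in a smaller bounded buffer. Note that this sketch swaps in a simpler technique than LinkedBlockingQueue: it uses a single ReentrantLock with two Conditions instead of separate put and take locks. The class and its demo method are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-lock bounded buffer; simpler than LinkedBlockingQueue.
final class BoundedBufferSketch<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    BoundedBufferSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // 'while', not 'if': the condition must be rechecked after every wakeup,
            // because another thread may have filled the slot first.
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(e);
            notEmpty.signal();                        // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal();                         // wake one waiting putter
            return e;
        } finally {
            lock.unlock();
        }
    }

    // A producer thread puts 1..n; the caller takes n items and returns their sum.
    static int demo(int capacity, int n) {
        BoundedBufferSketch<Integer> buf = new BoundedBufferSketch<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 100));
    }
}
```

Using two conditions lets a put wake only takers and a take wake only putters, which is the point the condition-variables chapter makes about avoiding wakeups of the wrong kind of waiter.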
    

Contributors

andresbono, anshgodara, carrodher, dani8art, francescsanjuanmrf, fsniper, hmr-ramzi, hugo19941994, jbianquetti-nami, jonathan-m-foley, juan131, philippfreyer, tompizmor, vikram-bitnami, vmwarisbood
