
ZooKeeper Source Code Series, Part 8

Date: 2023-12-06

Feel free to follow github.com/hsfxuebao. I hope it helps, and if you find it useful, a Star would be appreciated.

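1. Client Connection Source Code Analysis

For details on the ZKClient and Curator clients, see: ZooKeeper Installation and Client Usage

The conclusion first:

To create a connection, the client first creates a ZooKeeper object locally, which represents the server it is connected to. Once the connection succeeds, the connection's transient data is initialized into this zk object. After the connection is closed, the zk object representing the server is discarded.

The commonly used ZK clients are ZKClient and Curator. When a client connects to the ZK servers, it is configured with the cluster's address list. Which server it actually connects to is decided by round-robin: the configured addresses are first shuffled and then tried in turn (this is the default; a different reconnection strategy can be specified).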

1.1 ZKClient Source Code Analysis

Here is a demo that uses ZKClient:

    public class ZKClientTest {
        // The zk cluster to connect to
        private static final String CLUSTER = "zkOS:2181";
        // The node name
        private static final String PATH = "/mylog";

        public static void main(String[] args) {
            // ---------------- Create the session -----------
            // Create the zkClient
            ZkClient zkClient = new ZkClient(CLUSTER);
            // Set the serializer for the zkClient
            zkClient.setZkSerializer(new SerializableSerializer());

            // ---------------- Create a node -----------
            // Create a persistent node
            CreateMode mode = CreateMode.PERSISTENT;
            // The node's data
            String data = "first log";
            // Create the node
            String nodeName = zkClient.create(PATH, data, mode);
            ...

Let's trace the ZKClient source to see how the connection is made, starting from the ZkClient constructor:

    public class ZkClient implements Watcher {
        ...
        public ZkClient(String serverstring) {
            this(serverstring, Integer.MAX_VALUE);
        }

        public ZkClient(String zkServers, int connectionTimeout) {
            // Key point: a ZkConnection object is created here
            this(new ZkConnection(zkServers), connectionTimeout);
        }
        ...
        // The constructor chain eventually reaches this constructor
        public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
            if (zkConnection == null) {
                throw new NullPointerException("Zookeeper connection is null!");
            }
            // Store the ZkConnection in the _connection field
            _connection = zkConnection;
            _zkSerializer = zkSerializer;
            _operationRetryTimeoutInMillis = operationRetryTimeout;
            _isZkSaslEnabled = isZkSaslEnabled();
            connect(connectionTimeout, this);
        }

        public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
            boolean started = false;
            acquireEventLock();
            try {
                setShutdownTrigger(false);
                _eventThread = new ZkEventThread(_connection.getServers());
                _eventThread.start();
                // Delegate to ZkConnection.connect to open the connection
                _connection.connect(watcher);
                LOG.debug("Awaiting connection to Zookeeper server");
                boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
                if (!waitSuccessful) {
                    throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
                }
                started = true;
            } finally {
                getEventLock().unlock();
                // we should close the zookeeper instance, otherwise it would keep
                // on trying to connect
                if (!started) {
                    close();
                }
            }
        }
    }

The trace above shows that ZKClient actually connects through the ZkConnection.connect method, so let's follow ZkConnection next:

    public class ZkConnection implements IZkConnection {
        ...
        // The key object: ZooKeeper
        private ZooKeeper _zk = null;
        ...
        public ZkConnection(String zkServers, int sessionTimeOut) {
            _servers = zkServers;
            _sessionTimeOut = sessionTimeOut;
        }

        @Override
        public void connect(Watcher watcher) {
            _zookeeperLock.lock();
            try {
                if (_zk != null) {
                    throw new IllegalStateException("zk client has already been started");
                }
                try {
                    LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                    // Important: ZkClient's connection to the server is really just a ZooKeeper object
                    _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
                } catch (IOException e) {
                    throw new ZkException("Unable to connect to " + _servers, e);
                }
            } finally {
                _zookeeperLock.unlock();
            }
        }
    }

1.2 Curator Source Code Analysis

Here is a demo that uses Curator:

    public class FluentTest {
        public static void main(String[] args) throws Exception {
            // ---------------- Create the session -----------
            // Retry policy: base sleep time of 1 second (the wait grows with each retry), at most 3 retries
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
            // Create the client
            CuratorFramework client = CuratorFrameworkFactory
                                        .builder()
                                        .connectString("zkOS:2181")
                                        .sessionTimeoutMs(15000)
                                        .connectionTimeoutMs(13000)
                                        .retryPolicy(retryPolicy)
                                        // namespace: the root path; all operations are relative to it
                                        .namespace("logs")
                                        .build();
            // Start the client
            client.start();
            ...
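The two arguments passed to ExponentialBackoffRetry in the demo are a base sleep time and a retry limit. To make "exponential backoff" concrete, here is a minimal standalone sketch of the idea. BackoffSketch and sleepMsFor are hypothetical names, and the randomized doubling is only an approximation of how such a policy usually behaves, not a copy of Curator's implementation.

```java
import java.util.Random;

// Hypothetical helper for illustration only; this is not a Curator class.
final class BackoffSketch {
    private final long baseSleepMs;
    private final int maxRetries;
    private final Random random;

    BackoffSketch(long baseSleepMs, int maxRetries, Random random) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
        this.random = random;
    }

    // Returns how long to sleep before retry number retryCount (0-based),
    // or -1 once the retry limit has been reached.
    long sleepMsFor(int retryCount) {
        if (retryCount >= maxRetries) {
            return -1;
        }
        // The upper bound of the random factor doubles with every retry: 2, 4, 8, ...
        int bound = 1 << (retryCount + 1);
        return baseSleepMs * Math.max(1, random.nextInt(bound));
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(1000, 3, new Random());
        for (int retry = 0; retry <= 3; retry++) {
            System.out.println("retry " + retry + " -> " + backoff.sleepMsFor(retry) + " ms");
        }
    }
}
```

With a base of 1000 ms the first wait is always 1000 ms, later waits are drawn from a widening range, and the fourth call reports that retries are exhausted.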

Let's trace the Curator source to see how the connection is made, starting from client.start():

    public class CuratorFrameworkImpl implements CuratorFramework {
        ...
        @Override
        public void start() {
            log.info("Starting");
            if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ) {
                throw new IllegalStateException("Cannot be started more than once");
            }
            try {
                ...
                this.getConnectionStateListenable().addListener(listener);
                client.start();
                ...
            } catch ( Exception e ) {
                ThreadUtils.checkInterrupted(e);
                handleBackgroundOperationException(null, e);
            }
        }
    }

Focus on the client.start() call:

    public class CuratorZookeeperClient implements Closeable {
        ...
        public void start() throws Exception {
            log.debug("Starting");
            if ( !started.compareAndSet(false, true) ) {
                throw new IllegalStateException("Already started");
            }
            state.start();
        }
        ...
    }

Next, follow state.start():

    class ConnectionState implements Watcher, Closeable {
        ...
        void start() throws Exception {
            log.debug("Starting");
            ensembleProvider.start();
            reset();
        }

        synchronized void reset() throws Exception {
            log.debug("reset");
            instanceIndex.incrementAndGet();
            isConnected.set(false);
            connectionStartMs = System.currentTimeMillis();
            handleHolder.closeAndReset();
            handleHolder.getZooKeeper();   // initiate connection
        }
        ...
    }

The key point is the handleHolder.getZooKeeper() method:

    class HandleHolder {
        ...
        ZooKeeper getZooKeeper() throws Exception {
            return (helper != null) ? helper.getZooKeeper() : null;
        }
        ...
    }

    class Helper {
        private final Data data;
        ...
        ZooKeeper getZooKeeper() throws Exception {
            return data.zooKeeperHandle;
        }
        ...
    }

The handle is read straight from data, so when is the Helper created? Go back to org.apache.curator.ConnectionState#reset and look at the handleHolder.closeAndReset() method:

    class HandleHolder {
        ...
        void closeAndReset() throws Exception {
            internalClose(0);
            Helper.Data data = new Helper.Data();
            // The first call to getZooKeeper() creates the handle, then swaps in a plain Helper
            helper = new Helper(data) {
                @Override
                ZooKeeper getZooKeeper() throws Exception {
                    synchronized(this) {
                        if ( data.zooKeeperHandle == null ) {
                            resetConnectionString(ensembleProvider.getConnectionString());
                            data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
                        }
                        helper = new Helper(data);
                        return super.getZooKeeper();
                    }
                }
            };
        }
        ...
    }
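The closeAndReset() listing uses a lazy-initialization trick: the first helper creates the handle under a lock and then replaces itself with a plain helper, so later calls skip the synchronization. The pattern can be sketched in isolation as below. LazyHandleSketch and its nested Helper interface are hypothetical names for illustration and take a generic factory instead of a ZooKeeper factory.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the holder: the expensive handle is built on first use only.
final class LazyHandleSketch<T> {
    private interface Helper<T> {
        T get();
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper;

    LazyHandleSketch(Supplier<T> factory) {
        this.factory = factory;
        reset();
    }

    // Installs a helper that creates the handle on the first get() and then replaces itself.
    void reset() {
        helper = new Helper<T>() {
            private T handle;

            @Override
            public synchronized T get() {
                if (handle == null) {
                    handle = factory.get();
                }
                final T created = handle;
                // Swap in a lock-free helper for all later calls.
                helper = () -> created;
                return created;
            }
        };
    }

    T get() {
        return helper.get();
    }

    public static void main(String[] args) {
        LazyHandleSketch<StringBuilder> holder =
                new LazyHandleSketch<>(() -> new StringBuilder("handle"));
        // Both calls return the same instance; the factory ran once.
        System.out.println(holder.get() == holder.get());
    }
}
```

Nothing is created at construction time; the factory runs on the first get(), and again only after reset() is called, which corresponds to a reconnect in the Curator code.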

Let's see how data.zooKeeperHandle is actually created:

    public class NonAdminZookeeperFactory implements ZookeeperFactory {
        @Override
        public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
            return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
        }
    }

As we can see, whichever client library is used, it ultimately creates a ZooKeeper object locally. Next we analyze the ZooKeeper object in the ZK source.

1.3 The ZooKeeper Client Object in the ZK Source

In the ZK source, the constructor of the ZooKeeper class goes through the following steps.

ConnectStringParser connectStringParser = new ConnectStringParser(connectString) creates a parser for the zk cluster string; each parsed ip and port pair is built into an address instance and put into a cached collection.
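To illustrate what parsing the cluster string involves, here is a minimal sketch that turns "host:port,host:port" into unresolved socket addresses. ConnectStringSketch is a hypothetical class, not ZooKeeper's ConnectStringParser; it assumes a default port of 2181 when none is given and ignores chroot suffixes and IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parser for illustration only.
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default client port

    // Splits "host1:port1,host2:port2" into one unresolved address per server.
    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String part : connectString.split(",")) {
            String host = part.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved: no DNS lookup happens at parse time.
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("zk1:2181,zk2:2182,zk3")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```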

createDefaultHostProvider(connectString) creates the host provider, which shuffles the addresses in the cached collection.

The purpose of the shuffle is load balancing; without it, every client's round-robin would start with, and connect to, the first server in the list.
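The shuffle-then-round-robin behavior can be sketched as follows. ShuffledHostsSketch is a hypothetical class that works on plain host strings; the real host provider works on socket addresses and has more responsibilities.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical host list for illustration: shuffle once, then hand out servers in turn.
final class ShuffledHostsSketch {
    private final List<String> servers;
    private int lastIndex = -1;

    ShuffledHostsSketch(List<String> configured, Random random) {
        if (configured.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = new ArrayList<>(configured);
        // Each client gets its own random order, which spreads clients across servers.
        Collections.shuffle(this.servers, random);
    }

    // Returns the next server in the shuffled order, wrapping around at the end.
    synchronized String next() {
        lastIndex = (lastIndex + 1) % servers.size();
        return servers.get(lastIndex);
    }

    public static void main(String[] args) {
        ShuffledHostsSketch hosts = new ShuffledHostsSketch(
                Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181"), new Random());
        for (int i = 0; i < 4; i++) {
            System.out.println(hosts.next());
        }
    }
}
```

Because every client shuffles independently, the first server each one tries differs from client to client, while a single client still visits every server before repeating one.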

cnxn = new ClientCnxn(…); creates a connection instance, and cnxn.start(); starts the connection.

Now look at the run method of sendThread, the thread that starts the connection.

while (state.isAlive()) checks whether the current connection object is still alive.

serverAddress = hostProvider.next(1000); obtains the address of the zkServer to connect to.

startConnect(serverAddress); starts a connection attempt (the attempt may fail, in which case the loop fetches the next address and tries again).
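The "try one address, and on failure move to the next" loop can be sketched as below. ConnectLoopSketch and its tryConnect predicate are hypothetical: the predicate stands in for a real connection attempt, and the loop gives up after a timeout instead of running for as long as the session is alive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical connect loop for illustration only.
final class ConnectLoopSketch {
    // Tries the servers in order, wrapping around, until one accepts the
    // connection or the timeout elapses. Returns the connected server or null.
    // The server list must not be empty.
    static String connect(List<String> servers, Predicate<String> tryConnect, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        int index = 0;
        while (System.nanoTime() - deadline < 0) {
            String server = servers.get(index);
            if (tryConnect.test(server)) {
                return server;
            }
            // This attempt failed: move on to the next address.
            // A real client would also pause briefly here instead of spinning.
            index = (index + 1) % servers.size();
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181");
        // Pretend only zk3 is reachable.
        String connected = connect(servers, s -> s.startsWith("zk3"), 1000);
        System.out.println("connected to " + connected);
    }
}
```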

1.4 Flowchart of ZooKeeper Object Creation

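2. Server-Side Connection Source Code Analysis

2.1 ZooKeeper Session Theory

The session is one of the most important concepts in zk: every interaction between client and server is tied to a session.

When a ZooKeeper client starts, it first establishes a long-lived TCP connection to a zk server. As soon as the connection is established, the lifecycle of the client session begins.

2.1.1 Session States

There are three common session states:

CONNECTING: connecting. To create a connection, the client first creates a zk object locally, which represents the server it is connected to.

CONNECTED: connected. Once the connection succeeds, the connection's transient data is initialized into the zk object.

CLOSED: closed. After the connection is closed, the zk object representing the server is discarded.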
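The three states tie back to the while (state.isAlive()) loop in the client's send thread. A minimal sketch of that relationship follows. SessionStateSketch is a hypothetical enum containing only the states listed here; the real client defines more states than these three.

```java
// Hypothetical enum for illustration; it models only the three states listed above.
enum SessionStateSketch {
    CONNECTING, CONNECTED, CLOSED;

    // A session keeps being serviced until it has been closed.
    boolean isAlive() {
        return this != CLOSED;
    }

    public static void main(String[] args) {
        SessionStateSketch state = CONNECTING;
        int iterations = 0;
        // Mirrors the shape of the send thread's loop: keep working while the session is alive.
        while (state.isAlive()) {
            state = (state == CONNECTING) ? CONNECTED : CLOSED;
            iterations++;
        }
        System.out.println("loop ran " + iterations + " times, final state " + state);
    }
}
```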

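2.1.2 Session Connection Timeout Management (Maintained by the Client)

Session connection timeout management here means that the client tracks how long it has been trying to connect to a server. Timing starts when the client's current session first initiates a connection to a server.

ZK is a CP system: while the servers are synchronizing data they do not serve requests, but this phase is very short. Within the connection timeout the client keeps trying to connect until it succeeds, so the client normally does not notice the period during which the servers are unavailable.

2.1.3 Session Connection Events

When the long-lived connection between client and server breaks, the client reconnects. During reconnection the client can produce three kinds of session connection events:

CONNECTION_LOSS: the connection was lost.

SESSION_MOVED: the session moved. If the client reconnects to a server within the connection timeout, and that server is not the one it was connected to before (it is another machine in the cluster), the session is moved.

SESSION_EXPIRED: the session expired. If the client reconnects to a server only after the connection timeout has elapsed, and the server has already removed that session's sessionId, the session has expired.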

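2.1.4 Session Idle Timeout Management (Maintained by the Server)

The session connection timeout is a client-side concept; the session idle timeout is a server-side one.

For every client session, the server records how long the session has been idle since the last interaction, together with the point in time at which the idle timeout, counted from the end of that interaction, will be reached. Once the idle time exceeds the timeout, the server removes the session's SessionId. This is why an idle client has to send heartbeats to the server periodically: they keep the session's long-lived connection alive. The server relies on idle timeout management to decide whether a session has been interrupted.

The server manages session idle timeouts in a special way: the bucketing strategy.

Bucketing strategy

The bucketing strategy puts sessions whose idle timeouts fall close together into the same bucket, which reduces the complexity of managing them. When checking for timeouts, only the sessions still left in the bucket need to be examined: sessions that have not timed out have already been moved out of the bucket, so whatever remains in it has timed out.

zk's idle timeout management is not exact; that is, the timeout handling is not executed at the very instant a session times out.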

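Bucketing criteria

The bucket is computed from the following quantities:

CurrentTime: the current time (a point on the time axis)

SessionTimeout: the session timeout (a duration)

ExpirationTime: the point in time at which the session will next time out (a point on the time axis)

ExpirationInterval: the size of a bucket (a duration)

BucketTime: the bucket that contains the session's next timeout point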
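In terms of these quantities, the bucket computation can be written out as below. This is a reconstruction from the definitions above, under the assumption that the integer rounding is the one ZooKeeper's session tracker uses, where the division is integer division.

```latex
\begin{aligned}
\text{ExpirationTime} &= \text{CurrentTime} + \text{SessionTimeout} \\
\text{BucketTime} &= \left( \left\lfloor \frac{\text{ExpirationTime}}{\text{ExpirationInterval}} \right\rfloor + 1 \right) \times \text{ExpirationInterval}
\end{aligned}
```

For example, with CurrentTime = 10500 ms, SessionTimeout = 4000 ms and ExpirationInterval = 2000 ms, ExpirationTime is 14500 ms and BucketTime is (7 + 1) × 2000 = 16000 ms.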

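BucketTime is ExpirationTime (CurrentTime plus SessionTimeout) rounded up to the next bucket boundary, so each bucket spans one ExpirationInterval. All sessions whose ExpirationTime falls into the same bucket have their timeouts handled together.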
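The bucketing strategy can be sketched as a small data structure: touching a session moves it to the bucket of its new expiration time, and whatever is still in a bucket when its time arrives has expired. SessionBucketsSketch and its method names are hypothetical, the rounding is assumed to be integer division plus one interval, and the real server-side implementation differs in detail.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bucket structure for illustration only.
final class SessionBucketsSketch {
    private final long expirationInterval;
    private final Map<Long, Set<Long>> buckets = new HashMap<>();     // bucket time -> session ids
    private final Map<Long, Long> bucketOfSession = new HashMap<>();  // session id -> bucket time

    SessionBucketsSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    // Rounds a point in time up to the next bucket boundary.
    long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Moves the session into the bucket of its new expiration time and returns that bucket time.
    long touch(long sessionId, long sessionTimeout, long now) {
        long newBucket = roundToNextInterval(now + sessionTimeout);
        Long oldBucket = bucketOfSession.put(sessionId, newBucket);
        if (oldBucket != null && oldBucket != newBucket) {
            Set<Long> old = buckets.get(oldBucket);
            if (old != null) {
                old.remove(sessionId);
            }
        }
        buckets.computeIfAbsent(newBucket, k -> new HashSet<>()).add(sessionId);
        return newBucket;
    }

    // Removes and returns the sessions still in the given bucket: those have timed out.
    Set<Long> expire(long bucketTime) {
        Set<Long> expired = buckets.remove(bucketTime);
        if (expired == null) {
            return new HashSet<>();
        }
        for (Long id : expired) {
            bucketOfSession.remove(id);
        }
        return expired;
    }

    public static void main(String[] args) {
        SessionBucketsSketch tracker = new SessionBucketsSketch(2000);
        tracker.touch(1L, 4000, 10_500);  // session 1 lands in bucket 16000
        tracker.touch(2L, 4000, 11_000);  // session 2 lands in bucket 16000
        tracker.touch(1L, 4000, 13_000);  // session 1 was active again: moves to bucket 18000
        System.out.println("expired at 16000: " + tracker.expire(16_000));
    }
}
```

In the usage above, session 1 interacts again before its bucket is checked and is moved to a later bucket, so only session 2 is left in bucket 16000 when that bucket is processed.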

2.2 Server-Side Connection Source Code Analysis

Find the ZooKeeperServer.startup method; it is invoked as soon as the server starts.

createSessionTracker(); creates the sessionTracker (session tracker) thread.

The updateSessionExpiry method is called when the sessionTracker thread is created, but it is also called in many other scenarios, for example:

First look at the touchSession method: under the hood it also calls updateSessionExpiry(), so calling touchSession updates the session's bucket as well.
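The relationship between touchSession and updateSessionExpiry can be sketched as follows. TouchSessionSketch, its Session class, and the extra now parameter are hypothetical simplifications introduced to keep the example deterministic; they are not the server's actual signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for illustration only.
final class TouchSessionSketch {
    static final class Session {
        final long id;
        boolean closing;
        long bucketTime;

        Session(long id) {
            this.id = id;
        }
    }

    private final Map<Long, Session> sessionsById = new HashMap<>();
    private final long expirationInterval;

    TouchSessionSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    Session createSession(long id, int timeout, long now) {
        Session session = new Session(id);
        sessionsById.put(id, session);
        updateSessionExpiry(session, timeout, now);
        return session;
    }

    // Returns false when the session is unknown or closing; otherwise refreshes its bucket.
    boolean touchSession(long id, int timeout, long now) {
        Session session = sessionsById.get(id);
        if (session == null || session.closing) {
            return false;
        }
        updateSessionExpiry(session, timeout, now);
        return true;
    }

    // Recomputes the bucket that holds the session's next timeout point.
    private void updateSessionExpiry(Session session, int timeout, long now) {
        session.bucketTime = ((now + timeout) / expirationInterval + 1) * expirationInterval;
    }

    public static void main(String[] args) {
        TouchSessionSketch tracker = new TouchSessionSketch(2000);
        Session session = tracker.createSession(7L, 4000, 10_500);
        System.out.println(tracker.touchSession(7L, 4000, 13_000) + " bucket=" + session.bucketTime);
        System.out.println(tracker.touchSession(8L, 4000, 13_000));
    }
}
```

A false return value is what lets the caller tell a live session from one that no longer exists or is being closed.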

Let's see which scenarios call the touchSession method:

When a session interacts with the current server

When the client initiates a new connection request after a session loss


public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
    // If the password supplied for this connection is correct, revalidate the session; otherwise close it
    if (checkPasswd(sessionId, passwd)) {
        // Revalidate the session
        revalidateSession(cnxn, sessionId, sessionTimeout);
    } else {
        // Close the session
        finishSessionInit(cnxn, false);
    }
}

protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
    // Check whether the session identified by sessionId is still valid; if it is, move it into the matching session bucket.
    // rc is true if the session is valid, false if it does not exist or has already been closed (invalid)
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    ...
}

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        if (valid) {
            // If the session is valid, re-register the original connection, i.e. the original connection is still valid
            if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                serverCnxnFactory.registerConnection(cnxn);
            } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                secureServerCnxnFactory.registerConnection(cnxn);
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
    }
    ...
}
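The reconnect path can be summarized as three steps: check the password, touch the session, then finish session initialization with the result. Below is a rough sketch of that control flow under stated assumptions. The class ReopenSessionSketch, the SECRET constant, the in-memory liveSessions set, and the boolean return values are all hypothetical simplifications. In particular, passing the touchSession result on to finishSessionInit is an assumption of this sketch and is not shown in the listing above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the reconnect path; not ZooKeeper's code.
class ReopenSessionSketch {
    // Stand-in secret used to derive a per-session password (assumed value).
    private static final long SECRET = 0x5EC2E7L;
    // Ids of sessions that have not expired (assumed in-memory storage).
    private final Set<Long> liveSessions = new HashSet<>();

    // Derive the password the server expects for a given session id.
    byte[] generatePasswd(long sessionId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(sessionId ^ SECRET).array();
    }

    void createSession(long sessionId) {
        liveSessions.add(sessionId);
    }

    void expireSession(long sessionId) {
        liveSessions.remove(sessionId);
    }

    boolean checkPasswd(long sessionId, byte[] passwd) {
        return Arrays.equals(generatePasswd(sessionId), passwd);
    }

    // In this sketch, touching only reports whether the session is still alive.
    boolean touchSession(long sessionId) {
        return liveSessions.contains(sessionId);
    }

    // Returns true when the reconnect is accepted.
    boolean reopenSession(long sessionId, byte[] passwd) {
        if (!checkPasswd(sessionId, passwd)) {
            return finishSessionInit(false); // wrong password: reject
        }
        return revalidateSession(sessionId);
    }

    boolean revalidateSession(long sessionId) {
        boolean rc = touchSession(sessionId);
        return finishSessionInit(rc); // assumption: the result is forwarded
    }

    // A real server would answer the client here; the sketch only reports validity.
    boolean finishSessionInit(boolean valid) {
        return valid;
    }

    public static void main(String[] args) {
        ReopenSessionSketch server = new ReopenSessionSketch();
        server.createSession(7L);
        byte[] passwd = server.generatePasswd(7L);
        System.out.println(server.reopenSession(7L, passwd));
        server.expireSession(7L);
        System.out.println(server.reopenSession(7L, passwd));
    }
}
```

The sketch separates the password check from the liveness check, so a client with a correct password can still be rejected once its session has expired.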


2.3 Session management flowchart


Original article: https://juejin.cn/post/7100150142904303624

