1.Znode
Znode是 Zookeeper中数据的最小单元
作用: 存储数据,或者挂载子节点。
1.Znode的类型
public enum CreateMode {
PERSISTENT(0, false, false, false, false),//持久化节点
EPHEMERAL(1, true, false, false, false),//临时节点
PERSISTENT_SEQUENTIAL(2, false, true, false, false),//自增持久化节点
EPHEMERAL_SEQUENTIAL(3, true, true, false, false),//自增临时节点
CONTAINER(4, false, false, true, false), //容器节点
PERSISTENT_WITH_TTL(5, false, false, false, true), //持久化过期
PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true); // 持久化顺序过期
}
- ephemeral: 是否为临时节点(EPHEMERAL临时、PERSISTENT持久化,失去客户端连接时是否自动删除/客户端主动删除节点)
- sequential : 创建node时设置顺序标识,由父节点维护,并一直自增。
- isContainer: 是否为容器节点
- isTTL: 在过期时间内未被修改会被删除节点
2.Znode节点状态
public class Stat implements Record {
private long czxid;//节点被创建的Zxid值
private long mzxid;//节点被修改的Zxid值
private long ctime;//节点被创建的时间
private long mtime;//节点最后一次被修改的时间
private int version;//节点被修改的版本号
private int cversion;//节点的所拥有子节点被修改的版本号
private int aversion;//节点的ACL被修改的版本号
private long ephemeralOwner;//如果此节点为临时节点,那么它的值为这个节点 拥有者的会话ID;
private int dataLength;//节点数据域的长度
private int numChildren;//节点拥有的子节点个数
private long pzxid;//子节点最有一次被修改时的事务ID
}
2.watcher
- 一次性: 客户端和服务端 ,一旦watcher被触发,都会被移除
- 时效性: 在session_timeout之内,都可能保证 watches
1.watcher类型
enum WatcherType {
Children(1), //ChildWatches,znode的孩子节点发生变更触发的watch事件
Data(2), //DataWatches,基于znode节点的数据变更从而触发 watch 事件
Any(3); //任意一种类型
}
- DataWatches:
- ChildWatches
2.watcher的机制
-
注册: client向server注册watcher
public class ZooKeeper implements AutoCloseable { ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);//构造函数 List<String> getChildren(final String path, Watcher watcher);//获取路径下的子节点 byte[] getData(final String path, Watcher watcher, Stat stat);//获取路径下的节点状态 Stat exists(final String path, Watcher watcher);//判断节点是否存在 }-
创建WatchRegistration对象:
WatchRegistration对象中包含了节点路径和Watcher对象
public abstract static class WatchRegistration { private Watcher watcher; private String clientPath; } -
把WatchRegistration对象封装进Packet对象中:
packet是一个最小的通信单元,用于传输信息。
static class Packet { RequestHeader requestHeader;//请求头 ReplyHeader replyHeader;// Record request;//请求数据 Record response;//响应数据 ByteBuffer bb; String clientPath;//客户端视图路径 String serverPath;//服务端视图路径 boolean finished;//是否请求结束 AsyncCallback cb;//响应回调 Object ctx;//上下文 WatchRegistration watchRegistration;//监听注册 public boolean readOnly;//是否只读 WatchDeregistration watchDeregistration; } -
发送packet对象:
把packet添加到队列中,通过SendThread发送请求。
public class ClientCnxn { public Packet queuePacket(){ packet = new Packet(); //... outgoingQueue.add(packet); sendThread.getClientCnxnSocket().packetAdded(); return packet; } }
-
-
存储: 服务端接受到请求,采用责任链的方式处理请求。
-
根据不同操作不同处理
/** * 操作类型 **/ public interface OpCode { int notification = 0;//通知 int create = 1;//创建节点 int delete = 2;//删除节点 int exists = 3;//节点是否存在 int getData = 4;//获取节点数据 int setData = 5;//保存节点数据 int getACL = 6;//获取权限 int setACL = 7;//设置权限 int getChildren = 8;//获取子节点 int sync = 9;//同步数据 int ping = 11;//ping int getChildren2 = 12;//获取子节点 int check = 13; int multi = 14; int create2 = 15; int reconfig = 16; int checkWatches = 17;//检查监听 int removeWatches = 18;//移除监听 int createContainer = 19;//创建容器 int deleteContainer = 20;//删除容器 int createTTL = 21; int multiRead = 22; int auth = 100;//认证 int setWatches = 101;//设置监听 int sasl = 102; int getEphemerals = 103; int getAllChildrenNumber = 104; int setWatches2 = 105; int addWatch = 106;//添加监听 int whoAmI = 107; int createSession = -10; int closeSession = -11; int error = -1; } -
保存监听到watchTable和watch2Paths
public class WatchManager implements IWatchManager { //路径-Watcher private final Map<String, Set<Watcher>> watchTable = new HashMap<>(); //Watcher-路径 private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>(); public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode){ Set<Watcher> list = watchTable.get(path); if (list == null) { list = new HashSet<>(4); watchTable.put(path, list); } list.add(watcher); Set<String> paths = watch2Paths.get(watcher); if (paths == null) { paths = new HashSet<>(); watch2Paths.put(watcher, paths); } watcherModeManager.setWatcherMode(watcher, path, watcherMode); return paths.add(path); } } -
客户端收到服务端响应,将WatchRegistration注册到 ZKWatchManager中
public class ClientCnxn { protected void finishPacket(Packet p) { int err = p.replyHeader.getErr(); if (p.watchRegistration != null) { //注册 p.watchRegistration.register(err); } } } -
ZKWatchManager保存监听
class ZKWatchManager implements ClientWatchManager { //DataWatches private final Map<String, Set<Watcher>> dataWatches = new HashMap<>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<>(); //ChildWatches private final Map<String, Set<Watcher>> childWatches = new HashMap<>(); private final Map<String, Set<Watcher>> persistentWatches = new HashMap<>(); private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<>(); }
-
-
通知:
-
当发生变化时发起通知 在server端watcher触发一次就会删除
public class WatchManager implements IWatchManager { public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { Set<Watcher> watchers = new HashSet<>(); //...从watchtable中获取watcher for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } } } -
发送通知给客户端
public class NIOServerCnxn extends ServerCnxn { public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); //发送响应通知 int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error); ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize); } } -
客户端收到通知,SendThread处理响应
class SendThread extends ZooKeeperThread { void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); switch (replyHdr.getXid()) { case PING_XID: return; case AUTHPACKET_XID: return; case NOTIFICATION_XID: //通知事件 //... //交由eventThread处理 eventThread.queueEvent(we); } } } -
事件处理
class EventThread extends ZooKeeperThread { private void processEvent(Object event) { if (event instanceof WatcherSetEventPair) { WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher.", t); } } } } }
-
能摸鱼就很舒服
Show Disqus Comments
扫码关注公众号:纯洁的微笑
发送 290992
即可立即永久解锁本站全部文章