Java ZooKeeper是一個分布式的開源協調服務框架,提供了可靠的分布式協調服務。ZooKeeper作為一個文件系統,具有節點分級,監控和訂閱等特性,可以在分布式環境下實現消息發布/訂閱模式等應用。
在ZooKeeper中,訂閱和發布是非常常見的用例,ZooKeeper可以作為一個中介在客戶端之間傳遞消息。下面是一個使用Java ZooKeeper實現發布/訂閱的例子:
public class Subscriber implements Watcher { private ZooKeeper zk; private String zkPath = "/test"; private Listsubscribers = new ArrayList<>(); private String nodeName; public Subscriber(String nodeName) throws IOException, KeeperException, InterruptedException { this.nodeName = nodeName; zk = new ZooKeeper("localhost:2181", 5000, this); // 創建節點,假設為test zk.create(zkPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 訂閱節點變化 zk.getChildren(zkPath, true); } @Override public void process(WatchedEvent event) { try { // 節點變化,重新訂閱 if (event.getType().equals(Event.EventType.NodeChildrenChanged)) { List children = zk.getChildren(zkPath, true); subscribers.clear(); for (String child : children) { byte[] data = zk.getData(zkPath + "/" + child, false, null); subscribers.add(new String(data)); } System.out.println(nodeName + " subscriber list updated: " + subscribers); } } catch (Exception e) { e.printStackTrace(); } } } public class Publisher { private ZooKeeper zk; private String zkPath = "/test"; public Publisher() throws IOException, KeeperException, InterruptedException { zk = new ZooKeeper("localhost:2181", 5000, null); // 創建節點,假設為test zk.create(zkPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public void send(String content) throws KeeperException, InterruptedException { // 創建節點,每次內容不同 zk.create(zkPath + "/message", content.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } public class Main { public static void main(String[] args) throws InterruptedException, KeeperException, IOException { // 創建發布者 Publisher publisher = new Publisher(); // 創建2個訂閱者 Subscriber subscriber1 = new Subscriber("subscriber1"); Subscriber subscriber2 = new Subscriber("subscriber2"); // 發布消息 publisher.send("message1"); publisher.send("message2"); Thread.sleep(10000); // 關閉ZooKeeper連接 subscriber1.zk.close(); subscriber2.zk.close(); publisher.zk.close(); } }
執行上面的代碼,可以看到訂閱者每當收到新消息時,會更新自己的訂閱列表。
可以使用Java ZooKeeper輕松實現發布/訂閱模式,這對于分布式應用來說是非常有用的。
上一篇css 圖片拉申屬性