# Connecting to Zookeeper from Java
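# 1 Introduction
An earlier post covered how to install Zookeeper; see "How to install Zookeeper in standalone and cluster mode". The official distribution provides a client tool, zkCli, for connecting to the server. This post shows how to connect from a Java program instead.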
# 2 Code
# 2.1 Zookeeper management classes
The connection class, whose main job is to create the connection between the client and the server:
package com.pkslow.zk;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperConnection {
    private ZooKeeper zooKeeper;
    // Released once the client reports that the session is connected.
    private final CountDownLatch connectionLatch = new CountDownLatch(1);

    public ZooKeeper connect(String host) throws IOException, InterruptedException {
        // The constructor returns immediately; the session is established asynchronously.
        // 3000 is the session timeout in milliseconds.
        zooKeeper = new ZooKeeper(host, 3000, watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectionLatch.countDown();
            }
        });
        // Block until the watcher has seen SyncConnected.
        connectionLatch.await();
        return zooKeeper;
    }

    public void close() throws InterruptedException {
        zooKeeper.close();
    }
}
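One thing to be aware of in the connect method above: connectionLatch.await() has no timeout, so the call blocks indefinitely when no server is reachable. Below is a minimal sketch of a bounded wait that uses only java.util.concurrent. The class BoundedLatchWait and the method awaitConnected are hypothetical names chosen for illustration; they are not part of the Zookeeper client, and the watcher callback is simulated with a plain thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: waits for a "connected" latch, but gives up after a timeout.
class BoundedLatchWait {

    // Returns true if the latch was released in time, false on timeout or interruption.
    static boolean awaitConnected(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulates the watcher callback firing from another thread.
        CountDownLatch connected = new CountDownLatch(1);
        new Thread(connected::countDown).start();
        System.out.println("connected in time: " + awaitConnected(connected, 1000));

        // Simulates an unreachable server: nobody ever releases the latch.
        CountDownLatch neverConnected = new CountDownLatch(1);
        System.out.println("connected in time: " + awaitConnected(neverConnected, 100));
    }
}
```

In the connection class, the same pattern could replace the bare await() call, with a false result treated as a failed connection, for example by closing the client and throwing an IOException.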
The Zookeeper manager class:
package com.pkslow.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ZookeeperManager {
    private final String connectString;
    private ZooKeeper zooKeeper;
    private ZookeeperConnection zookeeperConnection;

    public ZookeeperManager(String connectString) throws IOException, InterruptedException {
        this.connectString = connectString;
        initialize();
    }

    private void initialize() throws IOException, InterruptedException {
        zookeeperConnection = new ZookeeperConnection();
        zooKeeper = zookeeperConnection.connect(connectString);
    }

    public void closeConnection() throws InterruptedException {
        zookeeperConnection.close();
    }

    // Creates a persistent znode; the parent path must already exist.
    public void create(String path, byte[] data) throws KeeperException, InterruptedException {
        zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    public Object getZNodeData(String path, boolean watchFlag) throws KeeperException, InterruptedException {
        byte[] b = zooKeeper.getData(path, watchFlag, null);
        return new String(b, StandardCharsets.UTF_8);
    }

    public void update(String path, byte[] data) throws KeeperException, InterruptedException {
        // exists() returns null when the znode is missing.
        Stat stat = zooKeeper.exists(path, true);
        if (stat == null) {
            throw new KeeperException.NoNodeException(path);
        }
        // Passing the version makes the write fail if the znode changed in between.
        zooKeeper.setData(path, data, stat.getVersion());
    }

    public void delete(String path) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path, true);
        if (stat == null) {
            return; // nothing to delete
        }
        zooKeeper.delete(path, stat.getVersion());
    }
}
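The update and delete methods above pass a version to setData and delete, which makes both calls conditional: the server applies the change only if the znode is still at that version, and a version of -1 matches any version. The in-memory sketch below imitates that check so it can be run without a server. VersionedStore and its methods are hypothetical and only mimic the behaviour; they are not the Zookeeper API, and the real client reports a mismatch with an exception rather than a boolean.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory imitation of Zookeeper's data version check; not the real client API.
class VersionedStore {
    private static final class Node {
        byte[] data;
        int version; // starts at 0 and grows by one on every successful write
        Node(byte[] data) { this.data = data; }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    // Creates the node at version 0; returns false if it already exists.
    synchronized boolean create(String path, byte[] data) {
        if (nodes.containsKey(path)) return false;
        nodes.put(path, new Node(data));
        return true;
    }

    // Returns the current version, or null if there is no such node.
    synchronized Integer versionOf(String path) {
        Node n = nodes.get(path);
        return n == null ? null : n.version;
    }

    // Writes only if expectedVersion matches, or is -1 (meaning "any version").
    synchronized boolean setData(String path, byte[] data, int expectedVersion) {
        Node n = nodes.get(path);
        if (n == null) return false;
        if (expectedVersion != -1 && expectedVersion != n.version) return false;
        n.data = data;
        n.version++;
        return true;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.create("/pkslow/website", "www.pkslow.com".getBytes());
        int seen = store.versionOf("/pkslow/website");
        // First write uses the version that was just read.
        System.out.println(store.setData("/pkslow/website", "a".getBytes(), seen));
        // Second write reuses the same, now stale, version.
        System.out.println(store.setData("/pkslow/website", "b".getBytes(), seen));
    }
}
```

Reading the version with exists and writing immediately afterwards, as the manager class does, narrows the window for a conflicting write but does not remove it; a caller that needs to win eventually would have to re-read and retry.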
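# 2.2 Sample test class
Note that the connection string here is localhost:2181,localhost:2182,localhost:2183, which points at a cluster; the client only needs one of the listed servers to be reachable.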
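To make the format concrete: the connection string is a comma-separated list of host:port pairs. Below is a minimal sketch of splitting such a string, assuming no chroot suffix and no IPv6 literals, and falling back to Zookeeper's default client port 2181 when a port is omitted. ConnectStringSketch and parse are hypothetical names; the real client does its own parsing internally.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the servers named in a connection string.
class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) continue; // tolerate stray commas
            int colon = hostPort.lastIndexOf(':');
            String host = colon < 0 ? hostPort : hostPort.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(hostPort.substring(colon + 1));
            // Unresolved on purpose: no DNS lookup is needed just to list the servers.
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return servers;
    }

    public static void main(String[] args) {
        for (InetSocketAddress server : parse("localhost:2181,localhost:2182,localhost:2183")) {
            System.out.println(server.getHostString() + " -> " + server.getPort());
        }
    }
}
```

With the format in mind, the sample class itself: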
package com.pkslow.zk;

import java.nio.charset.StandardCharsets;

public class ZookeeperSample {
    public static void main(String[] args) throws Exception {
        ZookeeperManager manager = new ZookeeperManager("localhost:2181,localhost:2182,localhost:2183");
        final String path = "/pkslow/website";

        // Delete first so that the create below does not hit an existing znode.
        System.out.println("delete for path: " + path);
        manager.delete(path);

        System.out.println("create the path: " + path);
        manager.create(path, "www.pkslow.com".getBytes(StandardCharsets.UTF_8));
        String data = (String) manager.getZNodeData(path, true);
        System.out.println("data from zookeeper: " + data);

        System.out.println("update for path: " + path);
        manager.update(path, "https://www.pkslow.com".getBytes(StandardCharsets.UTF_8));
        data = (String) manager.getZNodeData(path, true);
        System.out.println("data from zookeeper: " + data);

        manager.closeConnection();
    }
}
Running it produces the following log:
delete for path: /pkslow/website
create the path: /pkslow/website
data from zookeeper: www.pkslow.com
update for path: /pkslow/website
data from zookeeper: https://www.pkslow.com
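The log above assumes that the parent znode /pkslow already exists: a Zookeeper create call does not create missing parents and fails with a NoNodeException instead. One way to prepare for that is to compute every prefix of the path and create each missing one from the top down. Below is a stdlib-only sketch of the prefix computation; ZNodePaths and ancestorsAndSelf are hypothetical names, not part of the client library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that expands a znode path into the chain of paths leading to it.
class ZNodePaths {

    // Lists each ancestor of the path, shallowest first, followed by the path itself.
    static List<String> ancestorsAndSelf(String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("znode paths must start with '/': " + path);
        }
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) continue; // skips the empty piece before the leading '/'
            current.append('/').append(segment);
            result.add(current.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        for (String p : ancestorsAndSelf("/pkslow/website")) {
            System.out.println(p);
        }
    }
}
```

With the manager class from section 2.1, each returned path could be checked for existence and created in order before the final create call.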
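# 3 Summary
This post simply shares code for connecting to Zookeeper from Java; in production it is rarely used directly like this. More often, middleware such as Kafka relies on Zookeeper for cluster management.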