LarryDpk
Published 2020-08-31 / 1243 views

Connecting to ZooKeeper from Java

1 Introduction

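An earlier post covered installing ZooKeeper (see "How to Install ZooKeeper in Standalone and Cluster Mode"). The official distribution ships a client tool, zkCli, for connecting to the server. This post shows how to connect from a Java program instead.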

2 Code

2.1 ZooKeeper management classes

The connection class, whose main job is to establish the connection between the client and the server:

package com.pkslow.zk;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

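public class ZookeeperConnection {
    private ZooKeeper zooKeeper;
    // Released once the client reports that the session is connected.
    private final CountDownLatch connectionLatch = new CountDownLatch(1);

    public ZooKeeper connect(String host) throws IOException, InterruptedException {
        // The constructor returns immediately; the session is established asynchronously.
        // 3000 is the session timeout in milliseconds.
        zooKeeper = new ZooKeeper(host, 3000, watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectionLatch.countDown();
            }
        });
        // Block until the SyncConnected event arrives.
        connectionLatch.await();
        return zooKeeper;
    }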

    public void close() throws InterruptedException {
        zooKeeper.close();
    }
}
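The connect method above waits on the latch with no time limit, so it blocks indefinitely if no server is reachable. A minimal sketch of a bounded wait follows. It uses only java.util.concurrent; the class name ConnectionGate and its methods are hypothetical and not part of the ZooKeeper client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the ZooKeeper client API.
final class ConnectionGate {
    private final CountDownLatch connected = new CountDownLatch(1);

    // Intended to be called from the watcher once SyncConnected is observed.
    void markConnected() {
        connected.countDown();
    }

    // Blocks until markConnected() has been called, or fails after the timeout.
    void awaitConnected(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!connected.await(timeout, unit)) {
            throw new TimeoutException("not connected within " + timeout + " " + unit);
        }
    }
}
```

In ZookeeperConnection, the watcher would call markConnected() where it currently calls connectionLatch.countDown(), and connect would call awaitConnected with whatever limit suits the deployment.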

The ZooKeeper manager class:

package com.pkslow.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ZookeeperManager {
    private final String connectString;
    private ZooKeeper zooKeeper;
    private ZookeeperConnection zookeeperConnection;

    public ZookeeperManager(String connectString) throws IOException, InterruptedException {
        this.connectString = connectString;
        initialize();
    }

    private void initialize() throws IOException, InterruptedException {
        zookeeperConnection = new ZookeeperConnection();
        zooKeeper = zookeeperConnection.connect(connectString);
    }

    public void closeConnection() throws InterruptedException {
        zookeeperConnection.close();
    }

    // Creates a persistent znode; the parent path must already exist.
    public void create(String path, byte[] data) throws KeeperException, InterruptedException {
        zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // Reads the znode's data and decodes it as a UTF-8 string.
    public String getZNodeData(String path, boolean watchFlag) throws KeeperException, InterruptedException {
        byte[] b = zooKeeper.getData(path, watchFlag, null);
        return new String(b, StandardCharsets.UTF_8);
    }

    // Overwrites the data, but only if the znode is still at the version just read.
    public void update(String path, byte[] data) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path, false);
        if (stat == null) {
            // exists() returns null for a missing znode; report it instead of hitting a NullPointerException.
            throw new KeeperException.NoNodeException(path);
        }
        zooKeeper.setData(path, data, stat.getVersion());
    }

    // Deletes the znode if it exists; a missing znode is treated as already deleted.
    public void delete(String path) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path, false);
        if (stat == null) {
            return;
        }
        zooKeeper.delete(path, stat.getVersion());
    }
}
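The update and delete methods above read the znode's version with exists and pass it on to setData or delete. The server applies the write only if that version is still current, and a version of -1 matches any version. The toy model below imitates just that check in memory so the behaviour can be seen without a running server. VersionedStore is a hypothetical class written for illustration, and it returns false where the real client would throw a BadVersionException.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model (hypothetical, not a ZooKeeper class). It only imitates
// the data-version check that setData and delete perform on the server.
final class VersionedStore {
    private static final class Node {
        byte[] data;
        int version; // starts at 0 and grows by one on every successful write

        Node(byte[] data) {
            this.data = data;
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) {
        if (nodes.putIfAbsent(path, new Node(data)) != null) {
            throw new IllegalStateException("node already exists: " + path);
        }
    }

    int version(String path) {
        return require(path).version;
    }

    // Writes only if expectedVersion equals the current version, or is -1 (match any).
    // Returns false on a mismatch.
    boolean setData(String path, byte[] data, int expectedVersion) {
        Node node = require(path);
        if (expectedVersion != -1 && expectedVersion != node.version) {
            return false;
        }
        node.data = data;
        node.version++;
        return true;
    }

    private Node require(String path) {
        Node node = nodes.get(path);
        if (node == null) {
            throw new IllegalArgumentException("no such node: " + path);
        }
        return node;
    }
}
```

A writer that read version 0 succeeds once; a second write that still carries version 0 is rejected, because the first write moved the node to version 1. This is why a concurrent change between exists and setData surfaces as an error rather than a silent overwrite.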

2.2 Sample test class

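Note that the connection string here is localhost:2181,localhost:2182,localhost:2183, which points at a cluster. The client only needs one of the listed servers to be reachable.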
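To make the shape of that string concrete, here is a minimal sketch that splits a comma-separated host:port list into individual addresses. ConnectStringSketch is a hypothetical helper written for illustration. It is not the parser the ZooKeeper client uses, and it assumes every entry carries an explicit port.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the ZooKeeper client does its own parsing internally.
final class ConnectStringSketch {
    // Splits "host1:port1,host2:port2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; this sketch only inspects the text.
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return servers;
    }
}
```

The sample class that follows passes the same three-server string straight to ZookeeperManager.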

package com.pkslow.zk;

import java.nio.charset.StandardCharsets;

public class ZookeeperSample {
    public static void main(String[] args) throws Exception {
        ZookeeperManager manager = new ZookeeperManager("localhost:2181,localhost:2182,localhost:2183");

        final String path = "/pkslow/website";

        // Start from a clean state by deleting the node first.
        System.out.println("delete for path: " + path);
        manager.delete(path);

        // Encode explicitly as UTF-8 to match the decoding in getZNodeData.
        System.out.println("create the path: " + path);
        manager.create(path, "www.pkslow.com".getBytes(StandardCharsets.UTF_8));
        String data = (String) manager.getZNodeData(path, true);
        System.out.println("data from zookeeper: " + data);

        System.out.println("update for path: " + path);
        manager.update(path, "https://www.pkslow.com".getBytes(StandardCharsets.UTF_8));
        data = (String) manager.getZNodeData(path, true);
        System.out.println("data from zookeeper: " + data);

        manager.closeConnection();
    }
}

Running it prints the following output:

delete for path: /pkslow/website
create the path: /pkslow/website
data from zookeeper: www.pkslow.com
update for path: /pkslow/website
data from zookeeper: https://www.pkslow.com
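The run above depends on the parent znode /pkslow already existing, because ZooKeeper's create does not create missing parents and fails with a NoNodeException instead. One way to prepare for that is to compute every prefix of the path and create them in order, shortest first. The sketch below covers only the path computation. ZNodePaths is a hypothetical helper, not part of the ZooKeeper client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration.
final class ZNodePaths {
    // Returns every prefix of an absolute path, shortest first,
    // ending with the path itself.
    static List<String> ancestorsAndSelf(String path) {
        if (!path.startsWith("/") || path.length() < 2 || path.endsWith("/")) {
            throw new IllegalArgumentException("expected an absolute path such as /a/b, got: " + path);
        }
        List<String> result = new ArrayList<>();
        // Start at index 1 to skip the leading slash.
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        result.add(path);
        return result;
    }
}
```

For /pkslow/website this yields /pkslow followed by /pkslow/website. A caller could walk the list and create each entry that does not exist yet before writing the final node.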

3 Summary

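This post simply shares code for connecting to ZooKeeper from Java; production systems rarely use the client directly like this. More often, ZooKeeper sits behind middleware that relies on it for cluster management, as Kafka does.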