大数据之zookeeper 分布式协调核心功能 配置 / 同步 / 选举

大数据阿木 发布于 3 天前 1 次阅读


Zookeeper:分布式协调核心功能解析与代码实现

随着大数据时代的到来,分布式系统在各个领域得到了广泛应用。Zookeeper 作为 Apache 软件基金会的一个开源项目,已经成为分布式系统中不可或缺的协调服务。它提供了强大的分布式协调功能,包括配置管理、同步和选举等。本文将围绕 Zookeeper 的核心功能,通过代码示例进行详细解析。

Zookeeper 简介

Zookeeper 是一个高性能的分布式协调服务,它允许分布式应用程序协调彼此的行为。Zookeeper 提供了一个简单的原语集,如节点创建、读取、更新和删除,这些原语可以用来实现复杂的分布式协调功能。

Zookeeper 的核心数据结构是一个树形结构,称为 ZNode(Zookeeper Node)。每个 ZNode 都可以存储数据,并且可以拥有子节点。Zookeeper 的数据模型如下:


/


├── /config


│ ├── /app1


│ └── /app2


├── /sync


│ ├── /app1


│ └── /app2


└── /election


└── /leader


配置管理

配置管理是 Zookeeper 最常用的功能之一。它允许分布式系统动态地读取和更新配置信息。

1. 创建配置节点

以下是一个简单的示例,演示如何使用 Zookeeper 创建一个配置节点:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.CreateMode;

public class ConfigManager {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String CONFIG_PATH = "/config/app1";

public static void main(String[] args) throws Exception {


ZooKeeper zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


// 处理事件


}


});

String configData = "config data for app1";


String configNode = zookeeper.create(CONFIG_PATH, configData.getBytes(), ZooKeeper.CreateMode.PERSISTENT);

System.out.println("Config node created: " + configNode);


zookeeper.close();


}


}


2. 读取配置节点

以下是一个示例,演示如何读取配置节点的数据:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.data.Stat;

public class ConfigManager {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String CONFIG_PATH = "/config/app1";

public static void main(String[] args) throws Exception {


ZooKeeper zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


// 处理事件


}


});

Stat stat = new Stat();


byte[] configData = zookeeper.getData(CONFIG_PATH, false, stat);

System.out.println("Config data: " + new String(configData));


zookeeper.close();


}


}


3. 更新配置节点

以下是一个示例,演示如何更新配置节点的数据:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.data.Stat;

public class ConfigManager {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String CONFIG_PATH = "/config/app1";

public static void main(String[] args) throws Exception {


ZooKeeper zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


// 处理事件


}


});

String newConfigData = "new config data for app1";


Stat stat = new Stat();


zookeeper.setData(CONFIG_PATH, newConfigData.getBytes(), stat.getVersion());

System.out.println("Config node updated");


zookeeper.close();


}


}


同步

Zookeeper 提供了强大的同步机制,可以用于实现分布式锁、分布式队列等功能。

1. 分布式锁

以下是一个使用 Zookeeper 实现分布式锁的示例:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.CreateMode;


import org.apache.zookeeper.WatchedEvent;


import org.apache.zookeeper.Watcher;


import org.apache.zookeeper.ZooKeeper.States;

public class DistributedLock {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String LOCK_PATH = "/lock";


private ZooKeeper zookeeper;


private String lockNode;

public DistributedLock() throws Exception {


this.zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


if (watchedEvent.getState() == States.Expired) {


try {


acquireLock();


} catch (Exception e) {


e.printStackTrace();


}


}


}


});


}

public void acquireLock() throws Exception {


String lockNode = zookeeper.create(LOCK_PATH + "/lock-", new byte[0], ZooKeeper.CreateMode.EPHEMERAL_SEQUENTIAL);


System.out.println("Lock node created: " + lockNode);

List<String> children = zookeeper.getChildren(LOCK_PATH, false);


Collections.sort(children);

if (lockNode.equals(LOCK_PATH + "/" + children.get(0))) {


System.out.println("Lock acquired");


} else {


String prevNode = LOCK_PATH + "/" + children.get(Collections.binarySearch(children, lockNode.substring(LOCK_PATH.length() + 1)) - 1);


Stat stat = zookeeper.exists(prevNode, false);


if (stat != null) {


zookeeper.getData(prevNode, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


try {


acquireLock();


} catch (Exception e) {


e.printStackTrace();


}


}


}, stat);


}


}


}

public void releaseLock() throws Exception {


zookeeper.delete(lockNode, -1);


System.out.println("Lock released");


}

public static void main(String[] args) throws Exception {


DistributedLock lock = new DistributedLock();


lock.acquireLock();


// 执行业务逻辑


lock.releaseLock();


}


}


2. 分布式队列

以下是一个使用 Zookeeper 实现分布式队列的示例:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.CreateMode;


import org.apache.zookeeper.WatchedEvent;


import org.apache.zookeeper.Watcher;


import org.apache.zookeeper.ZooKeeper.States;

public class DistributedQueue {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String QUEUE_PATH = "/queue";


private ZooKeeper zookeeper;

public DistributedQueue() throws Exception {


this.zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


if (watchedEvent.getState() == States.Expired) {


try {


take();


} catch (Exception e) {


e.printStackTrace();


}


}


}


});


}

public void take() throws Exception {


String queueNode = zookeeper.create(QUEUE_PATH + "/queue-", new byte[0], ZooKeeper.CreateMode.EPHEMERAL_SEQUENTIAL);


System.out.println("Queue node created: " + queueNode);

List<String> children = zookeeper.getChildren(QUEUE_PATH, false);


Collections.sort(children);

if (queueNode.equals(QUEUE_PATH + "/" + children.get(0))) {


System.out.println("Queue item taken");


zookeeper.delete(queueNode, -1);


} else {


String prevNode = QUEUE_PATH + "/" + children.get(Collections.binarySearch(children, queueNode.substring(QUEUE_PATH.length() + 1)) - 1);


Stat stat = zookeeper.exists(prevNode, false);


if (stat != null) {


zookeeper.getData(prevNode, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


try {


take();


} catch (Exception e) {


e.printStackTrace();


}


}


}, stat);


}


}


}

public static void main(String[] args) throws Exception {


DistributedQueue queue = new DistributedQueue();


queue.take();


// 执行业务逻辑


}


}


选举

Zookeeper 提供了强大的选举机制,可以用于实现主从复制、分布式锁等功能。

1. 主从复制

以下是一个使用 Zookeeper 实现主从复制的示例:

java

import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.CreateMode;


import org.apache.zookeeper.WatchedEvent;


import org.apache.zookeeper.Watcher;


import org.apache.zookeeper.ZooKeeper.States;

public class MasterElection {


private static final String ZOOKEEPER_SERVER = "localhost:2181";


private static final String ELECTION_PATH = "/election";


private ZooKeeper zookeeper;


private String electionNode;

public MasterElection() throws Exception {


this.zookeeper = new ZooKeeper(ZOOKEEPER_SERVER, 3000, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


if (watchedEvent.getState() == States.Expired) {


try {


becomeMaster();


} catch (Exception e) {


e.printStackTrace();


}


}


}


});


}

public void becomeMaster() throws Exception {


String electionNode = zookeeper.create(ELECTION_PATH + "/master-", new byte[0], ZooKeeper.CreateMode.EPHEMERAL_SEQUENTIAL);


System.out.println("Election node created: " + electionNode);

List<String> children = zookeeper.getChildren(ELECTION_PATH, false);


Collections.sort(children);

if (electionNode.equals(ELECTION_PATH + "/" + children.get(0))) {


System.out.println("Master elected");


} else {


String prevNode = ELECTION_PATH + "/" + children.get(Collections.binarySearch(children, electionNode.substring(ELECTION_PATH.length() + 1)) - 1);


Stat stat = zookeeper.exists(prevNode, false);


if (stat != null) {


zookeeper.getData(prevNode, new Watcher() {


@Override


public void process(WatchedEvent watchedEvent) {


try {


becomeMaster();


} catch (Exception e) {


e.printStackTrace();


}


}


}, stat);


}


}


}

public static void main(String[] args) throws Exception {


MasterElection master = new MasterElection();


master.becomeMaster();


// 执行主节点业务逻辑


}


}


2. 分布式锁

在前面提到的分布式锁示例中,我们已经使用了 Zookeeper 的选举机制来实现锁的获取。

总结

Zookeeper 是一个强大的分布式协调服务,它提供了丰富的功能,如配置管理、同步和选举等。通过本文的代码示例,我们可以看到 Zookeeper 在实际应用中的强大能力。在实际项目中,我们可以根据具体需求选择合适的 Zookeeper 功能,实现高效的分布式系统。