掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
Zookeeper(后續(xù)簡稱ZK)是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),通常以集群模式運轉(zhuǎn),其協(xié)調(diào)能力可以理解為是基于觀察者設(shè)計模式來實現(xiàn)的;ZK服務(wù)會使用Znode存儲使用者的數(shù)據(jù),并將這些數(shù)據(jù)以樹形目錄的形式來組織管理,支持使用者以觀察者的角色指定自己關(guān)注哪些節(jié)點\數(shù)據(jù)的變更,當(dāng)這些變更發(fā)生時,ZK會通知其觀察者;為滿足本篇目標(biāo)所需,著重介紹以下幾個關(guān)鍵特性:

創(chuàng)新互聯(lián)是專業(yè)的鳳岡網(wǎng)站建設(shè)公司,鳳岡接單;提供成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè),網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進行鳳岡網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!
ZooKeeper's Hierarchical Namespace
ZooKeeper Service
監(jiān)聽機制:給某個節(jié)點注冊監(jiān)聽器,該節(jié)點一旦發(fā)生變更(例如更新或者刪除),監(jiān)聽者就會收到一個Watch Event,可以感知到節(jié)點\數(shù)據(jù)的變更
臨時節(jié)點:session鏈接斷開臨時節(jié)點就沒了,不能創(chuàng)建子節(jié)點(很關(guān)鍵)
ZK的分布式鎖正是基于以上特性來實現(xiàn)的,簡單來說是:
可能讀者是單篇閱讀,這里引入上一篇《分布式鎖上-初探》中的一些內(nèi)容,一個分布式鎖應(yīng)具備這樣一些功能特點:
基于上文的內(nèi)容,這里簡單總結(jié)一下ZK的能力矩陣(其它分布式鎖的情況會在后續(xù)文章中補充):
|
能力 |
ZK |
MySql |
Redis原生 |
Redlock |
ETCD |
|
互斥 |
是 |
||||
|
安全 |
鏈接異常,session關(guān)閉后鎖會自動釋放 |
||||
|
可用性 |
相對還好 |
||||
|
可重入 |
線程可重入 |
||||
|
加解鎖速度 |
居中 |
||||
|
阻塞非阻塞 |
都支持 |
||||
|
公平非公平 |
僅公平鎖 |
關(guān)于性能不太高的一種說法
因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀臨時節(jié)點來實現(xiàn)鎖功能。ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務(wù)器來執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同步到所有的Follower機器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。在高性能,高并發(fā)的場景下,不建議使用ZooKeeper的分布式鎖。
由于ZooKeeper的高可用特性,在并發(fā)量不是太高的場景,也推薦使用ZK的分布式鎖。
Zookeeper 客戶端框架 Curator 提供的 InterProcessMutex 是分布式鎖的一種實現(xiàn),acquire 方法阻塞|非阻塞獲取鎖,release 方法釋放鎖,另外還提供了可撤銷、可重入功能。
4.1 接口介紹
// 獲取互斥鎖
public void acquire() throws Exception;
// 在給定的時間內(nèi)獲取互斥鎖
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 釋放鎖處理
public void release() throws Exception;
// 如果當(dāng)前線程獲取了互斥鎖,則返回true
boolean isAcquiredInThisProcess();
4.2 pom依賴
org.apache.logging.log4j
log4j-core
2.8.2
org.apache.zookeeper
zookeeper
3.5.7
org.apache.curator
curator-framework
4.3.0
org.apache.curator
curator-recipes
4.3.0
org.apache.curator
curator-client
4.3.0
4.3 示例
package com.atguigu.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
// 創(chuàng)建分布式鎖1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 創(chuàng)建分布式鎖2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("線程1 獲取到鎖");
lock1.acquire();
System.out.println("線程1 再次獲取到鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("線程1 釋放鎖");
lock1.release();
System.out.println("線程1 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("線程2 獲取到鎖");
lock2.acquire();
System.out.println("線程2 再次獲取到鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("線程2 釋放鎖");
lock2.release();
System.out.println("線程2 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 啟動客戶端
client.start();
System.out.println("zookeeper 啟動成功");
return client;
}
}
通過這個實例對照第2節(jié)內(nèi)容來理解加解鎖的流程,以及如何避免驚群效應(yīng)。
package com.rock.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* zk 分布式鎖 v1版本:
* 完成功能 :
* 1. 避免了驚群效應(yīng)
* 缺失功能:
* 1. 超時控制
* 2. 讀寫鎖
* 3. 重入控制
*/
public class DistributedLock {
private String connectString;
private int sessionTimeout;
private ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
private String LOCK_ROOT_PATH;
private static String NODE_PREFIX = "w";
public DistributedLock(String connectString, int sessionTimeout, String lockName){
//TODO:數(shù)據(jù)校驗
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
this.LOCK_ROOT_PATH = lockName;
}
public void init() throws IOException, KeeperException, InterruptedException {
// 建聯(lián)
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// connectLatch 連接上zk后 釋放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
});
connectLatch.await();// 等待zk正常連接后
// 判斷鎖名稱節(jié)點是否存在
Stat stat = zk.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
// 創(chuàng)建一下鎖名稱節(jié)點
try {
zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
//并發(fā)創(chuàng)建沖突忽略。
if (!e.code().name().equals("NODEEXISTS")) {
throw e;
}
}
}
}
/**
* 待補充功能:
* 1. 超時設(shè)置
* 2. 讀寫區(qū)分
* 3. 重入控制
*/
public void zklock() throws KeeperException, InterruptedException {
if (!tryLock()) {
waitLock();
zklock();
}
}
/**
*
*/
private void waitLock() throws KeeperException, InterruptedException {
try {
zk.getData(waitPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent){
// waitLatch 需要釋放
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, new Stat());
// 等待監(jiān)聽
waitLatch.await();
} catch (KeeperException.NoNodeException e) {
//如果等待的節(jié)點已經(jīng)被清除了,不等了,再嘗試去搶鎖
return;
}
}
private boolean tryLock() throws KeeperException, InterruptedException {
currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判斷創(chuàng)建的節(jié)點是否是最小的序號節(jié)點,如果是獲取到鎖;如果不是,監(jiān)聽他序號前一個節(jié)點
Listchildren = zk.getChildren(LOCK_ROOT_PATH, false);
// 如果children 只有一個值,那就直接獲取鎖; 如果有多個節(jié)點,需要判斷,誰最小
if (children.size() == 1) {
return true;
} else {
String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
// 通過w00000000獲取該節(jié)點在children集合的位置
int index = children.indexOf(thisNode);
if (index == 0) {
//自己就是第一個節(jié)點
return true;
}
// 需要監(jiān)聽 他前一個節(jié)點變化
waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
}
return false;
}
// 解鎖
public void unZkLock(){
// 刪除節(jié)點
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
本文轉(zhuǎn)載自微信公眾號「架構(gòu)染色」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系【架構(gòu)染色】公眾號作者。

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流