掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
作者:翟永超 2017-05-11 14:05:25
開發(fā)
開發(fā)工具
分布式 本文將繼續(xù)討論基于Consul的分布式鎖實(shí)現(xiàn)。信號(hào)量是我們?cè)趯?shí)現(xiàn)并發(fā)控制時(shí)會(huì)經(jīng)常使用的手段,主要用來(lái)限制同時(shí)并發(fā)線程或進(jìn)程的數(shù)量。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:空間域名、虛擬主機(jī)、營(yíng)銷軟件、網(wǎng)站建設(shè)、奈曼網(wǎng)站維護(hù)、網(wǎng)站推廣。
本文將繼續(xù)討論基于Consul的分布式鎖實(shí)現(xiàn)。信號(hào)量是我們?cè)趯?shí)現(xiàn)并發(fā)控制時(shí)會(huì)經(jīng)常使用的手段,主要用來(lái)限制同時(shí)并發(fā)線程或進(jìn)程的數(shù)量,比如:Zuul默認(rèn)情況下就使用信號(hào)量來(lái)限制每個(gè)路由的并發(fā)數(shù),以實(shí)現(xiàn)不同路由間的資源隔離。
信號(hào)量(Semaphore),有時(shí)被稱為信號(hào)燈,是在多線程環(huán)境下使用的一種設(shè)施,是可以用來(lái)保證兩個(gè)或多個(gè)關(guān)鍵代碼段不被并發(fā)調(diào)用。在進(jìn)入一個(gè)關(guān)鍵代碼段之前,線程必須獲取一個(gè)信號(hào)量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號(hào)量。其他想進(jìn)入該關(guān)鍵代碼段的線程必須等待直到***個(gè)線程釋放信號(hào)量。為了完成這個(gè)過(guò)程,需要?jiǎng)?chuàng)建一個(gè)信號(hào)量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個(gè)關(guān)鍵代碼段的首末端,確認(rèn)這些信號(hào)量VI引用的是初始創(chuàng)建的信號(hào)量。如在這個(gè)停車場(chǎng)系統(tǒng)中,車位是公共資源,每輛車好比一個(gè)線程,看門人起的就是信號(hào)量的作用。
實(shí)現(xiàn)思路
- {
- "limit": 3,
- "holders": [
- "90c0772a-4bd3-3a3c-8215-3b8937e36027",
- "93e5611d-5365-a374-8190-f80c4a7280ab"
- ]
- }
流程圖
代碼實(shí)現(xiàn)
- public class Semaphore {
- private Logger logger = Logger.getLogger(getClass());
- private static final String prefix = "semaphore/"; // 信號(hào)量參數(shù)前綴
- private ConsulClient consulClient;
- private int limit;
- private String keyPath;
- private String sessionId = null;
- private boolean acquired = false;
- /**
- *
- * @param consulClient consul客戶端實(shí)例
- * @param limit 信號(hào)量上限值
- * @param keyPath 信號(hào)量在consul中存儲(chǔ)的參數(shù)路徑
- */
- public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
- this.consulClient = consulClient;
- this.limit = limit;
- this.keyPath = prefix + keyPath;
- }
- /**
- * acquired信號(hào)量
- *
- * @param block 是否阻塞。如果為true,那么一直嘗試,直到獲取到該資源為止。
- * @return
- * @throws IOException
- */
- public Boolean acquired(boolean block) throws IOException {
- if(acquired) {
- logger.error(sessionId + " - Already acquired");
- throw new RuntimeException(sessionId + " - Already acquired");
- }
- // create session
- clearSession();
- this.sessionId = createSessionId("semaphore");
- logger.debug("Create session : " + sessionId);
- // add contender entry
- String contenderKey = keyPath + "/" + sessionId;
- logger.debug("contenderKey : " + contenderKey);
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
- if(!b) {
- logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
- throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
- }
- while(true) {
- // try to take the semaphore
- String lockKey = keyPath + "/.lock";
- String lockKeyValue;
- GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
- if (lockKeyContent != null) {
- // lock值轉(zhuǎn)換
- lockKeyValue = lockKeyContent.getValue();
- BASE64Decoder decoder = new BASE64Decoder();
- byte[] v = decoder.decodeBuffer(lockKeyValue);
- String lockKeyValueDecode = new String(v);
- logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);
- Gson gson = new Gson();
- ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
- // 當(dāng)前信號(hào)量已滿
- if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
- logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
- if(block) {
- // 如果是阻塞模式,再嘗試
- try {
- Thread.sleep(100L);
- } catch (InterruptedException e) {
- }
- continue;
- }
- // 非阻塞模式,直接返回沒有獲取到信號(hào)量
- return false;
- }
- // 信號(hào)量增加
- contenderValue.getHolders().add(sessionId);
- putParams = new PutParams();
- putParams.setCas(lockKeyContent.getModifyIndex());
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if(c) {
- acquired = true;
- return true;
- }
- else
- continue;
- } else {
- // 當(dāng)前信號(hào)量還沒有,所以創(chuàng)建一個(gè),并馬上搶占一個(gè)資源
- ContenderValue contenderValue = new ContenderValue();
- contenderValue.setLimit(limit);
- contenderValue.getHolders().add(sessionId);
- putParams = new PutParams();
- putParams.setCas(0L);
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if (c) {
- acquired = true;
- return true;
- }
- continue;
- }
- }
- }
- /**
- * 創(chuàng)建sessionId
- * @param sessionName
- * @return
- */
- public String createSessionId(String sessionName) {
- NewSession newnewSession = new NewSession();
- newSession.setName(sessionName);
- return consulClient.sessionCreate(newSession, null).getValue();
- }
- /**
- * 釋放session、并從lock中移除當(dāng)前的sessionId
- * @throws IOException
- */
- public void release() throws IOException {
- if(this.acquired) {
- // remove session from lock
- while(true) {
- String contenderKey = keyPath + "/" + sessionId;
- String lockKey = keyPath + "/.lock";
- String lockKeyValue;
- GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
- if (lockKeyContent != null) {
- // lock值轉(zhuǎn)換
- lockKeyValue = lockKeyContent.getValue();
- BASE64Decoder decoder = new BASE64Decoder();
- byte[] v = decoder.decodeBuffer(lockKeyValue);
- String lockKeyValueDecode = new String(v);
- Gson gson = new Gson();
- ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
- contenderValue.getHolders().remove(sessionId);
- PutParams putParams = new PutParams();
- putParams.setCas(lockKeyContent.getModifyIndex());
- consulClient.deleteKVValue(contenderKey);
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if(c) {
- break;
- }
- }
- }
- // remove session key
- }
- this.acquired = false;
- clearSession();
- }
- public void clearSession() {
- if(sessionId != null) {
- consulClient.sessionDestroy(sessionId, null);
- sessionId = null;
- }
- }
- class ContenderValue implements Serializable {
- private Integer limit;
- private List
holders = new ArrayList<>(); - public Integer getLimit() {
- return limit;
- }
- public void setLimit(Integer limit) {
- this.limit = limit;
- }
- public List
getHolders() { - return holders;
- }
- public void setHolders(List
holders) { - this.holders = holders;
- }
- @Override
- public String toString() {
- return new Gson().toJson(this);
- }
- }
- }
單元測(cè)試
下面單元測(cè)試的邏輯:通過(guò)線程的方式來(lái)模擬不同的分布式服務(wù)來(lái)獲取信號(hào)量執(zhí)行業(yè)務(wù)邏輯。由于信號(hào)量與簡(jiǎn)單的分布式互斥鎖有所不同,它不是只限定一個(gè)線程可以操作,而是可以控制多個(gè)線程的并發(fā),所以通過(guò)下面的單元測(cè)試,我們?cè)O(shè)置信號(hào)量為3,然后同時(shí)啟動(dòng)15個(gè)線程來(lái)競(jìng)爭(zhēng)的情況,來(lái)觀察分布式信號(hào)量實(shí)現(xiàn)的結(jié)果如何。
- INFO [Thread-6] SemaphoreRunner - Thread 7 start!
- INFO [Thread-2] SemaphoreRunner - Thread 3 start!
- INFO [Thread-7] SemaphoreRunner - Thread 8 start!
- INFO [Thread-2] SemaphoreRunner - Thread 3 end!
- INFO [Thread-5] SemaphoreRunner - Thread 6 start!
- INFO [Thread-6] SemaphoreRunner - Thread 7 end!
- INFO [Thread-9] SemaphoreRunner - Thread 10 start!
- INFO [Thread-5] SemaphoreRunner - Thread 6 end!
- INFO [Thread-1] SemaphoreRunner - Thread 2 start!
- INFO [Thread-7] SemaphoreRunner - Thread 8 end!
- INFO [Thread-10] SemaphoreRunner - Thread 11 start!
- INFO [Thread-10] SemaphoreRunner - Thread 11 end!
- INFO [Thread-12] SemaphoreRunner - Thread 13 start!
- INFO [Thread-1] SemaphoreRunner - Thread 2 end!
- INFO [Thread-3] SemaphoreRunner - Thread 4 start!
- INFO [Thread-9] SemaphoreRunner - Thread 10 end!
- INFO [Thread-0] SemaphoreRunner - Thread 1 start!
- INFO [Thread-3] SemaphoreRunner - Thread 4 end!
- INFO [Thread-14] SemaphoreRunner - Thread 15 start!
- INFO [Thread-12] SemaphoreRunner - Thread 13 end!
- INFO [Thread-0] SemaphoreRunner - Thread 1 end!
- INFO [Thread-13] SemaphoreRunner - Thread 14 start!
- INFO [Thread-11] SemaphoreRunner - Thread 12 start!
- INFO [Thread-13] SemaphoreRunner - Thread 14 end!
- INFO [Thread-4] SemaphoreRunner - Thread 5 start!
- INFO [Thread-4] SemaphoreRunner - Thread 5 end!
- INFO [Thread-8] SemaphoreRunner - Thread 9 start!
- INFO [Thread-11] SemaphoreRunner - Thread 12 end!
- INFO [Thread-14] SemaphoreRunner - Thread 15 end!
- INFO [Thread-8] SemaphoreRunner - Thread 9 end!
- public class TestLock {
- private Logger logger = Logger.getLogger(getClass());
- @Test
- public void testSemaphore() throws Exception {
- new Thread(new SemaphoreRunner(1)).start();
- new Thread(new SemaphoreRunner(2)).start();
- new Thread(new SemaphoreRunner(3)).start();
- new Thread(new SemaphoreRunner(4)).start();
- new Thread(new SemaphoreRunner(5)).start();
- new Thread(new SemaphoreRunner(6)).start();
- new Thread(new SemaphoreRunner(7)).start();
- new Thread(new SemaphoreRunner(8)).start();
- new Thread(new SemaphoreRunner(9)).start();
- new Thread(new SemaphoreRunner(10)).start();
- Thread.sleep(1000000L);
- }
- }
- public class SemaphoreRunner implements Runnable {
- private Logger logger = Logger.getLogger(getClass());
- private int flag;
- public SemaphoreRunner(int flag) {
- this.flag = flag;
- }
- @Override
- public void run() {
- Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
- try {
- if (semaphore.acquired(true)) {
- // 獲取到信號(hào)量,執(zhí)行業(yè)務(wù)邏輯
- logger.info("Thread " + flag + " start!");
- Thread.sleep(new Random().nextInt(10000));
- logger.info("Thread " + flag + " end!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- // 信號(hào)量釋放、Session鎖釋放、Session刪除
- semaphore.release();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
從測(cè)試結(jié)果,我們可以發(fā)現(xiàn)當(dāng)信號(hào)量持有者數(shù)量達(dá)到信號(hào)量上限3的時(shí)候,其他競(jìng)爭(zhēng)者就開始進(jìn)行等待了,只有當(dāng)某個(gè)持有者釋放信號(hào)量之后,才會(huì)有新的線程變成持有者,從而開始執(zhí)行自己的業(yè)務(wù)邏輯。所以,分布式信號(hào)量可以幫助我們有效的控制同時(shí)操作某個(gè)共享資源的并發(fā)數(shù)。
優(yōu)化建議
同前文一樣,這里只是做了簡(jiǎn)單的實(shí)現(xiàn)。線上應(yīng)用還必須加入TTL的session清理以及對(duì).lock資源中的無(wú)效holder進(jìn)行清理的機(jī)制。
參考文檔:https://www.consul.io/docs/guides/semaphore.html
實(shí)現(xiàn)代碼
【本文為51CTO專欄作者“翟永超”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)通過(guò)51CTO聯(lián)系作者獲取授權(quán)】

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流