av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機(jī)制(一)

今天我們就來聊聊 Kafka 是如何對 Java NIO 進(jìn)行封裝的,本系列總共分為3篇,主要剖析以下幾個問題:

創(chuàng)新互聯(lián)2013年開創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目成都做網(wǎng)站、網(wǎng)站設(shè)計、外貿(mào)營銷網(wǎng)站建設(shè)網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元織金做網(wǎng)站,已為上家服務(wù),為織金各地企業(yè)和個人服務(wù),聯(lián)系電話:13518219792

  1. 針對 Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫操作的?
  2. 剖析 KafkaChannel 是如何對傳輸層、讀寫 buffer 操作進(jìn)行封裝的?
  3. 剖析工業(yè)級 NIO 實戰(zhàn):如何基于位運(yùn)算來控制事件的監(jiān)聽以及拆包、粘包是如何實現(xiàn)的?
  4. 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
  5. 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進(jìn)行連接以及網(wǎng)絡(luò)讀寫的?
  6. 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個過程是怎樣的?

本篇只討論前3個問題,剩余的放到后2篇中。

認(rèn)真讀完這篇文章,我相信你會對 Kafka 封裝 Java NIO 源碼有更加深刻的理解。

這篇文章干貨很多,希望你可以耐心讀完。

一、總體概述

??上篇??剖析了「生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過程」,此時發(fā)送消息的時候就有了元數(shù)據(jù),但是還沒有進(jìn)行網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)通信是一個相對復(fù)雜的過程,對于 Java 系統(tǒng)來說網(wǎng)絡(luò)通信一般會采用 NIO 庫來實現(xiàn),所以 Kafka 對 Java NIO 封裝了統(tǒng)一的框架,來實現(xiàn)多路復(fù)用的網(wǎng)絡(luò) I/O 操作。

為了方便大家理解,所有的源碼只保留骨干。

二、Kafka 對 Java NIO 的封裝

如果大家對 Java NIO 不了解的話,可以看下這個文檔,這里就不過多介紹了。

https://pdai.tech/md/java/io/java-io-nio.html。

我們來看看 Kafka 對 Java NIO 組件做了哪些封裝? 這里先說下結(jié)果,后面會深度剖析。

  1. TransportLayer:它是一個接口,封裝了底層 NIO 的 SocketChannel。
  2. NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer,對網(wǎng)絡(luò)編程中的粘包、拆包經(jīng)典實現(xiàn)。
  3. NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫 Buffer。
  4. KafkaChannel:對 TransportLayer、NetworkReceive、NetworkSend 進(jìn)一步封裝,屏蔽了底層的實現(xiàn)細(xì)節(jié),對上層更友好。
  5. KafkaSelector:封裝了 NIO 的 Selector 多路復(fù)用器組件。

接下來我們挨個對上面組件進(jìn)行剖析。

三、TransportLayer 封裝過程

TransportLayer 接口是對 NIO 中 「SocketChannel」 的封裝。它的實現(xiàn)類總共有 2 個:

  1. PlaintextTransportLayer:明文網(wǎng)絡(luò)傳輸實現(xiàn)。
  2. SslTransportLayer:SSL 加密網(wǎng)絡(luò)傳輸實現(xiàn)。

本篇只剖析 PlaintextTransportLayer 的實現(xiàn)。

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java。

public class PlaintextTransportLayer implements TransportLayer {
// java nio 中 SelectionKey 事件
private final SelectionKey key;
// java nio 中的SocketChannel
private final SocketChannel socketChannel;
// 安全相關(guān)
private final Principal principal = KafkaPrincipal.ANONYMOUS;
// 初始化
public PlaintextTransportLayer(SelectionKey key) throws IOException {
// 對 NIO 中 SelectionKey 類的對象引用
this.key = key;
// 對 NIO 中 SocketChannel 類的對象引用
this.socketChannel = (SocketChannel) key.channel();
}
}

從上面代碼可以看出,該類就是對底層 NIO 的 socketChannel 封裝引用。將構(gòu)造函數(shù)的 SelectionKey 類對象賦值給 key,然后從 key 中取出對應(yīng)的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。

接下來,我們看看幾個重要方法是如何使用這2個 NIO 組件的。

1、finishConnect()

@Override
// 判斷網(wǎng)絡(luò)連接是否完成
public boolean finishConnect() throws IOException {
// 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成
boolean connected = socketChannel.finishConnect();
// 2. 如果網(wǎng)絡(luò)連接完成以后就刪除對OP_CONNECT事件的監(jiān)聽,同時添加對OP_READ事件的監(jiān)聽
if (connected)
// 事件操作
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
// 3. 最后返回網(wǎng)絡(luò)連接
return connected;
}

該方法主要用來判斷網(wǎng)絡(luò)連接是否完成,如果完成就關(guān)注 「OP_READ」 事件,并取消 「OP_CONNECT」 事件。

  1. 首先調(diào)用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
  2. 如果網(wǎng)絡(luò)連接完成以后就刪除對 OP_CONNECT 事件的監(jiān)聽,同時添加對 OP_READ 事件的監(jiān)聽,因為連接完成后就可能接收數(shù)據(jù)了。
  3. 最后返回網(wǎng)絡(luò)連接 connected。

二進(jìn)制位運(yùn)算事件監(jiān)聽

這里通過「二進(jìn)制位運(yùn)算」巧妙的解決了網(wǎng)絡(luò)事件的監(jiān)聽操作,實現(xiàn)非常經(jīng)典。

通過 socketChannel 在 Selector 多路復(fù)用器注冊事件返回 SelectionKey ,SelectionKey 的類型包括:

  1. OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
  2. OP_WRITE:可寫事件,值為:1<<2 == 4 == 00000100。
  3. OP_CONNECT:客戶端連接服務(wù)端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
  4. OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務(wù)端 channel,值為:1<<4 == 16 == 00010000。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

首先"~"符號代表按位取反,"&"代表按位取與,通過 key.interestOps() 獲取當(dāng)前的事件,然后和 OP_CONNECT事件取反「11110111」 后按位與操作。

所以,"& ~xx" 代表刪除 xx 事件,有就刪除,沒有就不變;而 "| xx" 代表將 xx 事件添加進(jìn)去。

2、read()

@Override
public int read(ByteBuffer dst) throws IOException {
// 調(diào)用 NIO 的通道實現(xiàn)數(shù)據(jù)的讀取
return socketChannel.read(dst);
}

該方法主要用來把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里,通過調(diào)用 socketChannel.read() 實現(xiàn)。

3、write()

@Override
public int write(ByteBuffer src) throws IOException {
return socketChannel.write(src);
}

該方法主要用來把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里,通過調(diào)用 socketChannel.write() 實現(xiàn)。

大家都知道在網(wǎng)絡(luò)編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢必會涉及數(shù)據(jù)的「拆包」、「粘包」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進(jìn)行重新封裝,分別對應(yīng) NetworkReceive 讀操作、NetworkSend 寫操作,對于上層調(diào)用無需判斷是否讀寫完成,更加友好。

接下來我們就來分別剖析下這2個類的實現(xiàn)。

四、NetworkReceive 封裝過程

public class NetworkReceive implements Receive {
....
// 空 ByteBuffer
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final String source;
// 存儲響應(yīng)消息數(shù)據(jù)長度
private final ByteBuffer size;
// 響應(yīng)消息數(shù)據(jù)的最大長度
private final int maxSize;
// ByteBuffer 內(nèi)存池
private final MemoryPool memoryPool;
// 已讀取字節(jié)大小
private int requestedBufferSize = -1;
// 存儲響應(yīng)消息數(shù)據(jù)體
private ByteBuffer buffer;
// 初始化構(gòu)造函數(shù)
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.source = source;
// 分配4個字節(jié)大小的數(shù)據(jù)長度
this.size = ByteBuffer.allocate(4);
this.buffer = null;
// 能接收消息的最大長度
this.maxSize = maxSize;
this.memoryPool = memoryPool;
}
}
  1. EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
  2. source:final類型,用來確定對應(yīng) channel id。
  3. size:final類型,存儲響應(yīng)消息數(shù)據(jù)長度,大小為4字節(jié)。
  4. maxSize:final類型,接收響應(yīng)消息數(shù)據(jù)的最大長度。
  5. memoryPool:final類型,ByteBuffer 內(nèi)存池。
  6. requestedBufferSize:已讀取字節(jié)大小。
  7. buffer:存儲響應(yīng)消息數(shù)據(jù)體。

從屬性可以看出,包含2個 ByteBuffer,分別是 size 和 buffer。這里重點說下源碼中的size字段的初始化。通過長度編碼方式實現(xiàn),上來就先分配了4字節(jié)大小的 ByteBuffer 來存儲響應(yīng)消息數(shù)據(jù)長度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長度的值。

介紹完字段后,我們來深度剖析下該類的幾個重要的方法。

1、readFrom()

public long readFrom(ScatteringByteChannel channel) throws IOException {
// 讀取數(shù)據(jù)總大小
int read = 0;
// 1.判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完
if (size.hasRemaining()) {
// 2.還有剩余,直接讀取消息數(shù)據(jù)的長度
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
// 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里
read += bytesRead;
// 4.判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了
if (!size.hasRemaining()) {
// 5.重置position
size.rewind();
// 6.讀取響應(yīng)消息數(shù)據(jù)長度
int receiveSize = size.getInt();
// 7.如果有異常就拋出
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
// 8.將讀到數(shù)據(jù)長度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小
requestedBufferSize = receiveSize;
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
// 9.如果數(shù)據(jù)體buffer還沒有分配,且響應(yīng)消息數(shù)據(jù)頭已讀完
if (buffer == null && requestedBufferSize != -1) {
// 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
// 11.判斷buffer是否分配成功
if (buffer != null) {
// 12.把channel里的數(shù)據(jù)讀到buffer中
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
// 13.累計讀取數(shù)據(jù)總大小
read += bytesRead;
}
// 14. 返回總大小
return read;
}

該方法主要用來把對應(yīng) channel 中的數(shù)據(jù)讀到 ByteBuffer 中,包括響應(yīng)消息數(shù)據(jù)長度的 size 和響應(yīng)消息數(shù)據(jù)體長度的 buffer,可能會被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。

在讀取時,先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個 buffer 時就表示讀取完成了。

通過短短的30行左右代碼就解決了工業(yè)級「拆包」 、「粘包」問題,相當(dāng)?shù)慕?jīng)典。

如果要解決「粘包」問題,就是在每個響應(yīng)數(shù)據(jù)中間插入一個特殊的字節(jié)大小的「分隔符」,這里就在響應(yīng)消息體前面插入4個字節(jié),代表響應(yīng)消息自己本身的數(shù)據(jù)大小,如下圖所示:

具體「拆包」的操作步驟如下:

  1. 調(diào)用 size.hasRemaining() 返回position 至 limit 之間的字節(jié)大小來判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完。
  2. 當(dāng)未讀完則通過調(diào)用 NIO 的方法 channel.read(size),直接把讀取4字節(jié)的響應(yīng)消息數(shù)據(jù)的長度寫入到 ByteBuffer size 中,如果已經(jīng)讀取到了4字節(jié),此時 position=4,與  limit  相同,表示 ByteBuffer size 已經(jīng)讀滿了。
  3. 每次讀取后,累加到總讀取數(shù)據(jù)大小里
  4. 再次判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了。
  5. 如果讀完了,先重置 position 位置為0,此時就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后調(diào)用 size.getInt() 從 ByteBuffer 當(dāng)前 position 位置讀取4個字節(jié),并轉(zhuǎn)化成int 類型數(shù)值賦給 receiveSize,即響應(yīng)體的長度。
  6. 如果有異常就拋出,包括響應(yīng)數(shù)據(jù)體的長度無效或者大于最大長度等。
  7. 將讀到響應(yīng)數(shù)據(jù)長度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
  8. 如果響應(yīng)數(shù)據(jù)體 buffer 還沒有分配,且響應(yīng)數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體 buffer。
  9. 如果 buffer 分配成功,表示 size 已讀完,此時直接把 channel 里的響應(yīng)數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中,再次累計讀取數(shù)據(jù)總大小。
  10. 最后返回數(shù)據(jù)總大小。

2、complete()

@Override
public boolean complete() {
// 響應(yīng)消息頭已讀完 && 響應(yīng)消息體已讀完
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}

該方法主要用來判斷是否都讀取完成,即響應(yīng)頭大小和響應(yīng)體大小都讀取完。

3、size()

// 返回大小
public int size() {
return payload().limit() + size.limit();
}
public ByteBuffer payload() {
return this.buffer;
}

該方法主要用來返回響應(yīng)頭和響應(yīng)體還有多少數(shù)據(jù)需要讀出。

此時已經(jīng)剖析完讀 Buffer 的封裝,接下來我們看看寫 Buffer。

五、NetworkSend 封裝過程

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java。

調(diào)用關(guān)系圖如下:

1、Send 接口

我們先看一下接口 Send 都定義了哪些方法。

public interface Send {
// 要把數(shù)據(jù)寫入目標(biāo)的 channel id
String destination();
// 要發(fā)送的數(shù)據(jù)是否發(fā)送完了
boolean completed();
// 把數(shù)據(jù)寫到對應(yīng) channel 中
long writeTo(GatheringByteChannel channel) throws IOException;
// 發(fā)送數(shù)據(jù)的大小
long size();
}

Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實現(xiàn) writeTo() 方法來實現(xiàn)寫入數(shù)據(jù)到Channel中。

2、ByteBufferSend 類

ByteBufferSend 類實現(xiàn)了 Send 接口,即實現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel:

public class ByteBufferSend implements Send {
private final String destination;
// 總共要寫多少字節(jié)數(shù)據(jù)
private final int size;
// 用于寫入channel里的ByteBuffer數(shù)組,說明kafka一次最大傳輸字節(jié)是有限定的
protected final ByteBuffer[] buffers;
// 總共還剩多少字節(jié)沒有寫完
private int remaining;
private boolean pending = false;

public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
this.buffers = buffers;
for (ByteBuffer buffer : buffers)
remaining += buffer.remaining();
// 計算需要寫入字節(jié)的總和
this.size = remaining;
}
}

我們來看下這個類中的幾個重要字段:

  1. destination:數(shù)據(jù)寫入的目標(biāo) channel id。
  2. size:總共需要往 channel 里寫多少字節(jié)數(shù)據(jù)。
  3. buffers:ByteBuffer數(shù)組類型,用來存儲要寫入 channel 里的數(shù)據(jù)。
  4. remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒有寫完。

介紹完字段后,我們來深度剖析下該類的幾個重要的方法。

(1)writeTo()

@Override
// 將字節(jié)流數(shù)據(jù)寫入到channel中
public long writeTo(GatheringByteChannel channel) throws IOException {
// 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù)
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
// 2.計算還剩多少字節(jié)沒有寫入傳輸層
remaining -= written;
// 每次發(fā)送 都檢查是否
pending = TransportLayers.hasPendingWrites(channel);
return written;
}

該方法主要用來把 buffers 數(shù)組寫入到 SocketChannel里,因為在網(wǎng)絡(luò)編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用底層 channel.write(buffers) 方法會返回「已經(jīng)寫入成功多少字節(jié)」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。

(2)some other

@Override
public String destination() {
// 返回對應(yīng)的channel id
return destination;
}
@Override
public boolean completed() {
// 判斷是否完成 即沒有剩余&pending=false
return remaining <= 0 && !pending;
}
/**
* always returns false as there will be not be any
* pending writes since we directly write to socketChannel.
*/
@Override
public boolean hasPendingWrites() {
// 在PLAINTEXT下 pending 始終為 false
return false;
}
@Override
public long size() {
// 返回寫入字節(jié)的總和
return this.size;
}

3、NetworkSend 類

NetworkSend 類繼承了 ByteBufferSend 類,真正用來寫 Buffer。

public class NetworkSend extends ByteBufferSend {
// 實例化
public NetworkSend(String destination, ByteBuffer buffer) {
// 調(diào)用父類的方法初始化
super(destination, sizeBuffer(buffer.remaining()), buffer);
}
// 用來構(gòu)造4個字節(jié)的 sizeBuffer
private static ByteBuffer sizeBuffer(int size) {
// 先分配一個4個字節(jié)的ByteBuffer
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
// 寫入size長度值
sizeBuffer.putInt(size);
// 重置 position
sizeBuffer.rewind();
// 返回 sizeBuffer
return sizeBuffer;
}
}

該類相對簡單些,就是構(gòu)建一個發(fā)往 channel 對應(yīng)的節(jié)點 id 的消息數(shù)據(jù),它的實例化過程如下:

  1. 先分配一個4個字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長度賦值給 sizeBuffer。
  2. 此時 sizeBuffer 的響應(yīng)頭字節(jié)數(shù)和 sizeBuffer 的響應(yīng)數(shù)據(jù)就都有了。
  3. 然后調(diào)用父類 ByteBufferSend 的方法進(jìn)行初始化。

另外 ByteBuffer[] 為兩個 buffer,可以理解為一個消息頭 buffer 即 size,一個消息體 buffer。消息頭 buffer 的長度為4byte,存放的是消息體 buffer 的長度。而消息體 buffer 是上層傳入的業(yè)務(wù)數(shù)據(jù),所以 send 就是持有一個待發(fā)送的 ByteBuffer。

接下來我們來看看 KafkaChannel 是如何對上面幾個類進(jìn)行封裝的。

六、KafkaChannel 封裝過程

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java。

public class KafkaChannel implements AutoCloseable {
....
// 節(jié)點 id
private final String id;
// 傳輸層對象
private final TransportLayer transportLayer;
....
// 最大能接收請求的字節(jié)數(shù)
private final int maxReceiveSize;
// 內(nèi)存池,用來分配指定大小的 ByteBuffer
private final MemoryPool memoryPool;
// NetworkReceive 類的實例
private NetworkReceive receive;
// NetworkSend 類的實例
private Send send;
// 是否關(guān)閉連接
private boolean disconnected;
....
// 連接狀態(tài)
private ChannelState state;
// 需要連接的遠(yuǎn)端地址
private SocketAddress remoteAddress;
// 初始化
public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) {
this.id = id;
this.transportLayer = transportLayer;
this.authenticatorCreator = authenticatorCreator;
this.authenticator = authenticatorCreator.get();
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.memoryPool = memoryPool;
this.metadataRegistry = metadataRegistry;
this.disconnected = false;
this.muteState = ChannelMuteState.NOT_MUTED;
this.state = ChannelState.NOT_CONNECTED;
}
}

我們來看下這個類中的幾個重要字段:

  1. id:channel 對應(yīng)的節(jié)點 id。
  2. transportLayer:傳輸層對象。
  3. maxReceiveSize:最大能接收請求的字節(jié)數(shù)。
  4. memoryPool:內(nèi)存池,用來分配指定大小的 ByteBuffer。
  5. receive:NetworkReceive 類的實例。
  6. send:NetworkSend 類的實例。
  7. disconnected:是否關(guān)閉連接。
  8. state:KafkaChannel 的狀態(tài)。
  9. remoteAddress:需要連接的遠(yuǎn)端地址。

從屬性可以看出,有3個最重要的成員變量:TransportLayer、NetworkReceive、Send。KafkaChannel 通過 TransportLayer 進(jìn)行讀寫操作,NetworkReceive 用來讀取,Send 用來寫出。

為了封裝普通和加密的Channel「TransportLayer根據(jù)網(wǎng)絡(luò)協(xié)議的不同,提供不同的子類」而對于 KafkaChannel 提供統(tǒng)一的接口,「這是策略模式很好的應(yīng)用」。

  1. 每個 NetworkReceive 代表一個單獨(dú)的響應(yīng),KafkaChannel 讀取的數(shù)據(jù)會存儲到 NetworkReceive 中,當(dāng) NetworkReceive 讀滿,一個請求就完整讀取了。
  2. 每個 Send 代表一個單獨(dú)的請求,需要寫出時只需賦值此變量,之后調(diào)用 write() 方法將其中的數(shù)據(jù)寫出。

介紹完字段后,我們來深度剖析下其網(wǎng)絡(luò)讀寫操作是如何實現(xiàn)的?

1、setSend()

public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// 設(shè)置要發(fā)送消息的字段
this.send = send;
// 調(diào)用傳輸層增加寫事件
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// PlaintextTransportLayer 類方法
@Override
public void addInterestOps(int ops) {
//通過 key.interestOps() | ops 來添加事件
key.interestOps(key.interestOps() | ops);
}

該方法主要用來預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中,然后調(diào)用傳輸層方法增加對這個 channel 上「OP_WRITE」事件的關(guān)注。當(dāng)真正執(zhí)行發(fā)送的時候,會從 send 中讀取數(shù)據(jù)。

2、write()

public long write() throws IOException {
// 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了
if (send == null)
return 0;
midWrite = true;
// 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去
return send.writeTo(transportLayer);
}

該方法主要用來把保存在 send 上的數(shù)據(jù)真正發(fā)送出去。

  1. 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
  2. 如果不為空就調(diào)用ByteBufferSend.writeTo() 方法通過網(wǎng)絡(luò) I/O 操作將數(shù)據(jù)發(fā)送出去。

3、read()

public long read() throws IOException {
// 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實例化對象
if (receive == null) {

網(wǎng)頁標(biāo)題:圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機(jī)制(一)
新聞來源:http://uogjgqi.cn/article/dhipood.html
掃二維碼與項目經(jīng)理溝通

我們在微信上24小時期待你的聲音

解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流