掃二維碼與項目經理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯網交流
作者:梁勇 2022-02-21 08:48:00
前端
云原生 Apache Pulsar越來越多的公司使用,與Apache Kafka、Apache RocketMQ并列成為消息領域三家馬車,有必要對其研究一番。下面以筆者曾在生產環(huán)境使用的配置梳理,希望對大家有所幫助!

專注于為中小企業(yè)提供成都網站設計、網站建設服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)三穗免費做網站提供優(yōu)質的服務。我們立足成都,凝聚了一批互聯網行業(yè)人才,有力地推動了數千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網站建設實現規(guī)模擴充和轉變。
Pulsar安裝包包含了zookeeper、broker、bookie三個組件。
下載Pulsar二進制包
https://pulsar.apache.org/download/
解壓壓縮包
tar -zvxf apache-pulsar-2.9.1-bin.tar.gz
創(chuàng)建目錄
mkdir -p data/zookeeper
echo 1 > data/zookeeper/myid
修改zk配置,文件位于conf/zookeeper.conf
# 數據目錄
dataDir=data/zookeeper
# 日志目錄
dataLogDir=data/zookeeper/logs
# zk集群配置,server.1~n
server.1=127.0.0.1:2888:3888
bin/pulsar-daemon start zookeeper
doing start zookeeper ...
starting zookeeper, logging to /Users/admin/work/software_install/apache-pulsar-2.9.1/logs/pulsar-zookeeper-M-C02GL1NTQ05P.log
Note: Set immediateFlush to true in conf/log4j2.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
通過pulsar-daemon管理pulsar組件
bin/pulsar-daemon help
Error: no enough arguments provided.
Usage: pulsar-daemon (start|stop|restart)
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
websocket Run a websocket proxy server
functions-worker Run a functions worker server
standalone Run a standalone Pulsar service
proxy Run a Proxy Pulsar service
備注:可以通過pulsar-daemon命令對broker、bookie、zookeeper等組件啟動、關閉或者重啟。
zookeeper啟動日志和查看zookeeper進程
ps axu | grep zookeeper
bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper 127.0.0.1:2181 \
--configuration-store 127.0.0.1:2181 \
--web-service-url http://127.0.0.1:8080 \
--web-service-url-tls https://127.0.0.1:8443 \
--broker-service-url pulsar://127.0.0.1:6650 \
--broker-service-url-tls pulsar+ssl://127.0.0.1:6651
參數說明
|
參數 |
說明 |
|
cluster |
默認集群名稱 |
|
zookeeper |
本地集群使用的zk地址 |
|
configuration-store |
多個集群全局的zk集群地址,各個集群之間同步數據,單機群地址同上面參數zookeeper即可 |
|
web-service-url |
Broker的管理流地址,例如創(chuàng)建刪除主題等 |
|
web-service-url-tls |
Broker開啟TLS,管理流則使用該地址 |
|
broker-service-url |
Broker數據流地址,發(fā)送接受消息等 |
|
broker-service-url-tls |
Broker開啟TLS,數據流則使用該地址 |
備注:生產環(huán)境可以使用域名。
bin/pulsar zookeeper-shell
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, bookies, ledgers, pulsar, stream, zookeeper]
bindAddress=127.0.0.1
advertisedAddress=127.0.0.1
zkServers=127.0.0.1:2181
|
參數 |
說明 |
|
bindAddress |
服務監(jiān)聽的地址,默認 0.0.0.0 |
|
advertisedAddress |
服務向外發(fā)布的主機名或者IP,默認為IntetAddress.getLocalHost().getHostName |
|
zkServers |
zk集群地址,可與broker共用 |
bin/pulsar-daemon start bookie
doing start bookie ...
starting bookie, logging to /Users/admin/work/software_install/apache-pulsar-2.9.1/logs/pulsar-bookie-M-C02GL1NTQ05P.log
Note: Set immediateFlush to true in conf/log4j2.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
bin/bookkeeper shell simpletest --ensemble 1 --writeQuorum 1 --ackQuorum 1 -- numEntries 1000
...
2022-02-19T23:43:03,391+0800 [main] INFO org.apache.bookkeeper.tools.cli.commands.client.SimpleTestCommand - 722 entries written
2022-02-19T23:43:03,983+0800 [main] INFO org.apache.bookkeeper.tools.cli.commands.client.SimpleTestCommand - 1000 entries written to ledger 0
2022-02-19T23:43:04,041+0800 [main] INFO org.apache.bookkeeper.proto.PerChannelBookieClient - Closing the per channel bookie client for 127.0.0.1:3181
...
備注:通過simpletest命令向bookie集群寫入測試數據,完成測試后會自動刪除。
zookeeperServers=127.0.0.1:2181
configurationStoreServers=127.0.0.1:2181
bindAddress=127.0.0.1
# 默認InetAddress.getLocalHost().getHostName()
advertisedAddress=127.0.0.1
clusterName=pulsar-cluster-1
bin/pulsar-daemon start broker
doing start broker ...
starting broker, logging to /Users/admin/work/software_install/apache-pulsar-2.9.1/logs/pulsar-broker-M-C02GL1NTQ05P.log
Note: Set immediateFlush to true in conf/log4j2.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
查看集群節(jié)點
bin/pulsar-admin brokers list cluster-1
"172.17.13.184:8080"
發(fā)送測試消息
bin/pulsar-client produce persistent://public/default/test -n 1 -m "Hello Pulsar"
...
2022-02-20T13:31:18,469+0800 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
...
消費測試消息
bin/pulsar-client consume persistent://public/default/test -n 100 -s "consumer-test" -t "Exclusive"
...
----- got message -----
key:[null], properties:[], content:Hello Pulsar
...
小結:至此測試集群搭建完成,下文將介紹生產環(huán)境配置的調整項。
下面為生產環(huán)境搭建Pulsar集群,由3個zookeeper節(jié)點、3個broker節(jié)點和5個bookie節(jié)點構成。
組件配置
組件 配置
zookeeper 4C8G100G * 3
broker 16C64G500G * 3
bookie 16C64G500G * 5
備注:每個組件集群部署時可以同城跨可用區(qū)部署,提高高可用。broker不存儲消息100G即可,bookie存儲消息通常需要較大磁盤,比如3T,具體根據消息量計算。
|
配置項 |
內存大小或者比例,總大小 |
|
系統(tǒng)OS緩存 |
1~2G |
|
Jvm內存和堆外內存 |
1/2(除去系統(tǒng)緩存后剩余緩存的一半),其中Jvm heap占1/3,堆外內存Direct Memory占2/3 |
|
PageCache內存大小 |
1/2(除去系統(tǒng)緩存后剩余緩存的一半) |
以內存64G大小,在文件conf/pulsar_env.sh修改如下內容:
PULSAR_MEM=${PULSAR_MEM:-"-Xms10g -Xmx10g -XX:MaxDirectMemorySize=20g"}
以內存64G大小,在文件conf/bkenv.sh修改如下內容:
BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms10g -Xmx10g -XX:MaxDirectMemorySize=20g"}}
|
配置項 |
說明 |
|
zookeeperServers=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 |
本地zookeeper集群地址 |
|
configurationStoreServers=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 |
配置存儲Zookeeper集群地址 |
|
bindAddress=x.x.x.x |
服務監(jiān)聽的地址,可以為本機IP,默認為0.0.0.0 |
|
advertisedAddress=x.x.x.x |
服務向外發(fā)布的主機名或者IP,默認為IntetAddress.getLocalHost().getHostName |
|
clusterName=cluster-xxx |
集群名稱 |
|
brokerDeleteInactiveTopicsEnabled=false |
關閉自動刪除不活動的主題 |
|
defaultNumberOfNamespaceBundles=12 |
Bundle的數量應為broker數量的整數倍,默認為4 |
|
defaultRetentionSizeInMB=1T |
消費確認過的消息超過該??后會觸發(fā)刪除策略 |
|
defaultRetentionTimeInMinutes=1w |
消費確認過的消息超過指定時間后觸發(fā)刪除策略 |
|
backlogQuotaDefaultLimitGB=-1 |
保持默認,未被消費確認的消息?存儲?? 默認為-1表示沒有限制,可以通過set-message-ttl設置過期時間,防?磁盤爆滿 |
|
backlogQuotaDefaultRetentionPolicy=producer_request_hold |
保持默認,未被消費確認的消息超過存儲??的策略 |
|
managedLedgerDefaultEnsembleSize=3 |
創(chuàng)建Ledger時指定Ensemble的?? |
|
managedLedgerDefaultWriteQuorum=3 |
創(chuàng)建Ledger時指定Quorum的?? |
|
managedLedgerDefaultAckQuorum=2 |
創(chuàng)建Ledger時指定ack Quorum的?? |
|
dispatcherMaxReadBatchSize=500 |
?次從bookkeeper讀取的數量,默認為100條 |
|
loadBalancerAutoBundleSplitEnabled=false |
關閉auto bundle split功能,提?客戶端穩(wěn)定性 |
|
loadBalancerAutoUnloadSplitBundlesEnabled=false |
關閉auto bundle split功能,提?客戶端穩(wěn)定性 |
|
loadBalancerSheddingEnabled=false |
禁?Pulsar?動均衡 |
|
loadBalancerEnabled=false |
禁?Pulsar?動均衡 |
備注:參數根據實際情況調整,在線上開啟負載均衡時,發(fā)現有重復消息,此處先將其關閉。
配置項說明
|
配置項 |
說明 |
|
zkServers=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 |
本地zookeeper集群地址 |
|
journalDirectory=/data/bookkeeper/journal01,/data/bookkeeper/journal02 |
BookKeeper存儲其預寫?志的?錄,多個?錄逗號進?分割,防?線程阻塞 |
|
ledgerDirectories=/data/bookkeeper/ledgers01,/data/bookkeeper/ledgers02 |
指定存儲BookKeeper輸出ledger的?錄。多個ledger?錄,需要使?逗號分割 |
備注:journalDirectory和ledgerDirectories在條件允許的情況可以配置到不同的磁盤。
本文轉載自微信公眾號「瓜農老梁」,可以通過以下二維碼關注。轉載本文請聯系瓜農老梁公眾號。

我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯網交流