新聞中心
Pulsar3.0 介紹
Pulsar3.0 是 Pulsar 社區(qū)推出的第一個(gè) LTS 長(zhǎng)期支持版本。

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),新化企業(yè)網(wǎng)站建設(shè),新化品牌網(wǎng)站建設(shè),網(wǎng)站定制,新化網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,新化網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專(zhuān)業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
圖片
如圖所示,LTS 版本會(huì)最長(zhǎng)支持到 36 個(gè)月,而 Feature 版本最多只有六個(gè)月;類(lèi)似于我們使用的 JDK11,17,21 都是可以長(zhǎng)期使用的;所以也推薦大家都升級(jí)到 LTS 版本。
作為首個(gè) LTS 版本,3.0 自然也是自帶了許多新特性,這個(gè)會(huì)在后續(xù)介紹。
升級(jí)指南
先來(lái)看看升級(jí)指南:
圖片
在官方的兼容表中會(huì)發(fā)現(xiàn):不推薦跨版本升級(jí)。
也就是說(shuō)如果你現(xiàn)在還在使用的是 2.10.x,那么推薦是先升級(jí)到 2.11.x 然后再升級(jí)到 3.0.x.
而且根據(jù)我們的使用經(jīng)驗(yàn)來(lái)看,首個(gè)版本是不保險(xiǎn)的,即便是 LTS 版本;所以不推薦直接升級(jí)到 3.0.0,而是更推薦 3.0.1+,這個(gè)小版本會(huì)修復(fù) 3.0 所帶來(lái)的一些 bug。
先講一下我們的升級(jí)流程,大家可以用做參考。
升級(jí)前準(zhǔn)備
根據(jù)我們的使用場(chǎng)景,為了以防萬(wàn)一,首先需要將我們的插件依賴(lài)升級(jí)到對(duì)應(yīng)的版本。
圖片
其實(shí)簡(jiǎn)單來(lái)說(shuō)就是更新下依賴(lài),然后再重新打包,在后續(xù)的流程進(jìn)行測(cè)試。
預(yù)熱鏡像
之后是預(yù)熱鏡像,我們使用 harbor 搭建了自己的 docker 鏡像倉(cāng)庫(kù),這樣在升級(jí)重啟鏡像的時(shí)候可以更快的從內(nèi)網(wǎng)拉取鏡像。
畢竟一個(gè) pulsar-all 的鏡像也不小,盡量的縮短啟動(dòng)時(shí)間。
預(yù)熱的過(guò)程也很簡(jiǎn)單:
docker pull apachepulsar/pulsar-all:3.0.1
docker tag apachepulsar/pulsar-all:3.0.1 harbor-private.xx.com/pulsar/pulsar-all:3.0.1
docker image push harbor-private.xx.com/pulsar/pulsar-all:3.0.1之后升級(jí)的時(shí)候就可以使用私服的鏡像了。
功能測(cè)試
我這邊有寫(xiě)了一個(gè) cli 可以幫我快速創(chuàng)建或升級(jí)一個(gè)集群,然后觸發(fā)我所編寫(xiě)的功能測(cè)試。
./pulsar-upgrade-cli upgrade pulsar-test ./charts/pulsar --version x.x.x -f charts/pulsar/values.yaml -n pulsar-test這個(gè) cli 很簡(jiǎn)單,一共就做三件事:
- 使用 helm 接口升級(jí)集群
- 等待所有的 Pod 都升級(jí)成功
- 觸發(fā)功能測(cè)試
之后的效果如下:
圖片
主要就是覆蓋了我們的使用場(chǎng)景,都跑通過(guò)之后才會(huì)走后續(xù)的流程。
運(yùn)行監(jiān)控
圖片
之后會(huì)啟動(dòng)一個(gè) 200 左右的并發(fā)生產(chǎn)和消費(fèi)數(shù)據(jù),模擬線上的使用情況,會(huì)一直讓這個(gè)任務(wù)跑著,大概一晚上就可以了,第二天通過(guò)監(jiān)控查看:
- 應(yīng)用有無(wú)異常日志
- 流量是否正常
- 各個(gè)組件的內(nèi)存占用
- 寫(xiě)入延遲等信息
升級(jí)步驟
組件的升級(jí)步驟這里參考了官方指南:https://pulsar.apache.org/docs/3.1.x/administration-upgrade/#upgrade-zookeeper-optional
圖片
- 升級(jí)ZK
- 關(guān)閉auto recovery
- 升級(jí)Bookkeeper
- 升級(jí)Broker
- 升級(jí)Proxy
- 開(kāi)啟auto recovery
只要一步步按照這個(gè)流程走,問(wèn)題不大,哪一步出現(xiàn)問(wèn)題后需要及時(shí)回滾,回滾流程參考下面的回滾部分。
同時(shí)在升級(jí)過(guò)程中需要一直查看 broker 的 error 日志,如果有明顯的不符合預(yù)期的日志一定要注意。
在升級(jí) bookkeeper 的時(shí)候,broker 可能會(huì)出現(xiàn) bk 連接失敗的異常,這個(gè)可以不用在意。
線上驗(yàn)證
都升級(jí)完后就是線上業(yè)務(wù)驗(yàn)證環(huán)節(jié)了:
- [x] 查看監(jiān)控面板,是否有明顯的流量、內(nèi)存、延遲的異常指標(biāo)。 2023-12-24
- [x] topic 元數(shù)據(jù)完整性驗(yàn)證:這個(gè)是因?yàn)槲覀冞@次升級(jí)出了一個(gè) topic 被刪除的 bug,所以需要重點(diǎn)驗(yàn)證下;這部分會(huì)在下次詳細(xì)分析。 2023-12-24
- [x] 查看業(yè)務(wù)消息收發(fā)有無(wú)異常 2023-12-24
- [x] 鏈路查詢是否正常,我們有一個(gè)消息鏈路查詢的頁(yè)面,主要是使用 Pulsar-SQL 和 broker-interceptor 實(shí)現(xiàn)的。 2023-12-24
異?;貪L
當(dāng)出現(xiàn)異常的時(shí)候需要立即回滾,這里的異常一般就是消息收發(fā)異常,客戶端掉線等。
經(jīng)過(guò)我的測(cè)試 3.0.x 的存儲(chǔ)和之前的版本是兼容的,所以 bookkeeper 都能降級(jí)其他的組件就沒(méi)啥可擔(dān)心的了。
需要降級(jí)時(shí)直接將所有組件降級(jí)為上一個(gè)版本即可。
災(zāi)難恢復(fù)
因?yàn)槭菑?2.x 升級(jí)到 3.x 也是涉及到了跨大版本,所以也準(zhǔn)備了災(zāi)難恢復(fù)的方案。
比如極端情況下升級(jí)失敗,所有數(shù)據(jù)丟失的情況。
整個(gè)災(zāi)難恢復(fù)的主要目的就是恢復(fù)后的集群對(duì)外提供的域名不發(fā)生變化,同時(shí)所有的客戶端可以自動(dòng)重連上來(lái),也就是最壞的情況下所有的數(shù)據(jù)丟了可以接受,但不能影響業(yè)務(wù)正常使用。
所以我們的流程如下:
備份 topic
@SneakyThrows
@Test
void backup(){
List topicList = pulsarAdmin.topics().getPartitionedTopicList("tenant/namespace");
log.info("topic size={}",topicList.size());
// create a custom thread pool
CopyOnWriteArrayList dataList = new CopyOnWriteArrayList<>();
ExecutorService customThreadPool = Executors.newFixedThreadPool(10);
for (String topicName : topicList) {
customThreadPool.execute(()-> {
PartitionedTopicMetadata metadata;
try {
metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName);
TopicMeta topicMeta = new TopicMeta();
// backup topic
topicMeta.setName(topicName);
topicMeta.setPartition(metadata.partitions);
// backup permission
Map> permissions = pulsarAdmin.topics().getPermissions(topicName);
topicMeta.setPermissions(permissions);
// back sub
List subscriptions = new ArrayList<>();
PartitionedTopicStats topicStats = pulsarAdmin.topics().getPartitionedStats(topicName, true);
topicStats.getSubscriptions().forEach((k,v)-> subscriptions.add(k));
topicMeta.setSubscriptions(subscriptions);
dataList.add(topicMeta);
} catch (PulsarAdminException e) {
throw new RuntimeException(e);
} }); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
}
log.info("{}",dataList.size());
log.info("{}",JSONUtil.toJsonStr(dataList));
}
// TopicMetaData
@Data
public class TopicMeta {
private String name;
private int partition;
Map> permissions;
List subscriptions = new ArrayList<>();
} 第一步是備份 topic:
- topic 主要是名稱(chēng)和分區(qū)數(shù)量
- 備份權(quán)限
- 備份 topic 的訂閱者
公私鑰備份
因?yàn)槲覀兛蛻舳耸褂昧?JWT 驗(yàn)證,所有為了使得恢復(fù)的 Pulsar 集群可以讓客戶端無(wú)縫切換到新集群,因此必須得使用相同的公私鑰。
這個(gè)其實(shí)比較簡(jiǎn)單,我們使用的是 helm 安裝的集群,所以只需要備份好 Secret 即可。
apiVersion: v1
data:
PRIVATEKEY: XXX
PUBLICKEY: XXX
kind: Secret
metadata:
name: pulsar-token-asymmetric-key
namespace: pulsar
type: Opaque
# 還有幾個(gè) superUser 的 Secret
數(shù)據(jù)恢復(fù)
創(chuàng)建新集群
首先使用 helm 重新創(chuàng)建一個(gè)新集群:
./scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsar
helm install \ --values charts/pulsar/values.yaml \ --set namespace=pulsar\
--set initialize=true \
pulsar ./charts/pulsar -n pulsar
恢復(fù)公私鑰
直接使用剛才備份的公私鑰覆蓋到新集群即可。
恢復(fù)namespace
進(jìn)入 toolset pod 創(chuàng)建需要使用的 tenant/namespace
k exec -it pulsar-toolset-0 -n pulsar bash
bin/pulsar-admin tenants create tenant
bin/pulsar-admin namespaces create tenant/namespace
元數(shù)據(jù)恢復(fù)
之后便是最重要的元數(shù)據(jù)恢復(fù)了:
@SneakyThrows
@Test
void restore() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://url:8080")
.authentication(AuthenticationFactory.token(token))
.build();
Path filePath = Path.of("restore-ns.json");
String fileContent = Files.readString(filePath);
List topicMetaList = JSON.parseArray(fileContent, TopicMeta.class);
ExecutorService customThreadPool = Executors.newFixedThreadPool(50);
for (TopicMeta topicMeta : topicMetaList) {
customThreadPool.execute(() -> {
// Create topic
try {
pulsarAdmin.topics().createPartitionedTopic(topicMeta.getName(), topicMeta.getPartition());
} catch (PulsarAdminException e) {
log.error("Create topic error");
}
// Create sub
for (String subscription : topicMeta.getSubscriptions()) {
try {
pulsarAdmin.topics().createSubscription(topicMeta.getName(), subscription, MessageId.latest);
} catch (PulsarAdminException e) {
log.error("createSubscription error");
} }
// Grant permission
topicMeta.getPermissions().forEach((role, authActions) -> {
permission(pulsarAdmin, topicMeta.getName(), role, authActions);
});
log.info("topic:{} restore success", topicMeta.getName());
}); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
} log.info("restore success");
}
private synchronized void permission(PulsarAdmin pulsarAdmin, String topic, String role, Set authActions) {
try {
pulsarAdmin.topics().grantPermission(topic, role, authActions);
} catch (PulsarAdminException e) {
log.error("grantPermission error", e);
}
} 流程和備份類(lèi)似:
- 創(chuàng)建分區(qū) topic
- 創(chuàng)建訂閱者
- 授權(quán)角色信息
因?yàn)槭跈?quán)接口限制了并發(fā)調(diào)用,所有需要加鎖,導(dǎo)致整個(gè)恢復(fù)的流程就會(huì)比較慢。
8000 topic 的 namespace 大概恢復(fù)時(shí)間為 40min 左右。
之后依次恢復(fù)其他 namespace 即可。
恢復(fù) police
admin.namespaces().setNamespaceMessageTTL("tenant/namespace", 3600 * 6);
admin.namespaces().setBacklogQuota("tenant/namespace", BacklogQuota)如果之前的集群有設(shè)置 TTL 或者是 backlogQuota 時(shí)都需要手動(dòng)恢復(fù)。
總結(jié)
以上就是整個(gè)升級(jí)和災(zāi)難恢復(fù)的流程,當(dāng)然災(zāi)難恢復(fù)希望大家不要碰到。
我會(huì)在下一篇詳細(xì)介紹 Pulsar 3.0 的新功能以及所碰到的一些坑。
分享名稱(chēng):Pulsar3.0升級(jí)指北,你學(xué)會(huì)些什么?
標(biāo)題網(wǎng)址:http://www.5511xx.com/article/dpsecso.html


咨詢
建站咨詢
