日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
一文帶您快速入門Kafka

作者 | 蔡柱梁

創(chuàng)新互聯(lián)公司的客戶來自各行各業(yè),為了共同目標(biāo),我們?cè)诠ぷ魃厦芮信浜希瑥膭?chuàng)業(yè)型小企業(yè)到企事業(yè)單位,感謝他們對(duì)我們的要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來驚喜。專業(yè)領(lǐng)域包括成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、電商網(wǎng)站開發(fā)、微信營銷、系統(tǒng)平臺(tái)開發(fā)。

審校 | 重樓

目標(biāo)

  1. 了解 Kafka 的重要概念
  2. 搭建 Kafka 服務(wù)端
  3. 使用SpringBoot 實(shí)現(xiàn)簡單的 Demo

1 了解 Kafka 的重要概念

Kafka 是使用 Scala 語言開發(fā)的一個(gè)多分區(qū)、多副本且基于 ZooKeeper 協(xié)調(diào)的分布式消息系統(tǒng)。目前,它的定位是一個(gè)分布式流式處理平臺(tái)。

Kafka 在我們工作中最常扮演的三個(gè)角色:

  • 消息系統(tǒng)

Kafka 和傳統(tǒng)的消息中間件一樣具有系統(tǒng)解耦、冗余存儲(chǔ)、流量削峰、異步通信等功能。

  • 存儲(chǔ)系統(tǒng)

Kafka 會(huì)將消息持久化到磁盤,并且有多副本機(jī)制,有效降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。有時(shí),我們也可以使用它來存儲(chǔ)數(shù)據(jù),只需要把對(duì)應(yīng)的數(shù)據(jù)保留策略設(shè)置成為“永久”即可。

  • 流式處理平臺(tái)

Kafka 不僅為很多流式處理框架(如:Storm、Spark、Flink 等)提供了可靠的數(shù)據(jù)來源,還提供了一個(gè)完整的流式處理類庫。

1.1 基本概念

上圖(圖出自于《深入理解Kafka核心設(shè)計(jì)與實(shí)踐原理》)體現(xiàn)了 Kafka 的整體架構(gòu),Producer 發(fā)送消息,Kafka 將元數(shù)據(jù)存儲(chǔ)在 ZK 中并交由ZK 管理,Consumer 通過拉模式獲取消息。

  • Producer

生產(chǎn)者,消息的投遞方,負(fù)責(zé)創(chuàng)建消息并投遞到 Kafka 中。

  • Broker

Kafka 服務(wù)實(shí)例

  • Consumer

消費(fèi)者,處理消息的一方

上面的概念都是物理層面上的,但是在實(shí)際使用過程中還有很多邏輯方面的定義,這些概念也是需要了解的。如果不了解,就算勉強(qiáng)寫出了代碼,但是自己還是一臉懵不知道自己都定義了什么,它們都有什么意義,估計(jì)離生產(chǎn)故障就不遠(yuǎn)了。

接下來我們?cè)偃チ私馊齻€(gè)重要的邏輯概念:

  • Topic(主題)

生產(chǎn)者創(chuàng)建消息是要發(fā)送給特定的主題的,而消費(fèi)者拉取消息也是要指定主題的。消息就是通過主題來歸類的。

  • Partition(分區(qū))

一個(gè)Topic 可以有多個(gè) Patition,而一個(gè) Partition 只屬于一個(gè) Topic。同一個(gè) Topic 下,不同 Partition 存儲(chǔ)的消息是不同的。

  • Offset(偏移量)

Kafka 的消息是可以持久化并反復(fù)消費(fèi)的,這是因?yàn)樵诿總€(gè)分區(qū)中,當(dāng)有消息寫入就會(huì)像追加日志那樣順序?qū)懭耄樞騃O的寫入性能是十分好的),通過Offset 來記錄對(duì)應(yīng)消息所在的位置。因此,Offset 是消息在 Partition 中的唯一標(biāo)識(shí),并且能看出同一個(gè) Partition 內(nèi)的消息的先后順序,我們稱之為 “Kafka 保證消息在分區(qū)內(nèi)是有序的”。

為了更好,更直觀體現(xiàn)上面三者的關(guān)系,我們先一起看下圖(圖出自于《深入理解Kafka核心設(shè)計(jì)與實(shí)踐原理》)

該圖展示了一個(gè)擁有4個(gè) Partition 的 Topic,而分區(qū)里面的阿拉伯?dāng)?shù)字就是 Offset(也表示著一條消息),虛線部分代表新消息可以插入的位置。每條消息在發(fā)送到 Broker 之前,會(huì)先計(jì)算當(dāng)前消息應(yīng)該發(fā)送到哪個(gè) Partition。因此,只要我們?cè)O(shè)置合理,消息可以均勻地分配在不同的 Partition 上,當(dāng)發(fā)現(xiàn)請(qǐng)求數(shù)量激增時(shí),我們也可以考慮通過適當(dāng)增加 Partition(Broker 也要增加)的方式,從而降低每個(gè) Broker 的 I/O 壓力。

另外,為了降低消息丟失的風(fēng)險(xiǎn),Kafka 為 Partition 引進(jìn)了多副本(Replica)機(jī)制,通過增加副本數(shù)量來提高容災(zāi)能力。副本之間采用的是“一主多從”的設(shè)計(jì),其中 Leader 負(fù)責(zé)讀寫請(qǐng)求,F(xiàn)ollower 則僅負(fù)責(zé)同步 Leader 的消息(這種設(shè)計(jì)方式,大家應(yīng)該要意識(shí)到會(huì)存在同步滯后的問題),并且副本處于不同的 Broker 中,當(dāng) Leader 出現(xiàn)故障(一般是因?yàn)槠渌诘?Broker 出現(xiàn)故障導(dǎo)致的)時(shí),就從 Follower 中重新選舉出新的 Leader 提供服務(wù)。當(dāng)選出新的 Leader 并恢復(fù)服務(wù)后,Consumer 可以通過之前自己保存的 Offset 來繼續(xù)拉取消息消費(fèi)。

結(jié)合到目前為止我們所知道的知識(shí)點(diǎn),一起看下 4 個(gè) Broker 的 Kafka 集群中,某一個(gè) Topic 有三個(gè) Partition,其副本因子為 3(副本因子為3就是每個(gè) Partition 有 3 個(gè)副本,一個(gè) Leader,兩個(gè) Follower)的架構(gòu)圖(圖出自于《深入理解Kafka核心設(shè)計(jì)與實(shí)踐原理》)。

1.2 Message 與 Partition

在 1.1 小節(jié)中,我們已經(jīng)知道一條消息只會(huì)存在一個(gè) Partition中(只管 Leader,不管 Follower),而 Offset 則是消息在 Partition 中的唯一標(biāo)識(shí)。而在本章節(jié),我們將一起更深入地了解消息與 Partition 的關(guān)系,還有副本間同步數(shù)據(jù)所衍生的一些概念。

上面有提到 Kafka 的多副本機(jī)制是 Leader 提供讀寫,而 Flower 是需要同步 Leader 的數(shù)據(jù)的,那么具體是怎樣的呢?請(qǐng)看下圖(單主題單分區(qū)3副本):

當(dāng)Producer 不斷往 Leader 寫入消息時(shí),F(xiàn)lower 會(huì)不斷去 Leader 拉取消息,但是每臺(tái)機(jī)器的性能會(huì)有出入,所以同步也有差異,正如上圖這般。對(duì)于 Consumer 而言,只有 HW 之前的消息是可見可拉取消費(fèi)的,這樣做有個(gè)好處就是當(dāng)發(fā)生故障轉(zhuǎn)移時(shí),Consumer 的 Offset 也不會(huì)發(fā)生數(shù)組越界的問題。這種做法是 Kafka 權(quán)衡利弊后給出的數(shù)據(jù)可靠性性能平衡的方案,即不采取同步復(fù)制(性能差,對(duì)于高并發(fā)場景是災(zāi)難般的設(shè)計(jì)),也不采取異步復(fù)制(完全異步,數(shù)據(jù)丟失問題突出)。

當(dāng)然,對(duì)于Producer 而言就是消息丟失了,有時(shí)我們需要確保消息百分百投遞,這樣不就有問題了嗎?不急,Kafka 可以在 Producer 的配置上配置 acks=-1 + min.insync.replicas=n(n 大于 1),這樣配置后,只有消息被寫入所有副本后,Kafka 服務(wù)端才會(huì)返回 ack 給 Producer。

下面來梳理下上面提及的幾個(gè)概念:

  • HW(Heigh Watermark)

它標(biāo)識(shí)了Consumer 可以拉取消息的最高水位,客戶端拉取的 Offset 必須小于 HW。

  • LEO(Log End Offset)

這個(gè)標(biāo)記位標(biāo)識(shí)下一條寫入的消息應(yīng)該存放的位置。

  • AR(Assigned Rplicas)

所有副本的統(tǒng)稱

  • ISR(In-Syns Rplicas)

與Leader 保持一定程度同步的 Flower 集合。這個(gè)一定程度指的是在可容忍滯后范圍內(nèi),這個(gè)可容忍范圍可以通過配置修改。

  • OSR(Out-of-Sync Rplicas)

同步滯后超過了容忍范圍的Flower 集合。

2 搭建 Kafka 服務(wù)端

這里僅以單節(jié)點(diǎn)為例,不配置集群。

2.1 安裝 ZooKeeper

在第一章節(jié),我們知道 Kafka 會(huì)將元數(shù)據(jù)交由 ZK 管理,所以我們要先安裝好 ZK。

1.首先檢查自己的Linux 是否安裝好了 yum 工具

rpm -qa|grep yum

使用 yum 安裝好 wget

2.下載 ZK

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

3.解壓

tar -zxvf zookeeper-3.4.6.tar.gz

4.為ZK 創(chuàng)建存放數(shù)據(jù)和日志的文件夾

mkdir data 
mkdir logs

5.修改ZK 配置文件

cd conf
cp zoo_sample.cfg zoo.cfg

vi zoo.cfg

修改配置內(nèi)容具體如下:

# ZooKeeper 服務(wù)器心跳時(shí)間,單位:毫秒
tickTime=2000
# 投票選舉新 Leader 的初始化時(shí)間
initLimit=10
# Leader 與 Flower 心跳檢測最大容忍時(shí)間,響應(yīng)超過 syncLimit*tickTime,就剔除 Flower
syncLimit=5
# 存放數(shù)據(jù)的文件夾
dataDir=/root/zookeeper-3.4.6/data
# 存放日志的文件夾
dataLogDir=/root/zookeeper-3.4.6/logs
# ZooKeeper提供給接入客戶端的連接端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature

接著,到 /root/zookeeper-3.4.6/data 創(chuàng)建文件 myid(如果部署的是集群,那么這個(gè) myid 必需唯一,不能重復(fù))。

cat > myid
vi myid

具體如下:

6.配置環(huán)境變量

vi /etc/profile
 export ZOOKEEPER_HOME=/root/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin

再執(zhí)行 source /etc/profile

至此,ZooKeeper 已經(jīng)配置好了,我們可以啟動(dòng)看下是否有問題。

2.2 安裝 Kafka

1.到官網(wǎng)下載安裝包

2.使用 psftp 上傳到服務(wù)器

# put dir remoteDir
put D:\downloads\kafka_2.13-3.5.0.tgz /root/kafka_2.13-3.5.0.tgz

3.解壓

tar -zxvf kafka_2.13-3.5.0.tgz

4.修改配置

cd kafka_2.13-3.5.0cd config/

 由于 server.properties 比較大,就不全部貼上來了,只貼我修改的部分:

# 是Broker的標(biāo)識(shí),因此在集群中必需唯一
broker.id=0
# Broker 對(duì)外服務(wù)地址(我這里vmware的ip是192.168.226.140)
listeners=PLAINTEXT://192.168.226.140:9092
# 實(shí)際工作中,會(huì)分內(nèi)網(wǎng)外網(wǎng),當(dāng)有需要提供給外部客戶端使用時(shí),我們一般 listeners 配置內(nèi)網(wǎng)供 Broker 之間通信使用,而 advertised.listeners 配置走外網(wǎng)給接入的客戶端使用
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 存放消息日志文件地址
log.dirs=/root/kafka_2.13-3.5.0/logs
# ZK 的訪問路徑,我這里因?yàn)?ZK 和 Kafka 放在了同一個(gè)服務(wù)器上,所以就使用了 localhost
zookeeper.connect=localhost:2181

5.修改環(huán)境變量

vi /etc/profile
 export KAFKA_HOME=/root/kafka_2.13-3.5.0
export PATH=$PATH:$KAFKA_HOME/bin

再執(zhí)行 source /etc/profile

6.進(jìn)入bin目錄,啟動(dòng) Broker

kafka-server-start.sh ../config/server.properties &

ps -ef|grep kafka 看下進(jìn)程,但是是否已經(jīng)可以使用,要通過發(fā)送消息和消費(fèi)消息來驗(yàn)證。

3 使用 Spring Boot 實(shí)現(xiàn)簡單的 Demo

下面是示例代碼:

pom.xml



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        
        2.7.12
         
    
    com.example.czl
    kafka
    0.0.1-SNAPSHOT
    springboot-kafka
    spring boot集成kafka demo
    
        1.8
        3.5.3.1
        2.3
        1.18.26
        31.1-jre
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.kafka
            spring-kafka
        

        
            com.mysql
            mysql-connector-j
            runtime
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.springframework.kafka
            spring-kafka-test
            test
            
                
                    scala-library
                    org.scala-lang
                
                
                    scala-reflect
                    org.scala-lang
                
            
        

        
        
            com.baomidou
            mybatis-plus-boot-starter
            ${mybatis-plus.version}
        
        
            com.baomidou
            mybatis-plus-generator
            ${mybatis-plus.version}
        
        
            org.apache.velocity
            velocity-engine-core
            ${velocity-engine-core.version}
        

        
            org.projectlombok
            lombok
            ${lombok.version}
        

        
            com.google.guava
            guava
            ${guava.version}
        
    

    
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.yml

spring:
  application:
    name: spring-boot-kafka
  profiles:
    active: dev

server:
  port: 8080

application-dev.yml

spring:
  datasource:
    url: "jdbc:mysql://***:***/***?useSSL=false&useUnicode=true&characterEncoding=utf8&ApplicationName=spring-boot-demo&serverTimezone=UTC&allowMultiQueries=true"
    username: "***"
    password: "***"
  kafka:
    bootstrap-servers: "192.168.226.140:9092" # 訪問Kafka服務(wù)端的地址
    consumer:
      group-id: ${spring.application.name}-${spring.profiles.active} # 一條消息只會(huì)被訂閱了該主題的同一個(gè)分組內(nèi)的一個(gè)消費(fèi)者消費(fèi)

mybatis-plus:
  configuration:
    # 打印sql
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logback.xml



    
    

    
    
        
            %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}\(%L\) - [%X{traceId}] %msg%n
        
        
        
            ${LOG_PATH_HOME}/log.%d{yyyy-MM-dd}.%i.log
            200MB
        
    

    
    
        
            %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}\(%L\) - [%X{traceId}] %msg%n
        
    

    
    
    
    
        
        
    

ProducerDemo

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author CaiZhuliang
 * @date 2023/6/18
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerDemo {
    private final KafkaTemplate kafkaTemplate;

    /**
     * 發(fā)送消息
     * @param topic 主題
     * @param msg 消息
     * @param callback 鉤子
     */
    public void send(String topic, String msg, ListenableFutureCallback> callback) {
        log.info("發(fā)送Kafka消息 - topic : {}, msg : {}", topic, msg);
        ListenableFuture> future = kafkaTemplate.send(topic, msg);
        if (null != callback) {
            future.addCallback(callback);
        }
    }
}

ConsumerDemo

package com.example.czl.kafka.kafka.producer.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author CaiZhuliang
 * @date 2023/6/18
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerDemo {
    @KafkaListener(topics = "test-topic-1")
    public void receivingMsg(String msg) {
        log.info("接收到Kafka消息 - msg : {}", msg);
    }
}

TestController

package com.example.czl.kafka.controller;

import com.example.czl.kafka.kafka.producer.ProducerDemo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author CaiZhuliang
 * @date 2023/6/18
 */
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
public class TestController {
    private final ProducerDemo producerDemo;

    @GetMapping("/send/kafka_msg")
    public Long sendMsg(String msg) {
        log.info("測試發(fā)送kafka消息 - msg : {}", msg);
        producerDemo.send("test-topic-1", msg, null);
        return System.currentTimeMillis();
    }
}

postman請(qǐng)求測試如下:

控制臺(tái)信息如下:

作者介紹

蔡柱梁,社區(qū)編輯,從事Java后端開發(fā)8年,做過傳統(tǒng)項(xiàng)目廣電BOSS系統(tǒng),后投身互聯(lián)網(wǎng)電商,負(fù)責(zé)過訂單,TMS,中間件等。


網(wǎng)站標(biāo)題:一文帶您快速入門Kafka
新聞來源:http://www.5511xx.com/article/cddpogo.html