日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Linux下搭建KafkaStream架構(gòu)的實踐(linuxkafka)

隨著大數(shù)據(jù)的迅猛發(fā)展,對于時間序列數(shù)據(jù)的處理變得越來越重要。Apache Kafka stream作為流處理核心框架,有著非常好的支持性,在大數(shù)據(jù)領(lǐng)域得到了廣泛的應(yīng)用。本文將介紹如何搭建和配置Kafka Stream架構(gòu)在linux系統(tǒng)上運行程序,以及常見的使用方法。

### 1. 系統(tǒng)要求

Kafka Stream有一些關(guān)鍵的系統(tǒng)要求,如操作系統(tǒng)環(huán)境,使用的Java版本以及用到的Kafka Stream工具集等。搭建環(huán)境前,必須保證系統(tǒng)能夠支持和滿足Kafka Stream系統(tǒng)要求,才能通過后續(xù)配置步驟形成可運行程序。

### 2. 集群節(jié)點搭建

在安裝集群節(jié)點之前,需要考慮集群節(jié)點數(shù)配置,確定主節(jié)點和從節(jié)點,用于分開承擔(dān)不同的任務(wù),例如主節(jié)點負責(zé)訂閱消息,從節(jié)點負責(zé)處理數(shù)據(jù)。在準備環(huán)境之后,使用以下命令可完成Linux系統(tǒng)的Kafka Stream節(jié)點安裝:

# 設(shè)置Kafka_stream_ home路徑
export KAFKA_STREAMS_HOME=/usr/local/kafka_streams

# 下載安裝包

wget http://download.kafka.apache.org/streams/1.5.2/kafka-streams-1.5.2-bin.tar.gz

# 解壓安裝包

tar -zxvf kafka-streams-1.5.2-bin.tar.gz

# 復(fù)制解壓好的文件到Kafka home

mv kafka-streams-1.5.2/* $KAFKA_STREAMS_HOME

# 刪除壓縮文件

rm kafka-streams-1.5.2-bin.tar.gz

# 根據(jù)節(jié)點類型進行配置

# 主節(jié)點配置

# /usr/local/kafka_streams/conf/server.properties

streamConfig.broker= # 設(shè)置broker地址

# 從節(jié)點配置

# /usr/local/kafka_streams/conf/consumer.properties

bootstrap.servers= # 設(shè)置Zookeeper地址

group.id= # 設(shè)置groupid

完成集群節(jié)點的搭建之后,就可以開始利用Kafka Streams節(jié)點搭建Kafka Stream任務(wù)。

### 3. 編寫Stream任務(wù)

Kafka Stream的任務(wù)形式類似于MapReduce,它可以實現(xiàn)從處理和聚合單詞出現(xiàn)頻度及計數(shù)等高級功能。在編寫任務(wù)之前,首先需要創(chuàng)建Topic,使用以下命令:

# 創(chuàng)建主題
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test-topic

# 檢查主題

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

Kafka Stream的任務(wù)編寫就是一個實體類,可以使用Java和Scala等編程語言編寫類,內(nèi)部實現(xiàn)Streams API:

“`Java

public class StreamExample {

public static void main(String[] args) {

// 配置文件

final Properties props = new Properties();

// 設(shè)置應(yīng)用的ID

props.put(StreamsConfig.APPLICATION_ID_CONFIG, “stream-example-app”);

// 設(shè)置應(yīng)用的Broker

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “l(fā)ocalhost:9092”);

// 設(shè)置Client ID

props.put(StreamsConfig.CLIENT_ID_CONFIG, “stream-example-client”);

// 創(chuàng)建StreamsBuilder

final StreamsBuilder builder = new StreamsBuilder();

// 從topic獲取流

final KStream source = builder.stream(“test-topic”);

// 進行聚合

final KTable counts = stream.flatMap((key, value) ->

Arrays.asList(value.split(” “)).iterator())

.map((key, value) -> new KeyValue(value, value))

.countByKey(“counts”);

// 輸出到另一個topic

counts.toStream().to(“streams-wordcount-output”);

// 創(chuàng)建Topology

final Topology topology = builder.build();

// 寫入控制臺

System.out.println(topology.describe());

//初始化一個KafkaStream對象

final KafkaStreams streams = new KafkaStreams(topology, props);

//啟動程序

streams.start();

}

}


### 4. 實時流數(shù)據(jù)分析

任務(wù)編寫好之后,OK!Kafka Stream的搭建以及配置和使用就完成啦。

創(chuàng)新互聯(lián)服務(wù)器托管擁有成都T3+級標準機房資源,具備完善的安防設(shè)施、三線及BGP網(wǎng)絡(luò)接入帶寬達10T,機柜接入千兆交換機,能夠有效保證服務(wù)器托管業(yè)務(wù)安全、可靠、穩(wěn)定、高效運行;創(chuàng)新互聯(lián)專注于成都服務(wù)器托管租用十余年,得到成都等地區(qū)行業(yè)客戶的一致認可。


網(wǎng)頁名稱:Linux下搭建KafkaStream架構(gòu)的實踐(linuxkafka)
本文地址:http://www.5511xx.com/article/dpgoogh.html