日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
SpringBoot整合Kafka實現(xiàn)數(shù)據(jù)高吞吐

SpringBoot 整合 Kafka 實現(xiàn)數(shù)據(jù)高吞吐

作者:鴨血粉絲Tang 2022-04-28 07:31:41
云計算
Kafka 本文主要以SpringBoot技術框架為背景,結合實際業(yè)務需求,采用 kafka 進行數(shù)據(jù)消費,實現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會介紹消費失敗的處理流程。

為成都等地區(qū)用戶提供了全套網(wǎng)頁設計制作服務,及成都網(wǎng)站建設行業(yè)解決方案。主營業(yè)務為成都網(wǎng)站建設、成都做網(wǎng)站、成都網(wǎng)站設計,以傳統(tǒng)方式定制建設網(wǎng)站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!

一、介紹

在上篇文章中,我們詳細的介紹了 kafka 的架構模型,在集群環(huán)境中,kafka 可以通過設置分區(qū)數(shù)來加快數(shù)據(jù)的消費速度。

光知道理論還不行,我們得真真切切的實踐起來才行!

下面,我將結合生產(chǎn)環(huán)境的真實案例,以SpringBoot技術框架為基礎,向大家介紹 kafka 的使用以及如何實現(xiàn)數(shù)據(jù)高吞吐!

二、程序實踐

最近,公司大數(shù)據(jù)團隊每天凌晨會將客戶的訂單數(shù)據(jù)進行統(tǒng)計計算,然后把業(yè)績數(shù)據(jù)推送給我們,以便銷售人員每天能看到昨天的業(yè)績數(shù)據(jù),數(shù)據(jù)的體量大約在 1000 多萬條,以下是我對接的過程!

2.1、添加 kafka 依賴包

本次項目的SpringBoot版本為2.1.5.RELEASE,依賴的 kafka 的版本為2.2.6.RELEASE。

https://back-media.51cto.com/editor?id=707646/h6e90be6-7EV6kJbV

2.2、添加 kafka 配置變量

當添加完了依賴包之后,我們只需要在application.properties中添加 kafka 配置變量,基本上就可以正常使用了。

# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重試次數(shù)
spring.kafka.producer.retries=3
#批量發(fā)送的消息數(shù)量
spring.kafka.producer.batch-size=1000
#32MB的批處理緩沖區(qū)
spring.kafka.producer.buffer-memory=33554432
#默認消費者組
spring.kafka.consumer.group-id=crm-microservice-newperformance
#最早未被消費的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取數(shù)據(jù)量
spring.kafka.consumer.max-poll-records=4000
#是否自動提交
spring.kafka.consumer.enable-auto-commit=true
#自動提交時間間隔,單位ms
spring.kafka.consumer.auto-commit-interval=1000

2.3、創(chuàng)建一個消費者

@Component
public class BigDataTopicListener {

private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

/**
* 監(jiān)聽kafka數(shù)據(jù)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord consumerRecord) {
log.info("收到bigData推送的數(shù)據(jù)'{}'", consumerRecord.toString());
//...
//db.save(consumerRecord);//插入或者更新數(shù)據(jù)
}

}

2.4、模擬對方推送數(shù)據(jù)測試

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

@Autowired
private KafkaTemplate kafkaTemplate;

@Test
public void testSend(){
for (int i = 0; i < 5000; i++) {
Map map = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
//向kafka的big_data_topic主題推送數(shù)據(jù)
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}

起初,通過這種單條數(shù)據(jù)消費方式,進行測試程序沒太大毛病!

但是,當上到生產(chǎn)之后,發(fā)現(xiàn)一個很大的問題,就是消費1000萬條數(shù)據(jù),至少需要3個小時,結果導致數(shù)據(jù)看板一直沒數(shù)據(jù)。

第二天痛定思痛,決定改成批量消費模型,怎么操作呢,請看下面!

2.5、將 kafka 的消費模式改成批量消費

首先,創(chuàng)建一個KafkaConfiguration配置類,內(nèi)容如下!

@Configuration
public class KafkaConfiguration {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.producer.retries}")
private Integer retries;

@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;

@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;

@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;

@Value("${spring.kafka.consumer.batch.concurrency}")
private Integer batchConcurrency;

@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;

@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;


/**
* 生產(chǎn)者配置信息
*/
@Bean
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

/**
* 生產(chǎn)者工廠
*/
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

/**
* 生產(chǎn)者模板
*/
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}


/**
* 消費者配置信息
*/
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

/**
* 消費者批量工廠
*/
@Bean
public KafkaListenerContainerFactory batchFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//設置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//設置為批量消費,每個批次數(shù)量在Kafka配置參數(shù)中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}

}

同時,新增一個spring.kafka.consumer.batch.concurrency變量,用來設置并發(fā)數(shù),通過這個參數(shù)我們可以指定幾個線程來實現(xiàn)消費。

在application.properties配置文件中,添加如下變量:

#批消費并發(fā)量,小于或等于Topic的分區(qū)數(shù)
spring.kafka.consumer.batch.concurrency = 3

#設置每次批量拉取的最大數(shù)量為4000
spring.kafka.consumer.max-poll-records=4000

#設置自動提交改成false
spring.kafka.consumer.enable-auto-commit=false

最后,將單個消費方法改成批量消費方法模式。

@Component
public class BigDataTopicListener {

private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

/**
* 監(jiān)聽kafka數(shù)據(jù)(批量消費)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
public void batchConsumer(List> consumerRecords, Acknowledgment ack) {
long start = System.currentTimeMillis();

//...
//db.batchSave(consumerRecords);//批量插入或者批量更新數(shù)據(jù)

//手動提交
ack.acknowledge();
log.info("收到bigData推送的數(shù)據(jù),拉取數(shù)據(jù)量:{},消費時間:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
}

}

此時,消費性能大大的提升,數(shù)據(jù)處理的非常快,500萬條數(shù)據(jù),最多 30 分鐘就全部消費完畢了。

本例中的消費微服務,生產(chǎn)環(huán)境部署了3臺服務器,同時big_data_topic主題的分區(qū)數(shù)為3,因此并發(fā)數(shù)設置為3比較合適。

隨著推送的數(shù)據(jù)量不斷增加,如果你覺得消費速度還不夠,你可以重新設置每次批量拉取的最大數(shù)量,活著橫向擴展微服務的集群實例數(shù)量和 topic 的分區(qū)數(shù),以此來加快數(shù)據(jù)的消費速度。

但是,如果在單臺機器中,每次批量拉取的最大數(shù)量過大,大對象也會很大,會造成頻繁的 gc 告警!

因此,在實際的使用過程中,每次批量拉取的最大數(shù)量并不是越大越好,根據(jù)當前服務器的硬件配置,調(diào)節(jié)到合適的閥值,才是最優(yōu)的選擇!

三、小結

本文主要以SpringBoot技術框架為背景,結合實際業(yè)務需求,采用 kafka 進行數(shù)據(jù)消費,實現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會介紹消費失敗的處理流程。


分享名稱:SpringBoot整合Kafka實現(xiàn)數(shù)據(jù)高吞吐
網(wǎng)頁URL:http://www.5511xx.com/article/dpeshpe.html