日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
30G超大數(shù)據(jù)文件,如何用一周時間導入生產(chǎn)數(shù)據(jù)庫?

上面的問題其實是前段時間接到一個真實的業(yè)務需求,將一個老系統(tǒng)歷史數(shù)據(jù)通過線下文件的方式遷移到新的生產(chǎn)系統(tǒng)。

創(chuàng)新互聯(lián)公司長期為上千余家客戶提供的網(wǎng)站建設服務,團隊從業(yè)經(jīng)驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為白銀區(qū)企業(yè)提供專業(yè)的成都網(wǎng)站建設、網(wǎng)站制作,白銀區(qū)網(wǎng)站改版等技術服務。擁有十載豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

由于老板們已經(jīng)敲定了新系統(tǒng)上線時間,所以只留給我一周的時間將歷史數(shù)據(jù)導入生產(chǎn)系統(tǒng)。

由于時間緊,而數(shù)據(jù)量又超大,所以在設計的過程想到一下解決辦法:

  • 拆分文件
  • 多線程導入

拆分文件

首先我們可以寫個小程序,或者使用拆分命令  ??split?? 將這個超大文件拆分一個個小文件。

-- 將一個大文件拆分成若干個小文件,每個文件 100000 行  
split -l 100000 largeFile.txt -d -a 4 smallFile_  

這里之所以選擇先將大文件拆分,主要考慮到兩個原因:

1、如果程序直接讀取這個大文件,假設讀取一半的時候,程序突然宕機,這樣就會直接丟失文件讀取的進度,又需要重新開頭讀取。

而文件拆分之后,一旦小文件讀取結束,我們可以將小文件移動一個指定文件夾。

這樣即使應用程序宕機重啟,我們重新讀取時,只需要讀取剩余的文件。

2、一個文件,只能被一個應用程序讀取,這樣就限制了導入的速度。

而文件拆分之后,我們可以采用多節(jié)點部署的方式,水平擴展。每個節(jié)點讀取一部分文件,這樣就可以成倍的加快導入速度。

多線程導入

當我們拆分完文件,接著我們就需要讀取文件內(nèi)容,進行導入。

之前拆分的時候,設置每個小文件包含 10w 行的數(shù)據(jù)。由于擔心一下子將 10w 數(shù)據(jù)讀取應用中,導致堆內(nèi)存占用過高,引起頻繁的 「Full GC」,所以下面采用流式讀取的方式,一行一行的讀取數(shù)據(jù)。

當然了,如果拆分之后文件很小,或者說應用的堆內(nèi)存設置很大,我們可以直接將文件加載到應用內(nèi)存中處理。這樣相對來說簡單一點。

逐行讀取的代碼如下:

File file = ...  
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {  
    while (iterator.hasNext()) {  
        String line=iterator.nextLine();  
        convertToDB(line);  
    }  
  
}  

上面代碼使用 ??commons-io??  中的 ??LineIterator??類,這個類底層使用了 ??BufferedReader?? 讀取文件內(nèi)容。它將其封裝成迭代器模式,這樣我們可以很方便的迭代讀取。

如果當前使用 JDK1.8 ,那么上述操作更加簡單,我們可以直接使用 JDK 原生的類 ??Files??將文件轉成 ??Stream?? 方式讀取,代碼如下:

Files.lines(Paths.get("文件路徑"), Charset.defaultCharset()).forEach(line -> {  
    convertToDB(line);  
});  

其實仔細看下 ??Files#lines??底層源碼,其實原理跟上面的 ??LineIterator??類似,同樣也是封裝成迭代器模式。

多線程的引入存在的問題

上述讀取的代碼寫起來不難,但是存在效率問題,主要是因為只有單線程在導入,上一行數(shù)據(jù)導入完成之后,才能繼續(xù)操作下一行。

為了加快導入速度,那我們就多來幾個線程,并發(fā)導入。

多線程我們自然將會使用線程池的方式,相關代碼改造如下:

File file = ...;  
ExecutorService executorService = new ThreadPoolExecutor(  
        5,  
        10,  
        60,  
        TimeUnit.MINUTES,  
     // 文件數(shù)量,假設文件包含 10W 行  
        new ArrayBlockingQueue<>(10*10000),  
      // guava 提供  
        new ThreadFactoryBuilder().setNameFormat("test-%d").build());  
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {  
    while (iterator.hasNext()) {  
        String line = iterator.nextLine();  
        executorService.submit(() -> {  
            convertToDB(line);  
        });  
    }  
  
}  

上述代碼中,每讀取到一行內(nèi)容,就會直接交給線程池來執(zhí)行。

我們知道線程池原理如下:

  1. 如果核心線程數(shù)未滿,將會直接創(chuàng)建線程執(zhí)行任務。
  2. 如果核心線程數(shù)已滿,將會把任務放入到隊列中。
  3. 如果隊列已滿,將會再創(chuàng)建線程執(zhí)行任務。
  4. 如果最大線程數(shù)已滿,隊列也已滿,那么將會執(zhí)行拒絕策略。

線程池執(zhí)行流程圖

由于我們上述線程池設置的核心線程數(shù)為 5,很快就到達了最大核心線程數(shù),后續(xù)任務只能被加入隊列。

為了后續(xù)任務不被線程池拒絕,我們可以采用如下方案:

  • 將隊列容量設置成很大,包含整個文件所有行數(shù)
  • 將最大線程數(shù)設置成很大,數(shù)量大于整個文件所有行數(shù)

以上兩種方案都存在同樣的問題,第一種是相當于將文件所有內(nèi)容加載到內(nèi)存,將會占用過多內(nèi)存。

而第二種創(chuàng)建過多的線程,同樣也會占用過多內(nèi)存。

一旦內(nèi)存占用過多,GC 無法清理,就可能會引起頻繁的 「Full GC」,甚至導致 「OOM」,導致程序?qū)胨俣冗^慢。

當然了,我們還可以第三種方案,綜合前兩種,設置合適隊列長度,以及合適最大線程數(shù)。不過呢,「合適」這個度真不好把握,另外也還是有 **「OOM」 **問題。

所以為了解決這個問題,日思夜想研究出兩個解決方案:

  • CountDownLatch 批量執(zhí)行
  • 擴展線程池

CountDownLatch 批量執(zhí)行

JDK 提供的 ??CountDownLatch??,可以讓主線程等待子線程都執(zhí)行完成之后,再繼續(xù)往下執(zhí)行。

利用這個特性,我們可以改造多線程導入的代碼,主體邏輯如下:

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {  
    // 存儲每個任務執(zhí)行的行數(shù)  
    List lines = Lists.newArrayList();  
    // 存儲異步任務  
    List tasks = Lists.newArrayList();  
    while (iterator.hasNext()) {  
        String line = iterator.nextLine();  
        lines.add(line);  
        // 設置每個線程執(zhí)行的行數(shù)  
        if (lines.size() == 1000) {  
            // 新建異步任務,注意這里需要創(chuàng)建一個 List  
            tasks.add(new ConvertTask(Lists.newArrayList(lines)));  
            lines.clear();  
        }  
        if (tasks.size() == 10) {  
            asyncBatchExecuteTask(tasks);  
        }  
  
    }  
    // 文件讀取結束,但是可能還存在未被內(nèi)容  
    tasks.add(new ConvertTask(Lists.newArrayList(lines)));  
    // 最后再執(zhí)行一次  
    asyncBatchExecuteTask(tasks);  
}  

這段代碼中,每個異步任務將會導入 1000 行數(shù)據(jù),等積累了 10 個異步任務,然后將會調(diào)用 ??asyncBatchExecuteTask?? 使用線程池異步執(zhí)行。

/**  
 * 批量執(zhí)行任務  
 *  
 * @param tasks  
 */  
private static void asyncBatchExecuteTask(List tasks) throws InterruptedException {  
    CountDownLatch countDownLatch = new CountDownLatch(tasks.size());  
    for (ConvertTask task : tasks) {  
        task.setCountDownLatch(countDownLatch);  
        executorService.submit(task);  
    }  
    // 主線程等待異步線程 countDownLatch 執(zhí)行結束  
    countDownLatch.await();  
    // 清空,重新添加任務  
    tasks.clear();  
}  

??asyncBatchExecuteTask??  方法內(nèi)將會創(chuàng)建  ??CountDownLatch??,然后主線程內(nèi)調(diào)用 ??await??方法等待所有異步線程執(zhí)行結束。

??ConvertTask?? 異步任務邏輯如下:

/**  
 * 異步任務  
 * 等數(shù)據(jù)導入完成之后,一定要調(diào)用 countDownLatch.countDown()  
 * 不然,這個主線程將會被阻塞,  
 */  
private static class ConvertTask implements Runnable {  
  
    private CountDownLatch countDownLatch;  
  
    private List lines;  
  
    public ConvertTask(List lines) {  
        this.lines = lines;  
    }  
  
    public void setCountDownLatch(CountDownLatch countDownLatch) {  
        this.countDownLatch = countDownLatch;  
    }  
  
    @Override  
    public void run() {  
        try {  
            for (String line : lines) {  
                convertToDB(line);  
            }  
        } finally {  
            countDownLatch.countDown();  
        }  
    }  
}  

??ConvertTask??任務類邏輯就非常簡單,遍歷所有行,將其導入到數(shù)據(jù)庫中。所有數(shù)據(jù)導入結束,調(diào)用 ??countDownLatch#countDown??。

一旦所有異步線程執(zhí)行結束,調(diào)用 ??countDownLatch#countDown??,主線程將會被喚醒,繼續(xù)執(zhí)行文件讀取。

雖然這種方式解決上述問題,但是這種方式,每次都需要積累一定任務數(shù)才能開始異步執(zhí)行所有任務。

另外每次都需要等待所有任務執(zhí)行結束之后,才能開始下一批任務,批量執(zhí)行消耗的時間等于最慢的異步任務消耗的時間。

這種方式線程池中線程存在一定的閑置時間,那有沒有辦法一直壓榨線程池,讓它一直在干活呢?

擴展線程池

回到最開始的問題,文件讀取導入,其實就是一個「生產(chǎn)者-消費者」消費模型。

主線程作為生產(chǎn)者不斷讀取文件,然后將其放置到隊列中。

異步線程作為消費者不斷從隊列中讀取內(nèi)容,導入到數(shù)據(jù)庫中。

「一旦隊列滿載,生產(chǎn)者應該阻塞,直到消費者消費任務?!?/strong>

其實我們使用線程池的也是一個「生產(chǎn)者-消費者」消費模型,其也使用阻塞隊列。

那為什么線程池在隊列滿載的時候,不發(fā)生阻塞?

這是因為線程池內(nèi)部使用 ??offer?? 方法,這個方法在隊列滿載的時候「不會發(fā)生阻塞」,而是直接返回 。

那我們有沒有辦法在線程池隊列滿載的時候,阻塞主線程添加任務?

其實是可以的,我們自定義線程池拒絕策略,當隊列滿時改為調(diào)用 ??BlockingQueue.put?? 來實現(xiàn)生產(chǎn)者的阻塞。

RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {  
    @Override  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
        if (!executor.isShutdown()) {  
            try {  
                executor.getQueue().put(r);  
            } catch (InterruptedException e) {  
                // should not be interrupted  
            }  
        }  
  
    }  
};  

這樣一旦線程池滿載,主線程將會被阻塞。

使用這種方式之后,我們可以直接使用上面提到的多線程導入的代碼。

ExecutorService executorService = new ThreadPoolExecutor(  
        5,  
        10,  
        60,  
        TimeUnit.MINUTES,  
        new ArrayBlockingQueue<>(100),  
        new ThreadFactoryBuilder().setNameFormat("test-%d").build(),  
        (r, executor) -> {  
            if (!executor.isShutdown()) {  
                try {  
                   // 主線程將會被阻塞  
                    executor.getQueue().put(r);  
                } catch (InterruptedException e) {  
                    // should not be interrupted  
                }  
            }  
  
        });  
File file = new File("文件路徑");  
  
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {  
    while (iterator.hasNext()) {  
        String line = iterator.nextLine();  
        executorService.submit(() -> convertToDB(line));  
    }  
}      

小結

一個超大的文件,我們可以采用拆分文件的方式,將其拆分成多份文件,然后部署多個應用程序提高讀取速度。

另外讀取過程我們還可以使用多線程的方式并發(fā)導入,不過我們需要注意線程池滿載之后,將會拒絕后續(xù)任務。

我們可以通過擴展線程池,自定義拒絕策略,使讀取主線程阻塞。

好了,今天文章內(nèi)容就到這里,不知道各位有沒有其他更好的解決辦法,


文章題目:30G超大數(shù)據(jù)文件,如何用一周時間導入生產(chǎn)數(shù)據(jù)庫?
瀏覽地址:http://www.5511xx.com/article/cojgoss.html