日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
高效讀取大文件,再也不用擔(dān)心OOM了!

最近阿粉接到一個(gè)需求,需要從文件讀取數(shù)據(jù),然后經(jīng)過業(yè)務(wù)處理之后存儲到數(shù)據(jù)庫中。這個(gè)需求,說實(shí)話不是很難,阿粉很快完成了第一個(gè)版本。

內(nèi)存讀取

第一個(gè)版本,阿粉采用內(nèi)存讀取的方式,所有的數(shù)據(jù)首先讀讀取到內(nèi)存中,程序代碼如下:

 
 
 
 
  1. Stopwatch stopwatch = Stopwatch.createStarted();
  2. // 將全部行數(shù)讀取的內(nèi)存中
  3. List lines = FileUtils.readLines(new File("temp/test.txt"), Charset.defaultCharset());
  4. for (String line : lines) {
  5.     // pass
  6. }
  7. stopwatch.stop();
  8. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
  9. // 計(jì)算內(nèi)存占用
  10. logMemory();

logMemory方法如下:

 
 
 
 
  1. MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
  2. //堆內(nèi)存使用情況
  3. MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
  4. //初始的總內(nèi)存
  5. long totalMemorySize = memoryUsage.getInit();
  6. //已使用的內(nèi)存
  7. long usedMemorySize = memoryUsage.getUsed();
  8. System.out.println("Total Memory: " + totalMemorySize / (1024 * 1024) + " Mb");
  9. System.out.println("Free Memory: " + usedMemorySize / (1024 * 1024) + " Mb");

上述程序中,阿粉使用 Apache Common-Io 開源第三方庫,F(xiàn)ileUtils#readLines將會把文件中所有內(nèi)容,全部讀取到內(nèi)存中。

這個(gè)程序簡單測試并沒有什么問題,但是等拿到真正的數(shù)據(jù)文件,運(yùn)行程序,很快程序發(fā)生了 OOM。

之所以會發(fā)生 OOM,主要原因是因?yàn)檫@個(gè)數(shù)據(jù)文件太大。假設(shè)上面測試文件 test.txt總共有 200W 行數(shù)據(jù),文件大小為:740MB。

通過上述程序讀取到內(nèi)存之后,在我的電腦上內(nèi)存占用情況如下:

可以看到一個(gè)實(shí)際大小為 700 多 M 的文件,讀到內(nèi)存中占用內(nèi)存量為 1.5G 之多。而我之前的程序,虛擬機(jī)設(shè)置內(nèi)存大小只有 1G,所以程序發(fā)生了 OOM。

當(dāng)然這里最簡單的辦法就是加內(nèi)存唄,將虛擬機(jī)內(nèi)存設(shè)置到 2G,甚至更多。不過機(jī)器內(nèi)存始終有限,如果文件更大,還是沒有辦法全部都加載到內(nèi)存。

不過仔細(xì)一想真的需要將全部數(shù)據(jù)一次性加載到內(nèi)存中?

很顯然,不需要!

在上述的場景中,我們將數(shù)據(jù)到加載內(nèi)存中,最后不還是一條條處理數(shù)據(jù)。

所以下面我們將讀取方式修改成逐行讀取。

逐行讀取

逐行讀取的方式比較多,這里阿粉主要介紹兩種方式:

  • BufferReader
  • Apache Commons IO
  • Java8 stream

BufferReader

我們可以使用 BufferReader#readLine 逐行讀取數(shù)據(jù)。

 
 
 
 
  1. try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) {
  2.     String fileLineContent;
  3.     while ((fileLineContent = fileBufferReader.readLine()) != null) {
  4.         // process the line.
  5.     }
  6. } catch (FileNotFoundException e) {
  7.     e.printStackTrace();
  8. } catch (IOException e) {
  9.     e.printStackTrace();
  10. }

Apache Commons IOCommon-IO

中有一個(gè)方法 FileUtils#lineIterator可以實(shí)現(xiàn)逐行讀取方式,使用代碼如下:

 
 
 
 
  1. Stopwatch stopwatch = Stopwatch.createStarted();
  2. LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
  3. while (fileContents.hasNext()) {
  4.     fileContents.nextLine();
  5.     //  pass
  6. }
  7. logMemory();
  8. fileContents.close();
  9. stopwatch.stop();
  10. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

這個(gè)方法返回一個(gè)迭代器,每次我們都可以獲取的一行數(shù)據(jù)。

其實(shí)我們查看代碼,其實(shí)可以發(fā)現(xiàn) FileUtils#lineIterator,其實(shí)用的就是 BufferReader,感興趣的同學(xué)可以自己查看一下源碼。

由于公號內(nèi)無法插入外鏈,關(guān)注『Java極客技術(shù)』,回復(fù)『20200610』 獲取源碼

Java8 stream

Java8 Files 類新增了一個(gè) lines,可以返回 Stream我們可以逐行處理數(shù)據(jù)。

 
 
 
 
  1. Stopwatch stopwatch = Stopwatch.createStarted();
  2. // lines(Path path, Charset cs)
  3. try (Stream inputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) {
  4.     inputStream
  5.             .filter(str -> str.length() > 5)// 過濾數(shù)據(jù)
  6.             .forEach(o -> {
  7.                 // pass do sample logic
  8.             });
  9. }
  10. logMemory();
  11. stopwatch.stop();
  12. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

使用這個(gè)方法有個(gè)好處在于,我們可以方便使用 Stream 鏈?zhǔn)讲僮鳎鲆恍┻^濾操作。

注意:這里我們使用 try-with-resources 方式,可以安全的確保讀取結(jié)束,流可以被安全的關(guān)閉。

并發(fā)讀取

逐行的讀取的方式,解決我們 OOM 的問題。不過如果數(shù)據(jù)很多,我們這樣一行行處理,需要花費(fèi)很多時(shí)間。

上述的方式,只有一個(gè)線程在處理數(shù)據(jù),那其實(shí)我們可以多來幾個(gè)線程,增加并行度。

下面在上面的基礎(chǔ)上,阿粉就拋磚引玉,介紹下阿粉自己比較常用兩種并行處理方式。

逐行批次打包

第一種方式,先逐行讀取數(shù)據(jù),加載到內(nèi)存中,等到積累一定數(shù)據(jù)之后,然后再交給線程池異步處理。

 
 
 
 
  1. @SneakyThrows
  2. public static void readInApacheIOWithThreadPool() {
  3.     // 創(chuàng)建一個(gè) 最大線程數(shù)為 10,隊(duì)列最大數(shù)為 100 的線程池
  4.     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
  5.     // 使用 Apache 的方式逐行讀取數(shù)據(jù)
  6.     LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
  7.     List lines = Lists.newArrayList();
  8.     while (fileContents.hasNext()) {
  9.         String nextLine = fileContents.nextLine();
  10.         lines.add(nextLine);
  11.         // 讀取到十萬的時(shí)候
  12.         if (lines.size() == 100000) {
  13.             // 拆分成兩個(gè) 50000 ,交給異步線程處理
  14.             List> partition = Lists.partition(lines, 50000);
  15.             List futureList = Lists.newArrayList();
  16.             for (List strings : partition) {
  17.                 Future future = threadPoolExecutor.submit(() -> {
  18.                     processTask(strings);
  19.                 });
  20.                 futureList.add(future);
  21.             }
  22.             // 等待兩個(gè)線程將任務(wù)執(zhí)行結(jié)束之后,再次讀取數(shù)據(jù)。這樣的目的防止,任務(wù)過多,加載的數(shù)據(jù)過多,導(dǎo)致 OOM
  23.             for (Future future : futureList) {
  24.                 // 等待執(zhí)行結(jié)束
  25.                 future.get();
  26.             }
  27.             // 清除內(nèi)容
  28.             lines.clear();
  29.         }
  30.     }
  31.     // lines 若還有剩余,繼續(xù)執(zhí)行結(jié)束
  32.     if (!lines.isEmpty()) {
  33.         // 繼續(xù)執(zhí)行
  34.         processTask(lines);
  35.     }
  36.   threadPoolExecutor.shutdown();
  37. }
  38.     private static void processTask(List strings) {
  39.         for (String line : strings) {
  40.             // 模擬業(yè)務(wù)執(zhí)行
  41.             try {
  42.                 TimeUnit.MILLISECONDS.sleep(10L);
  43.             } catch (InterruptedException e) {
  44.                 e.printStackTrace();
  45.             }
  46.         }
  47.     }

上述方法,等到內(nèi)存的數(shù)據(jù)到達(dá) 10000 的時(shí)候,拆封兩個(gè)任務(wù)交給異步線程執(zhí)行,每個(gè)任務(wù)分別處理 50000 行數(shù)據(jù)。

后續(xù)使用 future#get(),等待異步線程執(zhí)行完成之后,主線程才能繼續(xù)讀取數(shù)據(jù)。

之所以這么做,主要原因是因?yàn)椋€程池的任務(wù)過多,再次導(dǎo)致 OOM 的問題。

大文件拆分成小文件第二種方式,首先我們將一個(gè)大文件拆分成幾個(gè)小文件,然后使用多個(gè)異步線程分別逐行處理數(shù)據(jù)。

 
 
 
 
  1. public static void splitFileAndRead() throws Exception {
  2.     // 先將大文件拆分成小文件
  3.     List fileList = splitLargeFile("temp/test.txt");
  4.     // 創(chuàng)建一個(gè) 最大線程數(shù)為 10,隊(duì)列最大數(shù)為 100 的線程池
  5.     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
  6.     List futureList = Lists.newArrayList();
  7.     for (File file : fileList) {
  8.         Future future = threadPoolExecutor.submit(() -> {
  9.             try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) {
  10.                 inputStream.forEach(o -> {
  11.                     // 模擬執(zhí)行業(yè)務(wù)
  12.                     try {
  13.                         TimeUnit.MILLISECONDS.sleep(10L);
  14.                     } catch (InterruptedException e) {
  15.                         e.printStackTrace();
  16.                     }
  17.                 });
  18.             } catch (IOException e) {
  19.                 e.printStackTrace();
  20.             }
  21.         });
  22.         futureList.add(future);
  23.     }
  24.     for (Future future : futureList) {
  25.         // 等待所有任務(wù)執(zhí)行結(jié)束
  26.         future.get();
  27.     }
  28.     threadPoolExecutor.shutdown();
  29. }
  30. private static List splitLargeFile(String largeFileName) throws IOException {
  31.     LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name());
  32.     List lines = Lists.newArrayList();
  33.     // 文件序號
  34.     int num = 1;
  35.     List files = Lists.newArrayList();
  36.     while (fileContents.hasNext()) {
  37.         String nextLine = fileContents.nextLine();
  38.         lines.add(nextLine);
  39.         // 每個(gè)文件 10w 行數(shù)據(jù)
  40.         if (lines.size() == 100000) {
  41.             createSmallFile(lines, num, files);
  42.             num++;
  43.         }
  44.     }
  45.     // lines 若還有剩余,繼續(xù)執(zhí)行結(jié)束
  46.     if (!lines.isEmpty()) {
  47.         // 繼續(xù)執(zhí)行
  48.         createSmallFile(lines, num, files);
  49.     }
  50.     return files;
  51. }

上述方法,首先將一個(gè)大文件拆分成多個(gè)保存 10W 行的數(shù)據(jù)的小文件,然后再將小文件交給線程池異步處理。

由于這里的異步線程每次都是逐行從小文件的讀取數(shù)據(jù),所以這種方式不用像上面方法一樣擔(dān)心 OOM 的問題。

另外,上述我們使用 Java 代碼,將大文件拆分成小文件。這里阿粉還有一個(gè)簡單的辦法,我們可以直接使用下述命令,直接將大文件拆分成小文件:

 
 
 
 
  1. # 將大文件拆分成 100000 的小文件
  2.  split -l 100000 test.txt

后續(xù) Java 代碼只需要直接讀取小文件即可。

總結(jié)當(dāng)我們從文件讀取數(shù)據(jù)時(shí),如果文件不是很大,我們可以考慮一次性讀取到內(nèi)存中,然后快速處理。

如果文件過大,我們就沒辦法一次性加載到內(nèi)存中,所以我們需要考慮逐行讀取,然后處理數(shù)據(jù)。但是單線程處理數(shù)據(jù)畢竟有限,所以我們考慮使用多線程,加快處理數(shù)據(jù)。

本篇文章我們只是簡單介紹了下,數(shù)據(jù)從文件讀取幾種方式。數(shù)據(jù)讀取之后,我們肯定還需要處理,然后最后會存儲到數(shù)據(jù)庫中或者輸出到另一個(gè)文件中。

這個(gè)過程,說實(shí)話比較麻煩,因?yàn)槲覀兊臄?shù)據(jù)源文件,可能是 txt,也可能是 excel,這樣我們就需要增加多種讀取方法。同樣的,當(dāng)數(shù)據(jù)處理完成之后,也有同樣的問題。

不過好在,上述的問題我們可以使用 Spring Batch 完美解決。


分享名稱:高效讀取大文件,再也不用擔(dān)心OOM了!
網(wǎng)頁網(wǎng)址:http://www.5511xx.com/article/djjeioh.html