日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
ElasticSearch-批量更新bulk死鎖問題排查

一、問題系統(tǒng)介紹

1. 監(jiān)聽商品變更MQ消息,查詢商品最新的信息,調(diào)用BulkProcessor批量更新ES集群中的商品字段信息;

六合網(wǎng)站建設公司創(chuàng)新互聯(lián),六合網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為六合近千家提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設公司要多少錢,請找那個售后服務好的六合做網(wǎng)站的公司定做!

2. 由于商品數(shù)據(jù)非常多,所以將商品數(shù)據(jù)存儲到ES集群上,整個ES集群共劃分了256個分片,并根據(jù)商品的三級類目ID進行分片路由。

比如一個SKU的商品名稱發(fā)生變化,我們就會收到這個SKU的變更MQ消息,然后再去查詢商品接口,將商品的最新名稱查詢回來,再根據(jù)這個SKU的三級分類ID進行路由,找到對應的ES集群分片,然后更新商品名稱字段信息。

由于商品變更MQ消息量巨大,為了提升更新ES的性能,防止出現(xiàn)MQ消息積壓問題,所以本系統(tǒng)使用了BulkProcessor進行批量異步更新。

ES客戶端版本如下:


            elasticsearch-rest-client
            org.elasticsearch.client
            6.5.3
        

BulkProcessor配置偽代碼如下:

//在這里調(diào)用build()方法構(gòu)造bulkProcessor,在底層實際上是用了bulk的異步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000條數(shù)據(jù)請求執(zhí)行一次bulk
                .setBulkActions(1000)
                // 5mb的數(shù)據(jù)刷新一次bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 并發(fā)請求數(shù)量, 0不并發(fā), 1并發(fā)允許執(zhí)行
                .setConcurrentRequests(1)
                // 固定1s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重試5次,間隔1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、問題怎么發(fā)現(xiàn)的

1. 618大促開始后,由于商品變更MQ消息非常頻繁,MQ消息每天的消息量更是達到了日常的數(shù)倍,而且好多商品還變更了三級類目ID;

2. 系統(tǒng)在更新這些三級類目ID發(fā)生變化的SKU商品信息時,根據(jù)修改后的三級類目ID路由后的分片更新商品信息時發(fā)生了錯誤,并且重試了5次,依然沒有成功;

3. 因為在新路由的分片上沒有這個商品的索引信息,這些更新請求永遠也不會執(zhí)行成功,系統(tǒng)的日志文件中也記錄了大量的異常重試日志。

4. 商品變更MQ消息也開始出現(xiàn)了積壓報警,MQ消息的消費速度明顯趕不上生產(chǎn)速度。

5. 觀察MQ消息消費者的UMP監(jiān)控數(shù)據(jù),發(fā)現(xiàn)消費性能很平穩(wěn),沒有明顯波動,但是調(diào)用次數(shù)會在系統(tǒng)消費MQ一段時間后出現(xiàn)斷崖式下降,由原來的每分鐘幾萬調(diào)用量逐漸下降到個位數(shù)。

6. 在重啟應用后,系統(tǒng)又開始消費,UMP監(jiān)控調(diào)用次數(shù)恢復到正常水平,但是系統(tǒng)運行一段時間后,還是會出現(xiàn)消費暫停問題,仿佛所有消費線程都被暫停了一樣。

三、排查問題的詳細過程

首先找一臺暫停消費MQ消息的容器,查看應用進程ID,使用jstack命令dump應用進程的整個線程堆棧信息,將導出的線程堆棧信息打包上傳到 https://fastthread.io/ 進行線程狀態(tài)分析。分析報告如下:

通過分析報告發(fā)現(xiàn)有124個處于BLOCKED狀態(tài)的線程,然后可以點擊查看各線程的詳細堆棧信息,堆棧信息如下:

連續(xù)查看多個線程的詳細堆棧信息,MQ消費線程都是在waiting to lock <0x00000005eb781b10> (a
org.elasticsearch.action.bulk.BulkProcessor),然后根據(jù)0x00000005eb781b10去搜索發(fā)現(xiàn),這個對象鎖正在被另外一個線程占用,占用線程堆棧信息如下:

這個線程狀態(tài)此時正處于WAITING狀態(tài),通過線程名稱發(fā)現(xiàn),該線程應該是ES客戶端內(nèi)部線程。正是該線程搶占了業(yè)務線程的鎖,然后又在等待其他條件觸發(fā)該線程執(zhí)行,所以導致了所有的MQ消費業(yè)務線程一直無法獲取BulkProcessor內(nèi)部的鎖,導致出現(xiàn)了消費暫停問題。

但是這個線程elasticsearch[scheduler][T#1]為啥不能執(zhí)行? 它是什么時候啟動的? 又有什么作用?

就需要我們對BulkProcessor進行深入分析,由于BulkProcessor是通過builder模塊進行創(chuàng)建的,所以深入builder源碼,了解一下BulkProcessor的創(chuàng)建過程。

public static Builder builder(BiConsumer> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

內(nèi)部創(chuàng)建了一個時間調(diào)度執(zhí)行線程池,線程命名規(guī)則和上述持有鎖的線程名稱相似,具體代碼如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最后在build方法內(nèi)部執(zhí)行了BulkProcessor的內(nèi)部有參構(gòu)造方法,在構(gòu)造方法內(nèi)部啟動了一個周期性執(zhí)行的flushing任務,代碼如下

BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }

通過源代碼發(fā)現(xiàn),該flush任務就是在創(chuàng)建BulkProcessor對象時設置的固定時間flush邏輯,當setFlushInterval方法參數(shù)生效,就會啟動一個后臺定時flush任務。flush間隔,由setFlushInterval方法參數(shù)定義。該flush任務在運行期間,也會搶占BulkProcessor對象鎖,搶到鎖后,才會執(zhí)行execute方法。具體的方法調(diào)用關系源代碼如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代碼中的add方法,則是由MQ消費業(yè)務線程去調(diào)用,在該方法上同樣有一個synchronized關鍵字,所以消費MQ業(yè)務線程會和flush任務執(zhí)行線程直接會存在鎖競爭關系。具體MQ消費業(yè)務線程調(diào)用偽代碼如下:

@Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
            String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}

通過以上對線程堆棧分析,發(fā)現(xiàn)所有的業(yè)務線程都在等待elasticsearch[scheduler][T#1]線程釋放BulkProcessor對象鎖,但是該線程確一直沒有釋放該對象鎖,從而出現(xiàn)了業(yè)務線程的死鎖問題。

結(jié)合應用日志文件中出現(xiàn)的大量異常重試日志,可能與BulkProcessor的異常重試策略有關,然后進一步了解BulkProcessor的異常重試代碼邏輯。由于業(yè)務線程中提交BulkRequest請求都統(tǒng)一提交到了BulkRequestHandler對象中的execute方法內(nèi)部進行處理,代碼如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler通過構(gòu)造方法初始化了一個Retry任務對象,該對象中也傳入了一個Scheduler,且該對象和flush任務中傳入的是同一個線程池,該線程池內(nèi)部只維護了一個固定線程。而execute方法首先會先根據(jù)Semaphore來控制并發(fā)執(zhí)行數(shù)量,該并發(fā)數(shù)量在構(gòu)建BulkProcessor時通過參數(shù)指定,通過上述配置發(fā)現(xiàn)該值配置為1。所以每次只允許一個線程執(zhí)行該方法。即MQ消費業(yè)務線程和flush任務線程,同一時間只能有一個線程可以執(zhí)行。然后下面在了解一下重試任務是如何執(zhí)行的,具體看如下代碼:

public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest,
                            ActionListener listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler內(nèi)部會執(zhí)行提交bulkRequest請求,同時也會監(jiān)聽bulkRequest執(zhí)行異常狀態(tài),然后執(zhí)行任務重試邏輯,重試代碼如下:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler將執(zhí)行失敗的bulk請求重新交給了內(nèi)部scheduler線程池去執(zhí)行,通過以上代碼了解,該線程池內(nèi)部只維護了一個固定線程,同時該線程池可能還會被另一個flush任務去占用執(zhí)行。所以如果重試邏輯正在執(zhí)行的時候,此時線程池內(nèi)的唯一線程正在執(zhí)行flush任務,則會阻塞重試邏輯執(zhí)行,重試邏輯不能執(zhí)行完成,則不會釋放Semaphore,但是由于并發(fā)數(shù)量配置的是1,所以flush任務線程需要等待其他線程釋放一個Semaphore許可后才能繼續(xù)執(zhí)行。所以此處形成了循環(huán)等待,導致Semaphore和BulkProcessor對象鎖都無法釋放,從而使得所有的MQ消費業(yè)務線程都阻塞在獲取BulkProcessor鎖之前。

同時,在GitHub的ES客戶端源碼客戶端上也能搜索到類似問題,例如:
https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因為bulk的不斷重試從而引發(fā)了BulkProcessor內(nèi)部的死鎖問題。

四、如何解決問題

既然前邊已經(jīng)了解到了問題產(chǎn)生的原因,所以就有了如下幾種解決方案:

1.升級ES客戶端版本到7.6正式版,后續(xù)版本通過將異常重試任務線程池和flush任務線程池進行了物理隔離,從而避免了線程池的競爭,但是需要考慮版本兼容性。

2.由于該死鎖問題是由大量異常重試邏輯引起的,可以在不影響業(yè)務邏輯的情況取消重試邏輯,該方案可以不需要升級客戶端版本,但是需要評估業(yè)務影響,執(zhí)行失敗的請求可以通過其他其他方式進行業(yè)務重試。

作者:京東零售 曹志飛

來源:京東云開發(fā)者社區(qū)


當前名稱:ElasticSearch-批量更新bulk死鎖問題排查
轉(zhuǎn)載注明:http://www.5511xx.com/article/cdegojc.html