/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.legacy.executor;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestResponse;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.RestExecutor;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;
import org.opensearch.transport.client.Client;

/**
 * {@link RestExecutor} decorator that moves blocking query actions off transport threads.
 *
 * <p>When a query action is classified as blocking by the configured predicate and the caller is
 * running on a transport thread, execution is handed to the client's {@link ThreadPool} under the
 * {@code "sql-worker"} executor name. Otherwise the wrapped executor is invoked directly on the
 * calling thread. In both cases the elapsed time is measured and compared against the
 * {@code SQL_SLOWLOG} setting.
 */
public class AsyncRestExecutor implements RestExecutor {
    private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);

    /** Default classification: every query action is treated as blocking. */
    private static final Predicate<QueryAction> ALL_ACTION_IS_BLOCKING = anyAction -> true;

    /** The executor that performs the actual work. */
    private final RestExecutor executor;

    /** Decides whether a given query action must not run on a transport thread. */
    private final Predicate<QueryAction> isBlocking;

    /**
     * Wraps {@code executor}, treating every query action as blocking.
     *
     * @param executor the executor to delegate to
     */
    AsyncRestExecutor(RestExecutor executor) {
        this(executor, ALL_ACTION_IS_BLOCKING);
    }

    /**
     * Wraps {@code executor} with a custom blocking classification.
     *
     * @param executor the executor to delegate to
     * @param isBlocking predicate returning {@code true} for actions that should be run
     *     asynchronously when invoked from a transport thread
     */
    AsyncRestExecutor(RestExecutor executor, Predicate<QueryAction> isBlocking) {
        this.executor = executor;
        this.isBlocking = isBlocking;
    }

    /**
     * Executes the query action, scheduling it on the thread pool when it is blocking and the
     * current thread is a transport thread; otherwise runs it inline.
     *
     * @param client client passed through to the wrapped executor (also supplies the thread pool)
     * @param params request parameters passed through to the wrapped executor
     * @param queryAction the query action to run
     * @param channel channel the response (or error response) is written to
     * @throws Exception whatever the wrapped executor throws when run inline
     */
    @Override
    public void execute(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) throws Exception {
        if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
            if (LOG.isDebugEnabled()) {
                // Argument order follows the placeholders: request id, query action, executor, thread.
                LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]", QueryContext.getRequestId(), name(queryAction), name(executor), Thread.currentThread().getName());
            }
            async(client, params, queryAction, channel);
        } else {
            if (LOG.isDebugEnabled()) {
                // Argument order follows the placeholders: request id, query action, executor, thread.
                LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]", QueryContext.getRequestId(), name(queryAction), name(executor), Thread.currentThread().getName());
            }
            doExecuteWithTimeMeasured(client, params, queryAction, channel);
        }
    }

    /**
     * Delegates directly to the wrapped executor; no thread hand-off or timing is applied.
     *
     * @param client client passed through to the wrapped executor
     * @param params request parameters passed through to the wrapped executor
     * @param queryAction the query action to run
     * @return the string produced by the wrapped executor
     * @throws Exception whatever the wrapped executor throws
     */
    @Override
    public String execute(Client client, Map<String, String> params, QueryAction queryAction) throws Exception {
        return executor.execute(client, params, queryAction);
    }

    /** Returns whether the configured predicate classifies {@code queryAction} as blocking. */
    private boolean isBlockingAction(QueryAction queryAction) {
        return isBlocking.test(queryAction);
    }

    /** Returns whether the calling thread is a transport thread according to {@link Transports}. */
    private boolean isRunningInTransportThread() {
        return Transports.isTransportThread(Thread.currentThread());
    }

    /**
     * Schedules the query action on the client's thread pool with zero delay under the
     * {@code "sql-worker"} executor name, carrying over the current {@link QueryContext}.
     *
     * <p>Any failure inside the task is counted in {@code FAILED_REQ_COUNT_SYS}, logged, and
     * answered on {@code channel}; {@link BackOffRetryStrategy#releaseMem} is always called for
     * the wrapped executor once the task finishes.
     */
    private void async(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) {
        ThreadPool threadPool = client.threadPool();
        Runnable runnable = () -> {
            try {
                doExecuteWithTimeMeasured(client, params, queryAction, channel);
            } catch (IOException | OpenSearchException | SqlParseException e) {
                Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
                LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(), e.getMessage());
                // The message may be null; String.valueOf keeps response construction from
                // failing inside this handler, which would leave the channel without a reply.
                channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, String.valueOf(e.getMessage())));
            } catch (IllegalStateException e) {
                Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
                LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(), e.getMessage());
                channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, "Memory circuit is broken."));
            } catch (Throwable t) {
                Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
                // Trailing throwable argument makes the logger attach the stack trace, since an
                // unexpected failure is hard to diagnose from its message alone.
                LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(), t.getMessage(), t);
                channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, String.valueOf(t.getMessage())));
            } finally {
                BackOffRetryStrategy.releaseMem(executor);
            }
        };
        threadPool.schedule(QueryContext.withCurrentContext(runnable), new TimeValue(0L), "sql-worker");
    }

    /**
     * Runs the wrapped executor and, whether it succeeds or throws, logs a slow-query warning when
     * the elapsed whole seconds reach the {@code SQL_SLOWLOG} setting value.
     *
     * @throws Exception whatever the wrapped executor throws
     */
    private void doExecuteWithTimeMeasured(Client client, Map<String, String> params, QueryAction action, RestChannel channel) throws Exception {
        long startTime = System.nanoTime();
        try {
            executor.execute(client, params, action, channel);
        } finally {
            Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
            int slowLogThreshold = (Integer) LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
            // Threshold is compared in whole seconds; the log line reports milliseconds.
            if (elapsed.getSeconds() >= (long) slowLogThreshold) {
                LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
            }
        }
    }

    /** Returns the simple class name of {@code object}, used for log output. */
    private String name(Object object) {
        return object.getClass().getSimpleName();
    }
}

