/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.rdf4j.federated.optimizer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
import org.eclipse.rdf4j.federated.algebra.ExclusiveStatement;
import org.eclipse.rdf4j.federated.algebra.StatementSource;
import org.eclipse.rdf4j.federated.algebra.StatementSourcePattern;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.evaluation.TripleSource;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.OptimizationException;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.federated.structures.SubQuery;
import org.eclipse.rdf4j.federated.util.QueryStringUtil;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceSelection {
    private static final Logger log = LoggerFactory.getLogger(SourceSelection.class);
    protected final List<Endpoint> endpoints;
    protected final SourceSelectionCache cache;
    protected final QueryInfo queryInfo;
    protected Map<StatementPattern, List<StatementSource>> stmtToSources = new ConcurrentHashMap<StatementPattern, List<StatementSource>>();

    public SourceSelection(List<Endpoint> endpoints, SourceSelectionCache cache, QueryInfo queryInfo) {
        this.endpoints = endpoints;
        this.cache = cache;
        this.queryInfo = queryInfo;
    }

    public void doSourceSelection(List<StatementPattern> stmts) {
        ArrayList<CheckTaskPair> remoteCheckTasks = new ArrayList<CheckTaskPair>();
        for (StatementPattern stmt : stmts) {
            if (this.stmtToSources.containsKey(stmt)) continue;
            this.stmtToSources.put(stmt, new ArrayList());
            SubQuery q = new SubQuery(stmt, this.queryInfo.getDataset());
            for (Endpoint e : this.endpoints) {
                SourceSelectionCache.StatementSourceAssurance a = this.cache.getAssurance(q, e);
                if (a == SourceSelectionCache.StatementSourceAssurance.HAS_REMOTE_STATEMENTS) {
                    this.addSource(stmt, new StatementSource(e.getId(), StatementSource.StatementSourceType.REMOTE));
                    continue;
                }
                if (a == SourceSelectionCache.StatementSourceAssurance.POSSIBLY_HAS_STATEMENTS) {
                    remoteCheckTasks.add(new CheckTaskPair(e, stmt, this.queryInfo));
                    continue;
                }
                if (a == SourceSelectionCache.StatementSourceAssurance.NONE) continue;
                throw new IllegalStateException("Unexpected statement source assurance: " + a);
            }
        }
        if (remoteCheckTasks.size() > 0) {
            SourceSelectionExecutorWithLatch.run(this, remoteCheckTasks, this.cache);
        }
        for (StatementPattern stmt : stmts) {
            List<StatementSource> sources = this.stmtToSources.get(stmt);
            if (sources.size() > 1) {
                StatementSourcePattern stmtNode = new StatementSourcePattern(stmt, this.queryInfo);
                for (StatementSource s : sources) {
                    stmtNode.addStatementSource(s);
                }
                stmt.replaceWith(stmtNode);
                continue;
            }
            if (sources.size() == 1) {
                stmt.replaceWith(new ExclusiveStatement(stmt, sources.get(0), this.queryInfo));
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("Statement " + QueryStringUtil.toString(stmt) + " does not produce any results at the provided sources, replacing node with EmptyStatementPattern.");
            }
            stmt.replaceWith(new EmptyStatementPattern(stmt));
        }
    }

    public Set<Endpoint> getRelevantSources() {
        HashSet<Endpoint> endpoints = new HashSet<Endpoint>();
        for (List<StatementSource> sourceList : this.stmtToSources.values()) {
            for (StatementSource source : sourceList) {
                endpoints.add(this.queryInfo.getFederationContext().getEndpointManager().getEndpoint(source.getEndpointID()));
            }
        }
        return endpoints;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addSource(StatementPattern stmt, StatementSource source) {
        List<StatementSource> sources;
        List<StatementSource> list = sources = this.stmtToSources.get(stmt);
        synchronized (list) {
            sources.add(source);
        }
    }

    protected static class ParallelCheckTask
    extends ParallelTaskBase<BindingSet> {
        protected final Endpoint endpoint;
        protected final StatementPattern stmt;
        protected final SourceSelectionExecutorWithLatch control;
        protected final QueryInfo queryInfo;

        public ParallelCheckTask(Endpoint endpoint, StatementPattern stmt, QueryInfo queryInfo, SourceSelectionExecutorWithLatch control) {
            this.endpoint = endpoint;
            this.stmt = stmt;
            this.queryInfo = queryInfo;
            this.control = control;
        }

        @Override
        protected CloseableIteration<BindingSet, QueryEvaluationException> performTaskInternal() throws Exception {
            try {
                TripleSource t = this.endpoint.getTripleSource();
                boolean hasResults = false;
                hasResults = t.hasStatements(this.stmt, EmptyBindingSet.getInstance(), this.queryInfo, this.queryInfo.getDataset());
                SourceSelection sourceSelection = this.control.sourceSelection;
                sourceSelection.cache.updateInformation(new SubQuery(this.stmt, this.queryInfo.getDataset()), this.endpoint, hasResults);
                if (hasResults) {
                    sourceSelection.addSource(this.stmt, new StatementSource(this.endpoint.getId(), StatementSource.StatementSourceType.REMOTE));
                }
                return null;
            }
            catch (Exception e) {
                throw new OptimizationException("Error checking results for endpoint " + this.endpoint.getId() + ": " + e.getMessage(), e);
            }
        }

        @Override
        public ParallelExecutor<BindingSet> getControl() {
            return this.control;
        }

        @Override
        public void cancel() {
            this.control.latch.countDown();
            super.cancel();
        }
    }

    protected class CheckTaskPair {
        public final Endpoint e;
        public final StatementPattern t;
        public final QueryInfo queryInfo;

        public CheckTaskPair(Endpoint e, StatementPattern t, QueryInfo queryInfo) {
            this.e = e;
            this.t = t;
            this.queryInfo = queryInfo;
        }
    }

    protected static class SourceSelectionExecutorWithLatch
    implements ParallelExecutor<BindingSet> {
        private final SourceSelection sourceSelection;
        private final ControlledWorkerScheduler<BindingSet> scheduler;
        private CountDownLatch latch;
        private boolean finished = false;
        protected List<Exception> errors = new CopyOnWriteArrayList<Exception>();

        public static void run(SourceSelection sourceSelection, List<CheckTaskPair> tasks, SourceSelectionCache cache) {
            new SourceSelectionExecutorWithLatch(sourceSelection).executeRemoteSourceSelection(tasks, cache);
        }

        private SourceSelectionExecutorWithLatch(SourceSelection sourceSelection) {
            this.sourceSelection = sourceSelection;
            this.scheduler = sourceSelection.queryInfo.getFederationContext().getManager().getJoinScheduler();
        }

        private void executeRemoteSourceSelection(List<CheckTaskPair> tasks, SourceSelectionCache cache) {
            if (tasks.isEmpty()) {
                return;
            }
            this.latch = new CountDownLatch(tasks.size());
            for (CheckTaskPair checkTaskPair : tasks) {
                this.scheduler.schedule(new ParallelCheckTask(checkTaskPair.e, checkTaskPair.t, checkTaskPair.queryInfo, this));
            }
            try {
                boolean completed = this.latch.await(this.getQueryInfo().getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS);
                if (!completed) {
                    throw new OptimizationException("Source selection has run into a timeout");
                }
            }
            catch (InterruptedException e) {
                log.debug("Error during source selection. Thread got interrupted.");
                this.errors.add(e);
            }
            this.finished = true;
            if (this.errors.size() > 0) {
                StringBuilder sb = new StringBuilder();
                sb.append(this.errors.size() + " errors were reported while optimizing query " + this.getQueryInfo().getQueryID());
                for (Exception e : this.errors) {
                    sb.append("\n" + ExceptionUtil.getExceptionString("Error occured", e));
                }
                log.debug(sb.toString());
                Exception exception = this.errors.get(0);
                this.errors.clear();
                if (exception instanceof OptimizationException) {
                    throw (OptimizationException)exception;
                }
                throw new OptimizationException(exception.getMessage(), exception);
            }
        }

        @Override
        public void run() {
        }

        @Override
        public void addResult(CloseableIteration<BindingSet, QueryEvaluationException> res) {
            this.latch.countDown();
        }

        @Override
        public void toss(Exception e) {
            this.latch.countDown();
            this.errors.add(e);
            this.getQueryInfo().abort();
        }

        @Override
        public void done() {
        }

        @Override
        public boolean isFinished() {
            return this.finished;
        }

        @Override
        public QueryInfo getQueryInfo() {
            return this.sourceSelection.queryInfo;
        }
    }
}

