/*
 * Decompiled with CFR 0.152.
 */
package com.terracottatech.frs.recovery;

import com.terracottatech.frs.DeleteFilter;
import com.terracottatech.frs.Disposable;
import com.terracottatech.frs.action.Action;
import com.terracottatech.frs.action.ActionManager;
import com.terracottatech.frs.config.Configuration;
import com.terracottatech.frs.config.FrsProperty;
import com.terracottatech.frs.log.LogManager;
import com.terracottatech.frs.log.LogRecord;
import com.terracottatech.frs.recovery.AbstractFilter;
import com.terracottatech.frs.recovery.Filter;
import com.terracottatech.frs.recovery.RecoveryException;
import com.terracottatech.frs.recovery.RecoveryListener;
import com.terracottatech.frs.recovery.RecoveryManager;
import com.terracottatech.frs.recovery.SkipsFilter;
import com.terracottatech.frs.transaction.TransactionFilter;
import com.terracottatech.frs.util.NullFuture;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoveryManagerImpl
implements RecoveryManager {
    // Shared by the nested filter classes below; note it is named after the RecoveryManager interface.
    private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryManager.class);
    // Supplies the log record iterator walked by recover() and the lowest LSN of the log.
    private final LogManager logManager;
    // Converts each log record into an Action.
    private final ActionManager actionManager;
    // Value of FrsProperty.RECOVERY_COMPRESSED_SKIP_SET; handed to the SkipsFilter built in recover().
    private final boolean compressedSkipSet;
    // Innermost filter of the recovery chain; batches actions and replays them on its own pool.
    private final ReplayFilter replayFilter;
    // Kept so the "Recovery is incomplete" error can report the DB home.
    private final Configuration configuration;

    /**
     * Creates a recovery manager whose replay thread count is taken from
     * {@code runtime.availableProcessors()}.
     *
     * @param logManager source of the log records to recover
     * @param actionManager converts log records into actions
     * @param configuration recovery settings and DB home
     * @param runtime queried once for the number of available processors
     */
    RecoveryManagerImpl(LogManager logManager, ActionManager actionManager, Configuration configuration, Runtime runtime) {
        this(logManager, actionManager, configuration, runtime.availableProcessors());
    }

    RecoveryManagerImpl(LogManager logManager, ActionManager actionManager, Configuration configuration, int availableProcessors) {
        this.logManager = logManager;
        this.actionManager = actionManager;
        this.compressedSkipSet = configuration.getBoolean(FrsProperty.RECOVERY_COMPRESSED_SKIP_SET);
        this.replayFilter = new ReplayFilter(configuration.getInt(FrsProperty.RECOVERY_REPLAY_PER_BATCH_SIZE), configuration.getInt(FrsProperty.RECOVERY_REPLAY_TOTAL_BATCH_SIZE_MAX), configuration.getDBHome(), availableProcessors);
        this.configuration = configuration;
    }

    /**
     * Creates a recovery manager sized from the current JVM's {@link Runtime}.
     *
     * @param logManager source of the log records to recover
     * @param actionManager converts log records into actions
     * @param configuration recovery settings and DB home
     */
    public RecoveryManagerImpl(LogManager logManager, ActionManager actionManager, Configuration configuration) {
        this(logManager, actionManager, configuration, Runtime.getRuntime());
    }

    /**
     * Replays the log through the recovery filter chain and then notifies the listeners.
     *
     * <p>Records come from {@code logManager.startup()}; each is turned into an action and pushed
     * through a chain constructed as ProgressLoggingFilter, SkipsFilter, TransactionFilter,
     * DeleteFilter and, innermost, the ReplayFilter. After every record the replay filter is
     * checked for a recorded error so a failed replay aborts recovery promptly. The replay filter
     * is always finished (and checked again) in a {@code finally} block.
     *
     * @param listeners callbacks whose {@code recovered()} is invoked, in order, after a
     *        successful recovery
     * @return a new {@link NullFuture}
     * @throws RecoveryException if closing a record fails, if a replay error was recorded, or if
     *         the last record seen has an LSN greater than {@code logManager.lowestLsn()}
     * @throws InterruptedException if the thread is interrupted, e.g. while waiting for the
     *         replay pool to terminate
     */
    @Override
    public Future<Void> recover(RecoveryListener ... listeners) throws RecoveryException, InterruptedException {
        Iterator<LogRecord> i = this.logManager.startup();
        // "filter" accumulates the time between two filter calls (fetching/extracting the record and
        // cleaning up the previous one); "put" accumulates the time spent inside the filter chain.
        // The names match the labels of the debug line emitted at the end.
        long filter = 0L;
        long put = 0L;
        long ntime = System.nanoTime();
        DeleteFilter deleteFilter = new DeleteFilter(this.replayFilter);
        TransactionFilter transactionFilter = new TransactionFilter(deleteFilter);
        SkipsFilter skipsFilter = new SkipsFilter(transactionFilter, this.logManager.lowestLsn(), this.compressedSkipSet);
        ProgressLoggingFilter progressLoggingFilter = new ProgressLoggingFilter(this.replayFilter.dbHome, skipsFilter, this.logManager.lowestLsn());
        // Long.MAX_VALUE means "no record was seen at all".
        long lastRecoveredLsn = Long.MAX_VALUE;
        try {
            while (i.hasNext()) {
                LogRecord logRecord = i.next();
                Action action = this.actionManager.extract(logRecord);
                long ctime = System.nanoTime();
                filter += ctime - ntime;
                boolean replayed = progressLoggingFilter.filter(action, logRecord.getLsn(), false);
                ntime = System.nanoTime();
                put += ntime - ctime;
                this.replayFilter.checkError();
                lastRecoveredLsn = logRecord.getLsn();
                if (action instanceof Disposable) {
                    // An action accepted for replay is disposed by ReplayElement.replay() once it
                    // has been replayed; only actions that were filtered out are disposed here.
                    if (!replayed) {
                        ((Disposable) action).dispose();
                    }
                } else {
                    logRecord.close();
                }
            }
        }
        catch (IOException ioe) {
            throw new RecoveryException("failed to restart", ioe);
        }
        finally {
            this.replayFilter.finish();
            this.replayFilter.checkError();
        }
        if (lastRecoveredLsn != Long.MAX_VALUE && lastRecoveredLsn > this.logManager.lowestLsn()) {
            throw new RecoveryException("Recovery is incomplete for log " + this.configuration.getDBHome() + ". Files may be missing.");
        }
        for (RecoveryListener listener : listeners) {
            listener.recovered();
        }
        // Guarded so the statistics strings (notably skipsFilter.toString()) are only built
        // when debug logging is actually enabled.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("count " + this.replayFilter.getReplayCount() + " put " + put + " filter " + filter);
            LOGGER.debug(skipsFilter.toString());
        }
        return new NullFuture();
    }

    /**
     * An action paired with the LSN it must be replayed at; one slot of a replay batch.
     */
    private static class ReplayElement {
        private final Action action;
        private final long lsn;

        private ReplayElement(Action action, long lsn) {
            this.action = action;
            this.lsn = lsn;
        }

        /**
         * Replays the action at its LSN, then disposes it if it is {@link Disposable}.
         * Disposal is skipped when {@code replay} throws.
         */
        void replay() {
            final Action target = this.action;
            target.replay(this.lsn);
            if (!(target instanceof Disposable)) {
                return;
            }
            ((Disposable) target).dispose();
        }
    }

    /**
     * Static lookup from a processor count to a fixed prime number.
     *
     * <p>The range of processor counts is cut into bands; each band has its own table and its own
     * step width (5, 10, 20 or 50 processors per table entry).
     */
    private static class MaxProcessorsToPrime {
        // Counts 3..99, one entry per 5 processors.
        private static final int[] primesTo100 = {23, 31, 41, 53, 61, 71, 83, 97, 137, 149, 157, 167, 179, 181, 191, 211, 223, 241, 269, 293};
        // Counts 100..199, one entry per 10 processors.
        private static final int[] primesTo200 = {317, 337, 359, 379, 397, 409, 439, 479, 509, 557};
        // Counts 200..299, one entry per 20 processors.
        private static final int[] primesTo300 = {599, 691, 797, 887, 997};
        // Counts 300..499, one entry per 50 processors.
        private static final int[] primesTo500 = {1091, 1117, 1217, 1319};
        // Counts of 500 and above.
        private static final int primeTo1000 = 2399;

        private MaxProcessorsToPrime() {
        }

        /**
         * Returns the prime associated with {@code numProcessors}.
         *
         * @param numProcessors processor count; anything up to and including 2 (also zero or
         *        negative) yields 3, anything from 500 upwards yields 2399
         * @return the table entry for the band containing {@code numProcessors}
         */
        private static int getNextPrime(int numProcessors) {
            final int[] table;
            final int slot;
            if (numProcessors <= 2) {
                return 3;
            } else if (numProcessors < 100) {
                table = primesTo100;
                slot = numProcessors / 5;
            } else if (numProcessors < 200) {
                table = primesTo200;
                slot = (numProcessors - 100) / 10;
            } else if (numProcessors < 300) {
                table = primesTo300;
                slot = (numProcessors - 200) / 20;
            } else if (numProcessors < 500) {
                table = primesTo500;
                slot = (numProcessors - 300) / 50;
            } else {
                return primeTo1000;
            }
            return table[slot];
        }
    }

    private static class ReplayFilter
    implements Filter<Action> {
        private final AtomicInteger threadId = new AtomicInteger();
        private final AtomicReference<Throwable> firstError = new AtomicReference();
        private final ForkJoinPool replayPool;
        private final File dbHome;
        private final int replayPerBatchSize;
        private final int replayTotalBatchSize;
        private long replayed = 0L;
        private long submitted = 0L;
        private ReplayElement[][] batches;
        private int[] currentIndices;
        private ForkJoinTask<Void> replayBatchTask;

        ReplayFilter(int replayPerBatchSize, int replayTotalBatchSize, File dbHome, int maxThreadCount) {
            this.dbHome = dbHome;
            this.replayPerBatchSize = replayPerBatchSize;
            this.replayTotalBatchSize = replayTotalBatchSize;
            int numBatches = MaxProcessorsToPrime.getNextPrime(maxThreadCount);
            this.batches = new ReplayElement[numBatches][replayPerBatchSize];
            this.currentIndices = new int[numBatches];
            this.replayBatchTask = null;
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            this.replayPool = new ForkJoinPool(maxThreadCount, pool -> {
                ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                thread.setName("Replay Thread - " + this.threadId.getAndIncrement());
                thread.setContextClassLoader(classLoader);
                return thread;
            }, null, false);
        }

        public long getReplayCount() {
            return this.replayed;
        }

        @Override
        public boolean filter(Action element, long lsn, boolean filtered) {
            int nextIdx2;
            if (filtered) {
                return false;
            }
            int idx1 = (element.replayConcurrency() & Integer.MAX_VALUE) % this.batches.length;
            int idx2 = this.currentIndices[idx1];
            this.currentIndices[idx1] = nextIdx2 = idx2 + 1;
            ++this.submitted;
            this.batches[idx1][idx2] = new ReplayElement(element, lsn);
            if (this.submitted - this.replayed >= (long)this.replayTotalBatchSize || nextIdx2 >= this.replayPerBatchSize - 1) {
                this.submitJob(false);
            }
            return true;
        }

        private void submitJob(boolean last) {
            ReplayElement[][] go = this.batches;
            if (!last) {
                this.batches = new ReplayElement[this.batches.length][this.replayPerBatchSize];
                this.currentIndices = new int[this.batches.length];
            } else {
                this.batches = null;
                this.currentIndices = null;
            }
            if (this.replayBatchTask != null) {
                this.waitForReplayBatchTask();
            }
            if (this.replayed != this.submitted) {
                this.replayed = this.submitted;
                this.replayBatchTask = this.replayBatch(go);
                if (last) {
                    this.waitForReplayBatchTask();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        private void waitForReplayBatchTask() {
            boolean interrupted = false;
            try {
                while (this.replayBatchTask != null) {
                    try {
                        this.replayBatchTask.get();
                    }
                    catch (ExecutionException e) {
                        this.firstError.compareAndSet(null, e);
                        LOGGER.error("Error replaying record: " + e.getMessage());
                    }
                    catch (InterruptedException e) {
                        interrupted |= Thread.interrupted();
                    }
                    finally {
                        if (!this.replayBatchTask.isDone()) continue;
                        this.replayBatchTask = null;
                    }
                }
                return;
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private ForkJoinTask<Void> replayBatch(ReplayElement[][] go) {
            return this.replayPool.submit(() -> {
                ((Stream)Arrays.stream(go).filter(rs -> rs[0] != null).parallel()).forEach(rs -> {
                    try {
                        for (ReplayElement r : rs) {
                            if (r == null) break;
                            r.replay();
                        }
                    }
                    catch (Throwable t) {
                        this.firstError.compareAndSet(null, t);
                        LOGGER.error("Error replaying record: " + t.getMessage());
                    }
                });
                return null;
            });
        }

        void checkError() throws RecoveryException {
            Throwable t = this.firstError.get();
            if (t != null) {
                throw new RecoveryException("Caught an error recovering from log at " + this.dbHome.getAbsolutePath(), t);
            }
        }

        void finish() throws InterruptedException {
            boolean done;
            this.submitJob(true);
            this.replayPool.shutdown();
            do {
                if (done = this.replayPool.awaitTermination(2L, TimeUnit.MINUTES)) continue;
                LOGGER.warn("Unable to ensure recovery completion.");
                LOGGER.warn("Cannot proceed further. Checking Again for recovery completion...");
            } while (!done);
        }
    }

    /**
     * Pass-through filter that logs recovery progress in ten steps before delegating.
     *
     * <p>Whenever the countdown has run out and steps remain, the current percentage is logged
     * and the countdown is reset to the remaining LSN distance divided by the remaining steps.
     * "100%" is logged when the record with the lowest LSN is seen.
     */
    private static class ProgressLoggingFilter
    extends AbstractFilter<Action> {
        private final long lowestLsn;
        // Remaining progress steps, counted down from 10.
        private int position = 10;
        // Records left until the next progress message; decremented on every call.
        private long count = 0L;

        /**
         * Logs the start of recovery for {@code home}.
         *
         * @param home database directory named in the start message
         * @param delegate next filter in the chain
         * @param lowestLsn LSN at which progress is reported as complete
         */
        ProgressLoggingFilter(File home, Filter<Action> delegate, long lowestLsn) {
            super(delegate);
            LOGGER.info("Starting recovery for " + home.getAbsolutePath());
            this.lowestLsn = lowestLsn;
        }

        @Override
        public boolean filter(Action element, long lsn, boolean filtered) {
            // The countdown is tested before it is decremented, and decremented unconditionally.
            final boolean countdownExpired = this.count <= 0L;
            this.count--;
            if (countdownExpired && this.position > 0) {
                final int percent = (10 - this.position) * 10;
                LOGGER.info("Recovery progress " + percent + "%");
                this.count = (lsn - this.lowestLsn) / (long) this.position;
                this.position--;
            }
            if (lsn == this.lowestLsn) {
                LOGGER.info("Recovery progress 100%");
            }
            return this.delegate(element, lsn, filtered);
        }
    }
}

