/*
 * Decompiled with CFR 0.152.
 */
package org.openvpms.archetype.component.dispatcher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.openvpms.archetype.component.dispatcher.Queue;
import org.openvpms.archetype.component.dispatcher.Queues;
import org.openvpms.archetype.rules.practice.PracticeService;
import org.openvpms.component.business.service.security.RunAs;
import org.openvpms.component.model.user.User;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public abstract class Dispatcher<T, O, Q extends Queue<T, O>>
implements DisposableBean,
ApplicationListener<ContextRefreshedEvent> {
    private final PracticeService practiceService;
    private final ExecutorService executor;
    private final Semaphore waiter = new Semaphore(0);
    private final Logger log;
    private final Semaphore scheduled = new Semaphore(1);
    private Queues<O, Q> queues;
    private volatile boolean shutdown = false;
    private User serviceUser;
    private boolean missingUser;

    protected Dispatcher(PracticeService practiceService, Logger log) {
        this.practiceService = practiceService;
        this.log = log;
        this.executor = Executors.newSingleThreadExecutor();
        this.serviceUser = this.getServiceUser();
    }

    public void onApplicationEvent(ContextRefreshedEvent event) {
        this.scheduleDispatch();
    }

    public void destroy() throws Exception {
        this.shutdown = true;
        this.executor.shutdown();
        this.waiter.release();
        try {
            if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.log.error("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (this.queues != null) {
            this.queues.destroy();
        }
    }

    protected void init(Queues<O, Q> queues) {
        this.queues = queues;
    }

    protected Queues<O, Q> getQueues() {
        return this.queues;
    }

    protected Q getQueue(O owner) {
        return (Q)((Queue)this.queues.getQueue(owner));
    }

    protected boolean processFirst(Q queue) {
        boolean result = false;
        Throwable failure = null;
        try {
            Object object = ((Queue)queue).peekFirst();
            if (object != null) {
                result = true;
                failure = this.processProtected(object, queue);
            }
        }
        catch (Exception exception) {
            this.log.error(exception.getMessage(), (Throwable)exception);
            failure = exception;
        }
        if (failure != null) {
            long retry = (long)((Queue)queue).getRetryInterval() * 1000L;
            ((Queue)queue).setWaitUntil(System.currentTimeMillis() + retry);
            ((Queue)queue).error(failure);
        }
        return result;
    }

    protected void schedule() {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization((TransactionSynchronization)new TransactionSynchronizationAdapter(){

                public void afterCommit() {
                    Dispatcher.this.scheduleDispatch();
                }
            });
        } else {
            this.scheduleDispatch();
        }
    }

    protected String toString(T object) {
        return object.toString();
    }

    protected abstract void process(T var1, Q var2) throws Exception;

    protected PracticeService getPracticeService() {
        return this.practiceService;
    }

    protected void dispatch() {
        long minWait;
        int waiting;
        boolean processed;
        this.log.debug("dispatch() - start");
        do {
            processed = false;
            waiting = 0;
            minWait = 0L;
            for (Queue queue : this.queues.getQueues()) {
                if (queue.isSuspended()) {
                    this.log.debug("dispatch() - skipping suspended queue {}", (Object)queue);
                    continue;
                }
                long wait = queue.getWaitUntil();
                if (wait == -1L || wait <= System.currentTimeMillis()) {
                    processed |= this.processFirst(queue);
                    continue;
                }
                ++waiting;
                if (minWait != 0L && wait >= minWait) continue;
                minWait = wait;
            }
        } while (processed && !this.shutdown);
        this.postDispatch(waiting, minWait);
    }

    protected void postDispatch(int waiting, long waitUntil) {
        if (!this.shutdown && waiting != 0) {
            long wait = waitUntil - System.currentTimeMillis();
            if (wait > 0L) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Dispatcher waiting for {}", (Object)DurationFormatUtils.formatDurationHMS((long)wait));
                }
                try {
                    this.waiter.drainPermits();
                    this.waiter.tryAcquire(wait, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
            }
            this.scheduleDispatch();
        }
        this.log.debug("dispatch() - end, rescheduled={}", (Object)(this.scheduled.availablePermits() == 0 ? 1 : 0));
    }

    private Throwable processProtected(T object, Q queue) {
        Exception failure = null;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        try {
            this.process(object, queue);
            this.log.debug("Processed object for {} in {}", queue, (Object)stopWatch);
        }
        catch (Exception exception) {
            this.log.error("Failed to process object={} for {}, time={}", new Object[]{this.toString(object), queue, stopWatch, exception});
            failure = exception;
        }
        return failure;
    }

    private void scheduleDispatch() {
        this.waiter.release();
        User user = this.getServiceUser();
        if (this.shutdown) {
            this.log.debug("Dispatcher shutting down. Schedule request ignored");
        } else if (user == null) {
            this.log.warn("No service user. Schedule request ignored");
        } else if (this.scheduled.tryAcquire()) {
            this.log.debug("Scheduling dispatcher");
            this.executor.execute(() -> {
                this.scheduled.release();
                try {
                    RunAs.run((User)user, this::dispatch);
                }
                catch (Exception exception) {
                    this.log.error(exception.getMessage(), (Throwable)exception);
                }
            });
        } else {
            this.log.debug("Dispatcher already scheduled");
        }
    }

    private synchronized User getServiceUser() {
        if (this.serviceUser == null) {
            this.serviceUser = this.practiceService.getServiceUser();
            if (this.serviceUser == null && !this.missingUser) {
                this.log.error("Missing party.organisationPractice serviceUser. Processing cannot occur until this is configured");
                this.missingUser = true;
            }
        }
        return this.serviceUser;
    }
}

