package org.openvpms.hl7.impl;

import ca.uhn.hl7v2.HL7Exception;
import ca.uhn.hl7v2.HapiContext;
import ca.uhn.hl7v2.app.Connection;
import ca.uhn.hl7v2.llp.LLPException;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.protocol.ReceivingApplication;
import ca.uhn.hl7v2.util.idgenerator.IDGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.openvpms.archetype.rules.practice.PracticeRules;
import org.openvpms.component.business.domain.im.act.DocumentAct;
import org.openvpms.component.business.domain.im.common.IMObjectReference;
import org.openvpms.component.business.domain.im.party.Party;
import org.openvpms.component.business.service.security.RunAs;
import org.openvpms.component.model.user.User;
import org.openvpms.hl7.impl.ConnectorsImpl;
import org.openvpms.hl7.io.Connector;
import org.openvpms.hl7.io.Connectors;
import org.openvpms.hl7.io.MessageDispatcher;
import org.openvpms.hl7.io.MessageService;
import org.openvpms.hl7.io.Statistics;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/* loaded from: input_file:org/openvpms/hl7/impl/MessageDispatcherImpl.class */
public class MessageDispatcherImpl implements MessageDispatcher, DisposableBean, InitializingBean {
    private final MessageService messageService;
    private final ConnectorsImpl connectors;
    private final PracticeRules rules;
    private final HeaderPopulator populator;
    private final HapiContext messageContext;
    private final IDGenerator generator;
    private final Map<IMObjectReference, MessageQueue> queueMap;
    private final Map<IMObjectReference, MessageReceiver> receiverMap;
    private final Map<Integer, DemultiplexingReceiver> receivers;
    private final ExecutorService executor;
    private final ConnectorsImpl.Listener listener;
    private final Semaphore waiter;
    private Semaphore scheduled;
    private volatile boolean shutdown;
    private volatile User user;
    private boolean missingUser;
    private static final Log log = LogFactory.getLog(MessageDispatcherImpl.class);

    public MessageDispatcherImpl(MessageService messageService, ConnectorsImpl connectorsImpl, PracticeRules practiceRules) {
        this(messageService, connectorsImpl, practiceRules, HapiContextFactory.create());
    }

    public MessageDispatcherImpl(MessageService messageService, ConnectorsImpl connectorsImpl, PracticeRules practiceRules, HapiContext hapiContext) {
        this.queueMap = Collections.synchronizedMap(new HashMap());
        this.receiverMap = Collections.synchronizedMap(new HashMap());
        this.receivers = new HashMap();
        this.waiter = new Semaphore(0);
        this.scheduled = new Semaphore(1);
        this.shutdown = false;
        this.messageService = messageService;
        this.connectors = connectorsImpl;
        this.rules = practiceRules;
        this.populator = new HeaderPopulator();
        this.messageContext = hapiContext;
        this.generator = this.messageContext.getParserConfiguration().getIdGenerator();
        this.executor = Executors.newSingleThreadExecutor();
        this.user = getServiceUser();
        this.listener = new ConnectorsImpl.Listener() { // from class: org.openvpms.hl7.impl.MessageDispatcherImpl.1
            @Override // org.openvpms.hl7.impl.ConnectorsImpl.Listener
            public void added(Connector connector) {
                MessageDispatcherImpl.this.update(connector);
            }

            @Override // org.openvpms.hl7.impl.ConnectorsImpl.Listener
            public void removed(Connector connector) {
                MessageDispatcherImpl.this.remove(connector);
            }
        };
    }

    public HapiContext getMessageContext() {
        return this.messageContext;
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public DocumentAct queue(Message message, Connector connector, HL7Mapping hL7Mapping, User user) {
        if (!(connector instanceof MLLPSender)) {
            throw new IllegalArgumentException("Unsupported connector: " + connector);
        }
        try {
            populate(message, (MLLPSender) connector, hL7Mapping);
            return queue(message, (MLLPSender) connector, user);
        } catch (Throwable th) {
            throw new IllegalStateException(th);
        }
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public void resubmit(DocumentAct documentAct) {
        this.messageService.resubmit(documentAct);
        schedule();
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public void listen(Connector connector, ReceivingApplication receivingApplication, User user) throws InterruptedException {
        DemultiplexingReceiver demultiplexingReceiver;
        if (connector instanceof MLLPReceiver) {
            int port = ((MLLPReceiver) connector).getPort();
            synchronized (this.receivers) {
                demultiplexingReceiver = this.receivers.get(Integer.valueOf(port));
                if (demultiplexingReceiver == null) {
                    demultiplexingReceiver = new DemultiplexingReceiver(this.messageService, this.messageContext, port);
                    this.receivers.put(Integer.valueOf(port), demultiplexingReceiver);
                }
            }
            this.receiverMap.put(connector.getReference(), demultiplexingReceiver.add(connector, receivingApplication, user));
        }
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public void start() {
        for (Map.Entry<Integer, DemultiplexingReceiver> entry : this.receivers.entrySet()) {
            DemultiplexingReceiver value = entry.getValue();
            if (!value.isRunning()) {
                value.start();
            }
            if (!value.isRunning()) {
                log.error("Failed to start listener for port=" + entry.getKey());
            }
        }
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public void stop(Connector connector) {
        if (connector instanceof MLLPReceiver) {
            int port = ((MLLPReceiver) connector).getPort();
            synchronized (this.receivers) {
                DemultiplexingReceiver demultiplexingReceiver = this.receivers.get(Integer.valueOf(port));
                if (demultiplexingReceiver != null) {
                    synchronized (demultiplexingReceiver) {
                        demultiplexingReceiver.remove(connector);
                        this.receiverMap.remove(connector.getReference());
                        if (demultiplexingReceiver.isEmpty()) {
                            this.receivers.remove(Integer.valueOf(port));
                            demultiplexingReceiver.stop();
                        }
                    }
                }
            }
        }
    }

    @Override // org.openvpms.hl7.io.MessageDispatcher
    public Statistics getStatistics(IMObjectReference iMObjectReference) {
        Statistics statistics = this.queueMap.get(iMObjectReference);
        if (statistics == null) {
            statistics = this.receiverMap.get(iMObjectReference);
        }
        return statistics;
    }

    public void afterPropertiesSet() {
        initialise();
    }

    public void initialise() {
        initialise(this.connectors);
        this.connectors.addListener(this.listener);
        log.info("HL7 message dispatcher initialised");
        schedule();
    }

    public void destroy() throws Exception {
        this.shutdown = true;
        this.connectors.removeListener(this.listener);
        this.executor.shutdown();
        this.waiter.release();
        try {
            if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        synchronized (this.receivers) {
            Iterator<DemultiplexingReceiver> it = this.receivers.values().iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
        }
        this.messageContext.getExecutorService();
        this.messageContext.close();
    }

    protected Date createMessageTimestamp() {
        return new Date();
    }

    protected String createMessageControlID() throws IOException {
        return this.generator.getID();
    }

    protected void initialise(Connectors connectors) {
        for (Connector connector : connectors.getConnectors()) {
            if (connector instanceof MLLPSender) {
                getMessageQueue((MLLPSender) connector);
            }
        }
    }

    protected DocumentAct queue(Message message, MLLPSender mLLPSender, User user) throws HL7Exception {
        MessageQueue messageQueue = getMessageQueue(mLLPSender);
        if (log.isDebugEnabled()) {
            log.debug("queue() - " + mLLPSender);
        }
        DocumentAct add = messageQueue.add(message, user);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { // from class: org.openvpms.hl7.impl.MessageDispatcherImpl.2
                public void afterCommit() {
                    MessageDispatcherImpl.this.schedule();
                }
            });
        } else {
            schedule();
        }
        return add;
    }

    protected Message send(Message message, MLLPSender mLLPSender) throws HL7Exception, LLPException, IOException {
        boolean isDebugEnabled = log.isDebugEnabled();
        long j = -1;
        if (isDebugEnabled) {
            log.debug("sending message via " + mLLPSender);
            log.debug(toString(message));
            j = System.currentTimeMillis();
        }
        Connection connection = null;
        try {
            connection = this.messageContext.newClient(mLLPSender.getHost(), mLLPSender.getPort(), false);
            int responseTimeout = mLLPSender.getResponseTimeout();
            if (responseTimeout <= 0) {
                responseTimeout = 30;
            }
            connection.getInitiator().setTimeout(responseTimeout, TimeUnit.SECONDS);
            Message sendAndReceive = connection.getInitiator().sendAndReceive(message);
            if (isDebugEnabled) {
                log.debug("response received in " + (System.currentTimeMillis() - j) + "ms");
                log.debug(toString(sendAndReceive));
            }
            if (connection != null) {
                connection.close();
            }
            return sendAndReceive;
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    protected boolean sendFirst(MessageQueue messageQueue) {
        log.debug("sendFirst() - " + messageQueue.getConnector());
        boolean z = false;
        Message peekFirst = messageQueue.peekFirst();
        if (peekFirst != null) {
            send(messageQueue, peekFirst, messageQueue.peekFirstAct());
            z = true;
        } else {
            log.debug("sendFirst() - nothing to send");
        }
        return z;
    }

    protected void send(MessageQueue messageQueue, Message message, DocumentAct documentAct) {
        MLLPSender connector = messageQueue.getConnector();
        try {
            messageQueue.sent(send(message, connector));
        } catch (Throwable th) {
            log.error("Failed to send message, act Id=" + documentAct.getId(), th);
            int retryInterval = connector.getRetryInterval();
            if (retryInterval <= 0) {
                retryInterval = 30;
            }
            messageQueue.setWaitUntil(System.currentTimeMillis() + (retryInterval * 1000));
            messageQueue.error(th);
        }
    }

    private void populate(Message message, MLLPSender mLLPSender, HL7Mapping hL7Mapping) throws HL7Exception, IOException {
        this.populator.populate(message, mLLPSender, createMessageTimestamp(), createMessageControlID(), hL7Mapping);
    }

    private MessageQueue getMessageQueue(MLLPSender mLLPSender) {
        MessageQueue messageQueue;
        synchronized (this.queueMap) {
            messageQueue = this.queueMap.get(mLLPSender.getReference());
            if (messageQueue == null) {
                messageQueue = new MessageQueue(mLLPSender, this.messageService, this.messageContext);
                this.queueMap.put(mLLPSender.getReference(), messageQueue);
            }
        }
        return messageQueue;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedule() {
        this.waiter.release();
        final User serviceUser = getServiceUser();
        if (this.shutdown) {
            log.debug("MessageDispatcher shutting down. Schedule request ignored");
            return;
        }
        if (serviceUser == null) {
            log.debug("No service user. Schedule request ignored");
        } else if (this.scheduled.tryAcquire()) {
            this.executor.execute(new Runnable() { // from class: org.openvpms.hl7.impl.MessageDispatcherImpl.3
                @Override // java.lang.Runnable
                public void run() {
                    MessageDispatcherImpl.this.scheduled.release();
                    try {
                        RunAs.run(serviceUser, new Runnable() { // from class: org.openvpms.hl7.impl.MessageDispatcherImpl.3.1
                            @Override // java.lang.Runnable
                            public void run() {
                                MessageDispatcherImpl.this.dispatch();
                            }
                        });
                    } catch (Throwable th) {
                        MessageDispatcherImpl.log.error(th.getMessage(), th);
                    }
                }
            });
        } else {
            log.debug("MessageDispatcher already scheduled");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatch() {
        int i;
        long j;
        log.debug("dispatch() - start");
        do {
            boolean z = false;
            i = 0;
            j = 0;
            for (MessageQueue messageQueue : new ArrayList(this.queueMap.values())) {
                if (!messageQueue.isSuspended()) {
                    long waitUntil = messageQueue.getWaitUntil();
                    if (waitUntil == -1 || waitUntil <= System.currentTimeMillis()) {
                        z |= sendFirst(messageQueue);
                    } else {
                        i++;
                        if (j == 0 || waitUntil < j) {
                            j = waitUntil;
                        }
                    }
                } else if (log.isDebugEnabled()) {
                    log.debug("dispatch() - skipping suspended queue " + messageQueue);
                }
            }
            if (!z) {
                break;
            }
        } while (!this.shutdown);
        if (i != 0 && !this.shutdown) {
            long currentTimeMillis = j - System.currentTimeMillis();
            if (currentTimeMillis > 0) {
                log.debug("dispatch() waiting for " + currentTimeMillis + "ms");
                try {
                    this.waiter.drainPermits();
                    this.waiter.tryAcquire(currentTimeMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
            }
            schedule();
        }
        log.debug("dispatch() - end");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void update(Connector connector) {
        if (connector instanceof MLLPSender) {
            restartSender(connector);
        } else if (connector instanceof MLLPReceiver) {
            restartReceiver(connector);
        }
    }

    private void restartReceiver(Connector connector) {
        MessageReceiver messageReceiver = this.receiverMap.get(connector.getReference());
        if (messageReceiver != null) {
            ReceivingApplication receivingApplication = messageReceiver.getReceivingApplication();
            stop(connector);
            try {
                listen(connector, receivingApplication, messageReceiver.getUser());
            } catch (InterruptedException e) {
                log.error("Failed to update " + connector, e);
                Thread.currentThread().interrupt();
            }
        }
    }

    private void restartSender(Connector connector) {
        MessageQueue messageQueue = this.queueMap.get(connector.getReference());
        MLLPSender mLLPSender = (MLLPSender) connector;
        if (messageQueue == null) {
            getMessageQueue(mLLPSender);
            return;
        }
        log.info("Updating " + connector);
        messageQueue.setConnector(mLLPSender);
        messageQueue.setWaitUntil(-1L);
        schedule();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void remove(Connector connector) {
        if (!(connector instanceof MLLPSender)) {
            if (connector instanceof MLLPReceiver) {
                stop(connector);
            }
        } else {
            MessageQueue remove = this.queueMap.remove(connector.getReference());
            if (remove != null) {
                log.info("Removed queue for " + connector);
                remove.setSuspended(true);
            }
        }
    }

    private User getServiceUser() {
        if (this.user == null) {
            synchronized (this) {
                if (this.user == null) {
                    Party practice = this.rules.getPractice();
                    if (practice != null) {
                        this.user = this.rules.getServiceUser(practice);
                    }
                    if (this.user == null && !this.missingUser) {
                        log.error("Missing party.organisationPractice serviceUser. Messages cannot be sent until this is configured");
                        this.missingUser = true;
                    }
                }
            }
        }
        return this.user;
    }

    private String toString(Message message) {
        try {
            return HL7MessageHelper.toString(message);
        } catch (HL7Exception e) {
            log.error(e.getMessage(), e);
            return "Failed to encode message";
        }
    }
}
