package org.openvpms.smartflow.event.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.openvpms.component.model.party.Party;
import org.openvpms.smartflow.client.FlowSheetException;
import org.openvpms.smartflow.client.FlowSheetServiceFactory;
import org.openvpms.smartflow.event.EventDispatcher;
import org.openvpms.smartflow.event.EventStatus;
import org.openvpms.smartflow.i18n.FlowSheetMessages;
import org.openvpms.smartflow.model.ServiceBusConfig;
import org.openvpms.smartflow.model.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/openvpms/smartflow/event/impl/QueueDispatcher.class */
public class QueueDispatcher {
    private final Party location;
    private final EventDispatcher dispatcher;
    private final ObjectMapper mapper;
    private final ExecutorService executorService;
    private final FlowSheetServiceFactory factory;
    private IQueueClient queue;
    private boolean started = false;
    private volatile boolean destroyed = false;
    private Date lastReceived;
    private Date lastError;
    private String errorMessage;
    private static final Logger log = LoggerFactory.getLogger(QueueDispatcher.class);

    /* loaded from: input_file:org/openvpms/smartflow/event/impl/QueueDispatcher$MessageHandler.class */
    private class MessageHandler implements IMessageHandler {
        private MessageHandler() {
        }

        public CompletableFuture<Void> onMessageAsync(IMessage iMessage) {
            QueueDispatcher.this.dispatch(iMessage);
            return CompletableFuture.completedFuture(null);
        }

        public void notifyException(Throwable th, ExceptionPhase exceptionPhase) {
            QueueDispatcher.log.error("Exception on Azure Service Bus queue for location=" + QueueDispatcher.this.location.getName() + ", exceptionPhase=" + exceptionPhase + ": " + th.getMessage(), th);
        }
    }

    public QueueDispatcher(Party party, EventDispatcher eventDispatcher, ObjectMapper objectMapper, ExecutorService executorService, FlowSheetServiceFactory flowSheetServiceFactory) {
        this.location = party;
        this.dispatcher = eventDispatcher;
        this.mapper = objectMapper;
        this.executorService = executorService;
        this.factory = flowSheetServiceFactory;
    }

    public Party getLocation() {
        return this.location;
    }

    public synchronized boolean start() {
        boolean z = false;
        if (this.destroyed) {
            return false;
        }
        try {
            if (!this.started) {
                clearError();
                this.queue = createQueue();
                this.queue.registerMessageHandler(new MessageHandler(), new MessageHandlerOptions(1, false, Duration.ofMinutes(5L)), this.executorService);
                this.started = true;
            }
            z = true;
        } catch (Throwable th) {
            stopped(th);
            log.error("Failed start dispatching SFS events for location=" + this.location.getName() + ": " + th.getMessage(), th);
        }
        return z;
    }

    public synchronized boolean isStarted() {
        return this.started;
    }

    public synchronized void stop() {
        if (this.started) {
            try {
                this.queue.close();
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
            } finally {
                this.queue = null;
                stopped(null);
            }
        }
    }

    public boolean isDestroyed() {
        return this.destroyed;
    }

    public void destroy() {
        try {
            stop();
        } finally {
            this.destroyed = true;
        }
    }

    public synchronized EventStatus getStatus() {
        return new EventStatus(this.lastReceived, this.lastError, this.errorMessage);
    }

    protected void dispatch(IMessage iMessage) {
        this.lastReceived = new Date();
        clearError();
        try {
            Event<?> event = getEvent(iMessage);
            if (event != null) {
                this.dispatcher.dispatch(event);
            }
            this.queue.complete(iMessage.getLockToken());
        } catch (Throwable th) {
            log.error("Failed to process message=" + iMessage.getMessageId() + " for location=" + this.location.getName() + ": " + th.getMessage(), th);
            setError(th);
        }
    }

    protected Event<?> getEvent(IMessage iMessage) {
        byte[] bArr = null;
        List binaryData = iMessage.getMessageBody().getBinaryData();
        if (binaryData != null && binaryData.size() != 0) {
            bArr = (byte[]) binaryData.get(0);
        }
        if (log.isDebugEnabled()) {
            log.debug("location='" + this.location.getName() + "', messageID=" + iMessage.getMessageId() + ", timeToLive=" + iMessage.getTimeToLive() + ", sequence=" + iMessage.getSequenceNumber() + ", contentType=" + iMessage.getContentType() + ", content=" + getContent(bArr));
        }
        Event<?> event = null;
        if (bArr != null) {
            try {
                event = (Event) this.mapper.readValue(new ByteArrayInputStream(bArr), Event.class);
            } catch (Throwable th) {
                log.error("Failed to deserialize message for location='" + this.location.getName() + "', messageID=" + iMessage.getMessageId() + ", sequence=" + iMessage.getSequenceNumber() + ", timeToLive=" + iMessage.getTimeToLive() + ", contentType=" + iMessage.getContentType() + ", content=" + getContent(bArr), th);
            }
        }
        return event;
    }

    protected IQueueClient createQueue() {
        try {
            ServiceBusConfig serviceBusConfig = this.factory.getReferenceDataService(this.location).getServiceBusConfig();
            return new QueueClient(new ConnectionStringBuilder(serviceBusConfig.getConnectionString(), serviceBusConfig.getQueueName()), ReceiveMode.PEEKLOCK);
        } catch (FlowSheetException e) {
            throw e;
        } catch (Throwable th) {
            throw new FlowSheetException(FlowSheetMessages.failedToCreateAzureServiceBusQueue(this.location, th.getMessage()), th);
        }
    }

    private String getContent(byte[] bArr) {
        if (bArr != null) {
            return new String(bArr, StandardCharsets.UTF_8);
        }
        return null;
    }

    private synchronized void stopped(Throwable th) {
        this.started = false;
        if (th == null) {
            clearError();
        } else {
            setError(th);
        }
    }

    private void clearError() {
        this.lastError = null;
        this.errorMessage = null;
    }

    private void setError(Throwable th) {
        this.lastError = new Date();
        this.errorMessage = th.getMessage();
        if (this.errorMessage == null) {
            this.errorMessage = th.getClass().getSimpleName();
        }
    }
}
