/*
 * Decompiled with CFR 0.152.
 */
package org.openvpms.smartflow.event.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.openvpms.component.model.party.Party;
import org.openvpms.smartflow.client.FlowSheetException;
import org.openvpms.smartflow.client.FlowSheetServiceFactory;
import org.openvpms.smartflow.client.ReferenceDataService;
import org.openvpms.smartflow.event.EventDispatcher;
import org.openvpms.smartflow.event.EventStatus;
import org.openvpms.smartflow.i18n.FlowSheetMessages;
import org.openvpms.smartflow.model.ServiceBusConfig;
import org.openvpms.smartflow.model.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives messages from the Azure Service Bus queue configured for a single location and hands
 * the deserialised {@link Event}s to an {@link EventDispatcher}.
 * <p>
 * Lifecycle ({@link #start()}, {@link #stop()}, {@link #destroy()}) is serialised on the instance
 * monitor. Status information (last received time, last error) is guarded by a separate lock so
 * that message handler threads never have to wait on the instance monitor, which {@link #stop()}
 * holds while the queue client is being closed.
 */
class QueueDispatcher {
    private static final Logger log = LoggerFactory.getLogger(QueueDispatcher.class);

    /** The location whose queue is being read. */
    private final Party location;

    /** Receives the deserialised events. */
    private final EventDispatcher dispatcher;

    /** Used to deserialise message bodies into events. */
    private final ObjectMapper mapper;

    /** Executor passed to the queue client when registering the message handler. */
    private final ExecutorService executorService;

    /** Used to look up the service bus configuration for the location. */
    private final FlowSheetServiceFactory factory;

    /** Guards {@link #lastReceived}, {@link #lastError} and {@link #errorMessage}. */
    private final Object statusLock = new Object();

    /**
     * The current queue client, or {@code null} when not started. Written under the instance
     * monitor; volatile because handler threads read it without that monitor.
     */
    private volatile IQueueClient queue;

    /** Whether the message handler is registered. Guarded by the instance monitor. */
    private boolean started = false;

    /** Set once {@link #destroy()} has run; a destroyed dispatcher cannot be restarted. */
    private volatile boolean destroyed = false;

    private Date lastReceived;
    private Date lastError;
    private String errorMessage;

    /**
     * Constructs a {@code QueueDispatcher}.
     *
     * @param location        the location to dispatch events for
     * @param dispatcher      the dispatcher that events are handed to
     * @param mapper          the mapper used to deserialise message bodies
     * @param executorService the executor passed to the queue client for message handling
     * @param factory         the factory used to obtain the service bus configuration
     */
    public QueueDispatcher(Party location, EventDispatcher dispatcher, ObjectMapper mapper, ExecutorService executorService, FlowSheetServiceFactory factory) {
        this.location = location;
        this.dispatcher = dispatcher;
        this.mapper = mapper;
        this.executorService = executorService;
        this.factory = factory;
    }

    /**
     * Returns the location that events are dispatched for.
     *
     * @return the location
     */
    public Party getLocation() {
        return this.location;
    }

    /**
     * Starts receiving messages, if not already started.
     * <p>
     * Any failure is logged and recorded in the status rather than thrown. If the queue client was
     * created but the handler could not be registered, the client is closed so it is not leaked.
     *
     * @return {@code true} if the dispatcher is started on return, {@code false} if it has been
     *         destroyed or could not be started
     */
    public synchronized boolean start() {
        if (this.destroyed) {
            return false;
        }
        if (this.started) {
            return true;
        }
        IQueueClient client = null;
        try {
            this.clearError();
            client = this.createQueue();
            // publish the client before registering the handler: dispatch() needs it to complete
            // messages, and the handler may be invoked as soon as it is registered
            this.queue = client;
            // MessageHandlerOptions(maxConcurrentCalls, autoComplete, maxAutoRenewDuration):
            // auto-complete is off because dispatch() completes each message itself
            MessageHandlerOptions options = new MessageHandlerOptions(1, false, Duration.ofMinutes(5L));
            client.registerMessageHandler(new MessageHandler(), options, this.executorService);
            this.started = true;
            return true;
        }
        catch (Throwable exception) {
            if (exception instanceof InterruptedException) {
                // don't lose the interrupt just because the failure is handled here
                Thread.currentThread().interrupt();
            }
            this.queue = null;
            if (client != null) {
                // stop() only closes the queue when started, so a half-started client must be closed here
                this.close(client);
            }
            this.stopped(exception);
            log.error("Failed start dispatching SFS events for location=" + this.location.getName() + ": " + exception.getMessage(), exception);
            return false;
        }
    }

    /**
     * Determines if the dispatcher has been started.
     *
     * @return {@code true} if started
     */
    public synchronized boolean isStarted() {
        return this.started;
    }

    /**
     * Stops receiving messages and closes the queue client. Does nothing if not started.
     * Errors raised while closing are logged, not thrown. Any recorded error status is cleared.
     */
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        try {
            this.close(this.queue);
        }
        finally {
            this.queue = null;
            this.stopped(null);
        }
    }

    /**
     * Determines if the dispatcher has been destroyed.
     *
     * @return {@code true} if {@link #destroy()} has completed
     */
    public boolean isDestroyed() {
        return this.destroyed;
    }

    /**
     * Stops the dispatcher and marks it destroyed, after which {@link #start()} always returns
     * {@code false}.
     * <p>
     * Synchronized so that a concurrent {@link #start()} cannot run between the stop and the
     * destroyed flag being set, which would leave a live queue on a destroyed dispatcher.
     */
    public synchronized void destroy() {
        try {
            this.stop();
        }
        finally {
            this.destroyed = true;
        }
    }

    /**
     * Returns a snapshot of the dispatch status.
     *
     * @return the time the last message was received, the time of the last error, and the last
     *         error message; each may be {@code null}
     */
    public EventStatus getStatus() {
        synchronized (this.statusLock) {
            return new EventStatus(this.lastReceived, this.lastError, this.errorMessage);
        }
    }

    /**
     * Processes a message: records the receive time, clears any previous error, dispatches the
     * event (if the body could be deserialised) and completes the message on the queue.
     * <p>
     * Failures are logged and recorded in the status; the message is then left uncompleted.
     * A message whose body cannot be deserialised is still completed.
     *
     * @param message the message to process
     */
    protected void dispatch(IMessage message) {
        synchronized (this.statusLock) {
            this.lastReceived = new Date();
            this.clearError();
        }
        // capture once: stop() may null the field, and a restart may replace it, while this runs
        IQueueClient client = this.queue;
        try {
            Event<?> event = this.getEvent(message);
            if (event != null) {
                this.dispatcher.dispatch(event);
            }
            if (client == null) {
                throw new IllegalStateException("Queue is not available to complete the message");
            }
            client.complete(message.getLockToken());
        }
        catch (Throwable exception) {
            log.error("Failed to process message=" + message.getMessageId() + " for location=" + this.location.getName() + ": " + exception.getMessage(), exception);
            this.setError(exception);
        }
    }

    /**
     * Deserialises the event held in the first binary data segment of a message body.
     *
     * @param message the message
     * @return the event, or {@code null} if the message has no binary data or it could not be
     *         deserialised (the failure is logged)
     */
    protected Event<?> getEvent(IMessage message) {
        byte[] bytes = null;
        List<byte[]> binaryData = message.getMessageBody().getBinaryData();
        if (binaryData != null && !binaryData.isEmpty()) {
            bytes = binaryData.get(0);
        }
        if (log.isDebugEnabled()) {
            log.debug("location='" + this.location.getName() + "', messageID=" + message.getMessageId() + ", timeToLive=" + message.getTimeToLive() + ", sequence=" + message.getSequenceNumber() + ", contentType=" + message.getContentType() + ", content=" + this.getContent(bytes));
        }
        Event<?> event = null;
        if (bytes != null) {
            try {
                InputStream stream = new ByteArrayInputStream(bytes);
                event = this.mapper.readValue(stream, Event.class);
            }
            catch (Throwable exception) {
                log.error("Failed to deserialize message for location='" + this.location.getName() + "', messageID=" + message.getMessageId() + ", sequence=" + message.getSequenceNumber() + ", timeToLive=" + message.getTimeToLive() + ", contentType=" + message.getContentType() + ", content=" + this.getContent(bytes), exception);
            }
        }
        return event;
    }

    /**
     * Creates a peek-lock queue client using the service bus configuration of the location.
     *
     * @return a new queue client
     * @throws FlowSheetException if the configuration cannot be obtained or the client cannot be
     *                            created
     */
    protected IQueueClient createQueue() {
        try {
            ReferenceDataService service = this.factory.getReferenceDataService(this.location);
            ServiceBusConfig config = service.getServiceBusConfig();
            ConnectionStringBuilder connection = new ConnectionStringBuilder(config.getConnectionString(), config.getQueueName());
            return new QueueClient(connection, ReceiveMode.PEEKLOCK);
        }
        catch (FlowSheetException exception) {
            throw exception;
        }
        catch (Throwable exception) {
            if (exception instanceof InterruptedException) {
                // the interrupt is wrapped below; keep the thread's interrupted status intact
                Thread.currentThread().interrupt();
            }
            throw new FlowSheetException(FlowSheetMessages.failedToCreateAzureServiceBusQueue(this.location, exception.getMessage()), exception);
        }
    }

    /**
     * Closes a queue client, logging rather than propagating any failure.
     *
     * @param client the client to close
     */
    private void close(IQueueClient client) {
        try {
            client.close();
        }
        catch (Throwable exception) {
            log.error(exception.getMessage(), exception);
        }
    }

    /**
     * Decodes message content as UTF-8 for logging.
     *
     * @param bytes the content; may be {@code null}
     * @return the decoded content, or {@code null} if {@code bytes} is {@code null}
     */
    private String getContent(byte[] bytes) {
        return bytes != null ? new String(bytes, StandardCharsets.UTF_8) : null;
    }

    /**
     * Marks the dispatcher as stopped and updates the error status.
     *
     * @param exception the reason for stopping, or {@code null} for a normal stop, which clears
     *                  any recorded error
     */
    private synchronized void stopped(Throwable exception) {
        this.started = false;
        if (exception == null) {
            this.clearError();
        } else {
            this.setError(exception);
        }
    }

    /**
     * Clears the recorded error.
     */
    private void clearError() {
        synchronized (this.statusLock) {
            this.lastError = null;
            this.errorMessage = null;
        }
    }

    /**
     * Records an error, timestamped with the current time. The exception's simple class name is
     * used when it has no message.
     *
     * @param exception the error
     */
    private void setError(Throwable exception) {
        String message = exception.getMessage();
        if (message == null) {
            message = exception.getClass().getSimpleName();
        }
        synchronized (this.statusLock) {
            this.lastError = new Date();
            this.errorMessage = message;
        }
    }

    /**
     * Message handler registered with the queue client; forwards each message to
     * {@link QueueDispatcher#dispatch(IMessage)}.
     */
    private class MessageHandler
    implements IMessageHandler {
        private MessageHandler() {
        }

        /**
         * Processes the message on the calling thread and returns an already-completed future.
         * {@code dispatch} handles its own failures, so the future never completes exceptionally.
         */
        @Override
        public CompletableFuture<Void> onMessageAsync(IMessage message) {
            QueueDispatcher.this.dispatch(message);
            return CompletableFuture.completedFuture(null);
        }

        /**
         * Logs an exception reported by the queue client.
         */
        @Override
        public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
            log.error("Exception on Azure Service Bus queue for location=" + QueueDispatcher.this.location.getName() + ", exceptionPhase=" + exceptionPhase + ": " + throwable.getMessage(), throwable);
        }
    }
}

