package org.opentripplanner.ext.siri.updater.azure;

import ch.qos.logback.core.CoreConstants;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.xml.bind.JAXBException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.entur.siri21.util.SiriXml;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
import org.opentripplanner.framework.io.OtpHttpClient;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.framework.io.OtpHttpClientFactory;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.TimetableRepository;
import org.opentripplanner.updater.alert.TransitAlertProvider;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.siri.SiriRealTimeTripUpdateAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri21.ServiceDelivery;
import uk.org.siri.siri21.Siri;

/* loaded from: input_file:org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater.class */
public class SiriAzureUpdater implements GraphUpdater {
    private final String updaterType;
    private final AuthenticationType authenticationType;
    private final String fullyQualifiedNamespace;
    private final String configRef;
    private final String serviceBusUrl;
    private final String topicName;
    private final Duration autoDeleteOnIdle;
    private final int prefetchCount;
    private ServiceBusProcessorClient eventProcessor;
    private ServiceBusAdministrationClient serviceBusAdmin;
    private String subscriptionName;
    protected final SiriAzureMessageHandler messageHandler;

    @Nullable
    private final URI dataInitializationUrl;
    private final int timeout;
    private static final Set<ServiceBusFailureReason> RETRYABLE_REASONS = Set.of(ServiceBusFailureReason.GENERAL_ERROR, ServiceBusFailureReason.QUOTA_EXCEEDED, ServiceBusFailureReason.SERVICE_BUSY, ServiceBusFailureReason.SERVICE_COMMUNICATION_ERROR, ServiceBusFailureReason.SERVICE_TIMEOUT, ServiceBusFailureReason.UNAUTHORIZED, ServiceBusFailureReason.MESSAGE_LOCK_LOST, ServiceBusFailureReason.SESSION_LOCK_LOST, ServiceBusFailureReason.SESSION_CANNOT_BE_LOCKED);
    private static final Set<ServiceBusFailureReason> NON_RETRYABLE_REASONS = Set.of(ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND, ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED, ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, ServiceBusFailureReason.MESSAGE_NOT_FOUND, ServiceBusFailureReason.MESSAGING_ENTITY_ALREADY_EXISTS);
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0);
    private final Logger LOG = LoggerFactory.getLogger(getClass());
    private boolean isPrimed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater$CheckedRunnable.class */
    public interface CheckedRunnable {
        void run() throws Exception;
    }

    /* loaded from: input_file:org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater$SxWrapper.class */
    public static class SxWrapper extends SiriAzureUpdater implements TransitAlertProvider {
        SxWrapper(SiriAzureUpdaterParameters siriAzureUpdaterParameters, SiriAzureSXUpdater siriAzureSXUpdater) {
            super(siriAzureUpdaterParameters, siriAzureSXUpdater);
        }

        @Override // org.opentripplanner.updater.alert.TransitAlertProvider
        public TransitAlertService getTransitAlertService() {
            return ((SiriAzureSXUpdater) this.messageHandler).getTransitAlertService();
        }
    }

    SiriAzureUpdater(SiriAzureUpdaterParameters siriAzureUpdaterParameters, SiriAzureMessageHandler siriAzureMessageHandler) {
        this.messageHandler = (SiriAzureMessageHandler) Objects.requireNonNull(siriAzureMessageHandler);
        try {
            this.dataInitializationUrl = siriAzureUpdaterParameters.buildDataInitializationUrl().orElse(null);
            this.configRef = (String) Objects.requireNonNull(siriAzureUpdaterParameters.configRef(), "configRef must not be null");
            this.authenticationType = (AuthenticationType) Objects.requireNonNull(siriAzureUpdaterParameters.getAuthenticationType(), "authenticationType must not be null");
            this.topicName = (String) Objects.requireNonNull(siriAzureUpdaterParameters.getTopicName(), "topicName must not be null");
            this.updaterType = (String) Objects.requireNonNull(siriAzureUpdaterParameters.getType(), "type must not be null");
            this.timeout = siriAzureUpdaterParameters.getTimeout();
            this.autoDeleteOnIdle = siriAzureUpdaterParameters.getAutoDeleteOnIdle();
            this.prefetchCount = siriAzureUpdaterParameters.getPrefetchCount();
            if (this.authenticationType == AuthenticationType.FederatedIdentity) {
                this.fullyQualifiedNamespace = (String) Objects.requireNonNull(siriAzureUpdaterParameters.getFullyQualifiedNamespace(), "fullyQualifiedNamespace must not be null when using FederatedIdentity authentication");
                this.serviceBusUrl = null;
            } else {
                if (this.authenticationType != AuthenticationType.SharedAccessKey) {
                    throw new IllegalArgumentException("Unsupported authentication type: " + String.valueOf(this.authenticationType));
                }
                this.serviceBusUrl = (String) Objects.requireNonNull(siriAzureUpdaterParameters.getServiceBusUrl(), "serviceBusUrl must not be null when using SharedAccessKey authentication");
                this.fullyQualifiedNamespace = null;
            }
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid history url", e);
        }
    }

    public static SiriAzureUpdater createETUpdater(SiriAzureETUpdaterParameters siriAzureETUpdaterParameters, SiriRealTimeTripUpdateAdapter siriRealTimeTripUpdateAdapter) {
        return new SiriAzureUpdater(siriAzureETUpdaterParameters, new SiriAzureETUpdater(siriAzureETUpdaterParameters, siriRealTimeTripUpdateAdapter));
    }

    public static SiriAzureUpdater createSXUpdater(SiriAzureSXUpdaterParameters siriAzureSXUpdaterParameters, TimetableRepository timetableRepository) {
        return new SxWrapper(siriAzureSXUpdaterParameters, new SiriAzureSXUpdater(siriAzureSXUpdaterParameters, timetableRepository));
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public void setup(WriteToGraphCallback writeToGraphCallback) {
        this.messageHandler.setup(writeToGraphCallback);
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public void run() {
        this.subscriptionName = System.getenv(CoreConstants.HOSTNAME_KEY);
        if (this.subscriptionName == null || this.subscriptionName.isBlank()) {
            this.subscriptionName = "otp-" + String.valueOf(UUID.randomUUID());
        }
        try {
            executeWithRetry(this::setupSubscription, "Setting up Service Bus subscription to topic");
            executeWithRetry(() -> {
                Optional<ServiceDelivery> fetchInitialSiriData = fetchInitialSiriData();
                if (fetchInitialSiriData.isEmpty()) {
                    this.LOG.info("Got empty response from history endpoint");
                } else {
                    processInitialSiriData(fetchInitialSiriData.get());
                }
            }, "Initializing historical Siri data");
            executeWithRetry(this::startEventProcessor, "Starting Service Bus event processor");
            setPrimed();
            ApplicationShutdownSupport.addShutdownHook("azure-siri-updater-shutdown", () -> {
                this.LOG.info("Calling shutdownHook on {} updater", this.updaterType);
                if (this.eventProcessor != null) {
                    this.eventProcessor.close();
                }
                if (this.serviceBusAdmin != null) {
                    this.serviceBusAdmin.deleteSubscription(this.topicName, this.subscriptionName);
                    this.LOG.info("Subscription '{}' deleted on topic '{}'.", this.subscriptionName, this.topicName);
                }
            });
        } catch (ServiceBusException e) {
            this.LOG.error("Service Bus encountered an error during setup: {}", e.getMessage(), e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            this.LOG.warn("Updater was interrupted during setup.");
        } catch (URISyntaxException e3) {
            this.LOG.error("Invalid URI provided for Service Bus setup: {}", e3.getMessage(), e3);
        } catch (Exception e4) {
            this.LOG.error("An unexpected error occurred during setup: {}", e4.getMessage(), e4);
        }
    }

    void sleep(int i) throws InterruptedException {
        Thread.sleep(i);
    }

    void executeWithRetry(CheckedRunnable checkedRunnable, String str) throws Exception {
        int i = 1000;
        int i2 = 1;
        while (true) {
            try {
                checkedRunnable.run();
                this.LOG.info("{} succeeded.", str);
                return;
            } catch (InterruptedException e) {
                this.LOG.warn("{} was interrupted during execution.", str);
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e2) {
                this.LOG.warn("{} failed. Error: {} (Attempt {})", str, e2.getMessage(), Integer.valueOf(i2));
                if (!shouldRetry(e2)) {
                    this.LOG.error("{} encountered a non-retryable error: {}.", str, e2.getMessage());
                    throw e2;
                }
                this.LOG.warn("{} will retry in {} ms.", str, Integer.valueOf(i));
                i2++;
                try {
                    sleep(i);
                    i = Math.min(i * 2, 60000);
                } catch (InterruptedException e3) {
                    this.LOG.warn("{} was interrupted during sleep.", str);
                    Thread.currentThread().interrupt();
                    throw e3;
                }
            }
        }
    }

    boolean shouldRetry(Exception exc) {
        if (!(exc instanceof ServiceBusException)) {
            if (ExceptionUtils.hasCause(exc, OtpHttpClientException.class)) {
                return true;
            }
            this.LOG.warn("Non-ServiceBus exception encountered: {}. Not retrying.", exc.getClass().getName());
            return false;
        }
        ServiceBusFailureReason reason = ((ServiceBusException) exc).getReason();
        if (RETRYABLE_REASONS.contains(reason)) {
            this.LOG.warn("Transient error encountered: {}. Retrying...", reason);
            return true;
        }
        if (NON_RETRYABLE_REASONS.contains(reason)) {
            this.LOG.error("Non-recoverable error encountered: {}. Not retrying.", reason);
            return false;
        }
        this.LOG.warn("Unhandled ServiceBusFailureReason: {}. Retrying by default.", reason);
        return true;
    }

    private void setupSubscription() throws ServiceBusException, URISyntaxException {
        if (this.authenticationType == AuthenticationType.FederatedIdentity) {
            this.serviceBusAdmin = new ServiceBusAdministrationClientBuilder().credential(this.fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().executorService(MoreExecutors.newDirectExecutorService()).build()).buildClient();
        } else if (this.authenticationType == AuthenticationType.SharedAccessKey) {
            this.serviceBusAdmin = new ServiceBusAdministrationClientBuilder().connectionString(this.serviceBusUrl).buildClient();
        }
        CreateSubscriptionOptions autoDeleteOnIdle = new CreateSubscriptionOptions().setAutoDeleteOnIdle(this.autoDeleteOnIdle);
        if (this.serviceBusAdmin.getSubscriptionExists(this.topicName, this.subscriptionName)) {
            this.LOG.info("Subscription '{}' already exists. Deleting existing subscription.", this.subscriptionName);
            this.serviceBusAdmin.deleteSubscription(this.topicName, this.subscriptionName);
            this.LOG.info("Service Bus deleted subscription {}.", this.subscriptionName);
        }
        this.serviceBusAdmin.createSubscription(this.topicName, this.subscriptionName, autoDeleteOnIdle);
        this.LOG.info("{} updater created subscription {}", this.updaterType, this.subscriptionName);
    }

    private void startEventProcessor() throws ServiceBusException {
        ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
        if (this.authenticationType == AuthenticationType.FederatedIdentity) {
            Preconditions.checkNotNull(this.fullyQualifiedNamespace, "fullyQualifiedNamespace must be set for FederatedIdentity authentication");
            serviceBusClientBuilder.fullyQualifiedNamespace(this.fullyQualifiedNamespace).credential((TokenCredential) new DefaultAzureCredentialBuilder().build());
        } else {
            if (this.authenticationType != AuthenticationType.SharedAccessKey) {
                throw new IllegalArgumentException("Unsupported authentication type: " + String.valueOf(this.authenticationType));
            }
            Preconditions.checkNotNull(this.serviceBusUrl, "serviceBusUrl must be set for SharedAccessKey authentication");
            serviceBusClientBuilder.connectionString(this.serviceBusUrl);
        }
        this.eventProcessor = serviceBusClientBuilder.processor().topicName(this.topicName).subscriptionName(this.subscriptionName).receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE).disableAutoComplete().prefetchCount(this.prefetchCount).processError(this::errorConsumer).processMessage(this::handleMessage).buildProcessorClient();
        this.eventProcessor.start();
        this.LOG.info("Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.", this.topicName, this.subscriptionName, Integer.valueOf(this.prefetchCount));
    }

    private void handleMessage(ServiceBusReceivedMessageContext serviceBusReceivedMessageContext) {
        ServiceBusReceivedMessage message = serviceBusReceivedMessageContext.getMessage();
        MESSAGE_COUNTER.incrementAndGet();
        if (MESSAGE_COUNTER.get() % 100 == 0) {
            this.LOG.debug("Total SIRI-{} messages received={}", this.updaterType, Long.valueOf(MESSAGE_COUNTER.get()));
        }
        try {
            Siri parseXml = SiriXml.parseXml(message.getBody().toString());
            ServiceDelivery serviceDelivery = parseXml.getServiceDelivery();
            if (serviceDelivery != null) {
                this.messageHandler.handleMessage(serviceDelivery, message.getMessageId());
            } else if (parseXml.getHeartbeatNotification() != null) {
                this.LOG.debug("Updater {} received SIRI heartbeat message", this.updaterType);
            } else {
                this.LOG.debug("Updater {} received SIRI message without ServiceDelivery", this.updaterType);
            }
        } catch (JAXBException | XMLStreamException e) {
            this.LOG.error(e.getLocalizedMessage(), (Throwable) e);
        }
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public boolean isPrimed() {
        return this.isPrimed;
    }

    private void setPrimed() {
        this.isPrimed = true;
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public String getConfigRef() {
        return this.configRef;
    }

    private Optional<ServiceDelivery> fetchInitialSiriData() {
        if (this.dataInitializationUrl == null) {
            return Optional.empty();
        }
        Map<String, String> asMap = HttpHeaders.of().acceptApplicationXML().build().asMap();
        this.LOG.info("Fetching initial Siri data from {}, timeout is {} ms.", this.dataInitializationUrl, Integer.valueOf(this.timeout));
        OtpHttpClientFactory otpHttpClientFactory = new OtpHttpClientFactory();
        try {
            OtpHttpClient create = otpHttpClientFactory.create(this.LOG);
            long currentTimeMillis = System.currentTimeMillis();
            Optional executeAndMapOptional = create.executeAndMapOptional(new HttpGet(this.dataInitializationUrl), Duration.ofMillis(this.timeout), asMap, SiriXml::parseXml);
            this.LOG.info("Fetched initial data in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            if (executeAndMapOptional.isEmpty()) {
                this.LOG.info("Got status 204 'No Content'.");
            }
            Optional<ServiceDelivery> map = executeAndMapOptional.map((v0) -> {
                return v0.getServiceDelivery();
            });
            otpHttpClientFactory.close();
            return map;
        } catch (Throwable th) {
            try {
                otpHttpClientFactory.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public void processInitialSiriData(ServiceDelivery serviceDelivery) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Future<?> handleMessage = this.messageHandler.handleMessage(serviceDelivery, "history-message");
            if (handleMessage != null) {
                handleMessage.get();
            }
            this.LOG.info("{} updater initialized in {} ms.", this.updaterType, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        } catch (InterruptedException | ExecutionException e) {
            throw new SiriAzureInitializationException("Error applying history", e);
        }
    }

    private void errorConsumer(ServiceBusErrorContext serviceBusErrorContext) {
        this.LOG.error("Error when receiving messages from namespace={}, Entity={}", serviceBusErrorContext.getFullyQualifiedNamespace(), serviceBusErrorContext.getEntityPath());
        Throwable exception = serviceBusErrorContext.getException();
        if (!(exception instanceof ServiceBusException)) {
            this.LOG.error("Non-ServiceBusException occurred!", serviceBusErrorContext.getException());
            return;
        }
        ServiceBusException serviceBusException = (ServiceBusException) exception;
        ServiceBusFailureReason reason = serviceBusException.getReason();
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND) {
            this.LOG.error("An unrecoverable error occurred. Stopping processing with reason {} {}", reason, serviceBusException.getMessage());
            return;
        }
        if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            this.LOG.error("Message lock lost for message", (Throwable) serviceBusException);
            return;
        }
        if (reason != ServiceBusFailureReason.SERVICE_BUSY && reason != ServiceBusFailureReason.UNAUTHORIZED) {
            this.LOG.error(serviceBusException.getLocalizedMessage(), (Throwable) serviceBusException);
            return;
        }
        this.LOG.error("Service Bus is busy or unauthorized, wait and try again");
        try {
            TimeUnit.SECONDS.sleep(5L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.LOG.info("OTP is shutting down, stopping processing of ServiceBus error messages");
        }
    }
}
