package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSessionAcquirer.class */
final class ServiceBusSessionAcquirer {
    private static final String TRACKING_ID_KEY = "trackingId";
    private final ClientLogger logger;
    private final String identifier;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final Duration tryTimeout;
    private final boolean timeoutRetryDisabled;
    private final ServiceBusReceiveMode receiveMode;
    private final ConnectionCacheWrapper connectionCacheWrapper;
    private final Mono<ServiceBusManagementNode> sessionManagement;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSessionAcquirer$Session.class */
    public static final class Session {
        private final ServiceBusReceiveLink link;
        private final ServiceBusReceiveLink.SessionProperties properties;
        private final Mono<ServiceBusManagementNode> sessionManagement;

        Session(ServiceBusReceiveLink serviceBusReceiveLink, ServiceBusReceiveLink.SessionProperties sessionProperties, Mono<ServiceBusManagementNode> mono) {
            this.link = (ServiceBusReceiveLink) Objects.requireNonNull(serviceBusReceiveLink, "sessionLink cannot be null.");
            this.properties = (ServiceBusReceiveLink.SessionProperties) Objects.requireNonNull(sessionProperties, "sessionProperties cannot be null.");
            this.sessionManagement = (Mono) Objects.requireNonNull(mono, "sessionManagement cannot be null.");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getId() {
            return this.properties.getId();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public ServiceBusReceiveLink getLink() {
            return this.link;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Disposable beginLockRenew(ServiceBusTracer serviceBusTracer, Duration duration) {
            String id = this.properties.getId();
            return new LockRenewalOperation(id, duration, true, str -> {
                return this.sessionManagement.flatMap(serviceBusManagementNode -> {
                    return serviceBusTracer.traceMono("ServiceBus.renewSessionLock", serviceBusManagementNode.renewSessionLock(id, this.link.getLinkName()));
                });
            }, this.properties.getLockedUntil());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceBusSessionAcquirer(ClientLogger clientLogger, String str, String str2, MessagingEntityType messagingEntityType, ServiceBusReceiveMode serviceBusReceiveMode, Duration duration, boolean z, ConnectionCacheWrapper connectionCacheWrapper) {
        if (!$assertionsDisabled && !connectionCacheWrapper.isV2()) {
            throw new AssertionError();
        }
        this.logger = clientLogger;
        this.identifier = str;
        this.entityPath = str2;
        this.entityType = messagingEntityType;
        this.tryTimeout = duration;
        this.timeoutRetryDisabled = z;
        this.receiveMode = serviceBusReceiveMode;
        this.connectionCacheWrapper = connectionCacheWrapper;
        this.sessionManagement = connectionCacheWrapper.getConnection().flatMap(serviceBusAmqpConnection -> {
            return serviceBusAmqpConnection.getManagementNode(str2, messagingEntityType);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isConnectionClosed() {
        return this.connectionCacheWrapper.isChannelClosed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Session> acquire() {
        return acquireIntern(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Session> acquire(String str) {
        Objects.requireNonNull(str, "sessionId cannot be null.");
        return acquireIntern(str);
    }

    private Mono<Session> acquireIntern(String str) {
        return this.timeoutRetryDisabled ? acquireSession(str).onErrorResume(th -> {
            return isBrokerTimeoutError(th) ? publishError(str, new TimeoutException("com.microsoft:timeout").initCause(th), false) : publishError(str, th, true);
        }) : acquireSession(str).timeout(this.tryTimeout).retryWhen(Retry.from(flux -> {
            return flux.flatMap(retrySignal -> {
                Throwable failure = retrySignal.failure();
                if (!isTimeoutError(failure)) {
                    return publishError(str, failure, true);
                }
                this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue("attempt", retrySignal.totalRetriesInARow()).log("Timeout while acquiring session '{}'.", sessionName(str), failure);
                return Mono.delay(Duration.ZERO);
            });
        }));
    }

    private Mono<Session> acquireSession(String str) {
        return Mono.defer(() -> {
            return this.connectionCacheWrapper.getConnection().flatMap(serviceBusAmqpConnection -> {
                return serviceBusAmqpConnection.createReceiveLink(linkName(str), this.entityPath, this.receiveMode, null, this.entityType, this.identifier, str);
            }).flatMap(serviceBusReceiveLink -> {
                return serviceBusReceiveLink.getSessionProperties().flatMap(sessionProperties -> {
                    return Mono.just(new Session(serviceBusReceiveLink, sessionProperties, this.sessionManagement));
                });
            });
        });
    }

    private <T> Mono<T> publishError(String str, Throwable th, boolean z) {
        long nanoTime = System.nanoTime();
        if (z) {
            this.logger.atInfo().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(TRACKING_ID_KEY, nanoTime).log("Unable to acquire session '{}'.", sessionName(str), th);
        }
        return Mono.error(th).publishOn(Schedulers.boundedElastic()).doOnError(th2 -> {
            this.logger.atVerbose().addKeyValue(TRACKING_ID_KEY, nanoTime).log("Emitting session acquire error" + (z ? "." : ": " + th.getMessage()));
        });
    }

    private static boolean isBrokerTimeoutError(Throwable th) {
        return (th instanceof AmqpException) && ((AmqpException) th).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR;
    }

    private static boolean isTimeoutError(Throwable th) {
        return (th instanceof TimeoutException) || isBrokerTimeoutError(th);
    }

    private static String linkName(String str) {
        return str != null ? str : StringUtil.getRandomString("session-");
    }

    private static String sessionName(String str) {
        return str == null ? "unnamed" : str;
    }

    static {
        $assertionsDisabled = !ServiceBusSessionAcquirer.class.desiredAssertionStatus();
    }
}
