package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionAcquirer;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSessionReactorReceiver.class */
/**
 * An {@link AmqpReceiveLink} that wraps the receive link of a single acquired session.
 *
 * <p>Most link operations are forwarded unchanged to the wrapped session link. On top of that, this type:
 * <ul>
 *   <li>optionally completes the stream returned by {@link #getEndpointStates()} when no message has been
 *       observed by {@link #receive()} for the configured idle timeout, and</li>
 *   <li>registers the {@link Disposable} returned by the session's {@code beginLockRenew} call so that it is
 *       disposed together with this receiver.</li>
 * </ul>
 */
public final class ServiceBusSessionReactorReceiver implements AmqpReceiveLink {
    private final ClientLogger logger;
    private final String sessionId;
    private final AmqpReceiveLink sessionLink;
    private final boolean hasIdleTimeout;
    // Each value pushed here starts a fresh idle countdown, replacing any countdown still in flight.
    private final Sinks.Many<Boolean> nextItemIdleTimeoutSink = Sinks.many().multicast().onBackpressureBuffer();
    // Completed once an idle countdown elapses; used to cut the endpoint-state stream short.
    private final Sinks.Empty<Void> terminateEndpointStatesSink = Sinks.empty();
    // Everything that has to be torn down when this receiver is disposed or closed.
    private final Disposable.Composite disposables = Disposables.composite();

    /**
     * Creates a receiver over the link of the given session.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed from
     * package-private.
     *
     * @param clientLogger logger used for all messages written by this receiver.
     * @param serviceBusTracer tracer handed to the session's {@code beginLockRenew} call.
     * @param session the acquired session; supplies the session id, the underlying link and lock renewal.
     * @param duration idle timeout between messages; {@code null} disables idle-timeout handling entirely.
     * @param duration2 duration handed to the session's {@code beginLockRenew} call (presumably the maximum
     *     lock-renewal window; confirm against {@code ServiceBusSessionAcquirer.Session}).
     */
    public ServiceBusSessionReactorReceiver(ClientLogger clientLogger, ServiceBusTracer serviceBusTracer, ServiceBusSessionAcquirer.Session session, Duration duration, Duration duration2) {
        this.logger = clientLogger;
        this.sessionId = session.getId();
        this.sessionLink = session.getLink();
        this.hasIdleTimeout = duration != null;
        if (this.hasIdleTimeout) {
            this.disposables.add(watchForIdleSession(duration));
        }
        this.disposables.add(session.beginLockRenew(serviceBusTracer, duration2));
    }

    /**
     * Gets the id of the session this receiver is bound to.
     *
     * @return the session id obtained from the session at construction time.
     */
    public String getSessionId() {
        return this.sessionId;
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.AmqpLink
    public String getHostname() {
        return this.sessionLink.getHostname();
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public String getConnectionId() {
        return this.sessionLink.getConnectionId();
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.AmqpLink
    public String getLinkName() {
        return this.sessionLink.getLinkName();
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.AmqpLink
    public String getEntityPath() {
        return this.sessionLink.getEntityPath();
    }

    /**
     * Streams the endpoint states of the wrapped session link.
     *
     * <p>When an idle timeout is configured, the stream additionally stops as soon as the idle countdown
     * elapses. Any error from the underlying stream is logged at warning level and converted into a normal
     * completion, so subscribers never observe an error signal from this method.
     *
     * @return the endpoint states of the session link, ending on link termination, error, or idle timeout.
     */
    @Override // com.azure.core.amqp.AmqpLink
    public Flux<AmqpEndpointState> getEndpointStates() {
        Flux<AmqpEndpointState> states = this.sessionLink.getEndpointStates();
        if (this.hasIdleTimeout) {
            states = states.takeUntilOther(this.terminateEndpointStatesSink.asMono());
        }
        return states.onErrorResume(error -> {
            withLinkInfo(this.logger.atWarning()).log("Error occurred. Ending session {}.", this.sessionId, error);
            return Mono.empty();
        });
    }

    /**
     * Streams the messages of the wrapped session link.
     *
     * <p>When an idle timeout is configured, the idle countdown is started upon subscription and restarted
     * for every message that flows through.
     *
     * @return the messages received on the session link.
     */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public Flux<Message> receive() {
        if (!this.hasIdleTimeout) {
            return this.sessionLink.receive();
        }
        // Deferred so the first countdown begins at subscription time rather than at assembly time.
        final Mono<Void> startCountdown = Mono.defer(() -> {
            restartIdleCountdown();
            return Mono.empty();
        });
        return startCountdown
            .thenMany(this.sessionLink.receive())
            .doOnNext(message -> restartIdleCountdown());
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        return this.sessionLink.updateDisposition(str, deliveryState);
    }

    /** {@inheritDoc} Forwarded to the wrapped session link. */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public void addCredit(Supplier<Long> supplier) {
        this.sessionLink.addCredit(supplier);
    }

    /**
     * Disposes the resources registered by this receiver (idle-timeout subscription, if any, and the
     * lock-renew disposable) and then disposes the wrapped session link.
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        this.disposables.dispose();
        this.sessionLink.dispose();
    }

    /**
     * Disposes the resources registered by this receiver and returns the wrapped link's close operation.
     *
     * <p>The registered resources are disposed immediately when this method is invoked, not when the
     * returned {@link Mono} is subscribed.
     *
     * @return the {@link Mono} returned by the wrapped session link's {@code closeAsync()}.
     */
    @Override // com.azure.core.amqp.AmqpLink, com.azure.core.util.AsyncCloseable
    public Mono<Void> closeAsync() {
        this.disposables.dispose();
        return this.sessionLink.closeAsync();
    }

    /**
     * Not supported by this receiver.
     *
     * @return a {@link Mono} produced by {@code FluxUtil.monoError} for an {@link UnsupportedOperationException}.
     */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public Mono<Void> addCredits(int i) {
        final UnsupportedOperationException unsupported =
            new UnsupportedOperationException("addCredits(int) should not be called in V2 route.");
        return FluxUtil.monoError(this.logger, unsupported);
    }

    /**
     * Not supported by this receiver.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public int getCredits() {
        final UnsupportedOperationException unsupported =
            new UnsupportedOperationException("getCredits() should not be called in V2 route.");
        throw this.logger.logExceptionAsError(unsupported);
    }

    /**
     * Not supported by this receiver.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public void setEmptyCreditListener(Supplier<Integer> supplier) {
        final UnsupportedOperationException unsupported =
            new UnsupportedOperationException("setEmptyCreditListener should not be called in V2 route.");
        throw this.logger.logExceptionAsError(unsupported);
    }

    /**
     * Subscribes to the idle countdown: every value pushed to {@code nextItemIdleTimeoutSink} switches to a
     * new delay of {@code idleTimeout}; when a delay elapses, the event is logged and
     * {@code terminateEndpointStatesSink} is completed.
     *
     * @param idleTimeout the delay after the most recent signal before the session is considered idle.
     * @return the subscription, so the caller can register it for disposal.
     */
    private Disposable watchForIdleSession(Duration idleTimeout) {
        final Flux<Long> idleExpirations = Flux.switchOnNext(
            this.nextItemIdleTimeoutSink.asFlux().map(signal -> Mono.delay(idleTimeout)));
        return idleExpirations.subscribe(tick -> {
            withLinkInfo(this.logger.atInfo())
                .addKeyValue("timeout", idleTimeout)
                .log("Did not a receive message within timeout.");
            this.terminateEndpointStatesSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
        });
    }

    /** Pushes a signal that starts a fresh idle countdown. */
    private void restartIdleCountdown() {
        this.nextItemIdleTimeoutSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Adds the session id and link name as key-value pairs to a log event.
     *
     * @param loggingEventBuilder the log event being built.
     * @return the builder returned by the chained {@code addKeyValue} calls.
     */
    private LoggingEventBuilder withLinkInfo(LoggingEventBuilder loggingEventBuilder) {
        final LoggingEventBuilder withSession = loggingEventBuilder.addKeyValue("sessionId", this.sessionId);
        return withSession.addKeyValue(ClientConstants.LINK_NAME_KEY, getLinkName());
    }
}
