package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/azure/core/amqp/implementation/ActiveClientTokenManager.class */
public class ActiveClientTokenManager implements TokenManager {
    private static final ClientLogger LOGGER = new ClientLogger((Class<?>) ActiveClientTokenManager.class);
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private volatile Disposable subscription;
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final Sinks.Many<AmqpResponseCode> authorizationResults = Sinks.many().replay().latest();
    private final Sinks.Many<Duration> durationSource = Sinks.many().multicast().onBackpressureBuffer();
    private final AtomicReference<Duration> lastRefreshInterval = new AtomicReference<>(Duration.ofMinutes(1));

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> mono, String str, String str2) {
        this.cbsNode = mono;
        this.tokenAudience = str;
        this.scopes = str2;
    }

    @Override // com.azure.core.amqp.implementation.TokenManager
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults.asFlux();
    }

    @Override // com.azure.core.amqp.implementation.TokenManager
    public Mono<Long> authorize() {
        return this.hasDisposed.get() ? Mono.error(new AzureException("Cannot authorize with CBS node when this token manager has been disposed of.")) : this.cbsNode.flatMap(claimsBasedSecurityNode -> {
            return claimsBasedSecurityNode.authorize(this.tokenAudience, this.scopes);
        }).map(offsetDateTime -> {
            long floor = ((long) Math.floor(Duration.between(OffsetDateTime.now(ZoneOffset.UTC), offsetDateTime).getSeconds() * 0.9d)) * 1000;
            if (!this.hasScheduled.getAndSet(true)) {
                LOGGER.atInfo().addKeyValue("scopes", this.scopes).log("Scheduling refresh token task.");
                Duration ofMillis = Duration.ofMillis(floor);
                this.lastRefreshInterval.set(ofMillis);
                this.authorizationResults.emitNext(AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
                    AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit ACCEPTED.");
                    return false;
                });
                this.subscription = scheduleRefreshTokenTask(ofMillis);
            }
            return Long.valueOf(floor);
        });
    }

    @Override // com.azure.core.amqp.implementation.TokenManager, java.lang.AutoCloseable
    public void close() {
        if (this.hasDisposed.getAndSet(true)) {
            return;
        }
        this.authorizationResults.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not close authorizationResults.");
            return false;
        });
        this.durationSource.emitComplete((signalType2, emitResult2) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType2, emitResult2).log("Could not close durationSource.");
            return false;
        });
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    private Disposable scheduleRefreshTokenTask(Duration duration) {
        this.durationSource.emitNext(duration, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit initial refresh interval.");
            return false;
        });
        return Flux.switchOnNext(this.durationSource.asFlux().map(Flux::interval)).takeUntil(l -> {
            return this.hasDisposed.get();
        }).flatMap(l2 -> {
            LOGGER.atInfo().addKeyValue("scopes", this.scopes).log("Refreshing token.");
            return authorize();
        }).onErrorContinue(th -> {
            return (th instanceof AmqpException) && ((AmqpException) th).isTransient();
        }, (th2, obj) -> {
            Duration duration2 = this.lastRefreshInterval.get();
            LOGGER.atWarning().addKeyValue("scopes", this.scopes).addKeyValue(ClientConstants.INTERVAL_KEY, obj).log("Error is transient. Rescheduling authorization task.", th2);
            this.durationSource.emitNext(duration2, (signalType2, emitResult2) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType2, emitResult2).addKeyValue("lastRefresh", duration2).log("Could not emit lastRefresh.");
                return false;
            });
        }).subscribe(l3 -> {
            LOGGER.atVerbose().addKeyValue("scopes", this.scopes).addKeyValue(ClientConstants.INTERVAL_KEY, l3).log("Authorization successful. Refreshing token.");
            this.authorizationResults.emitNext(AmqpResponseCode.ACCEPTED, (signalType2, emitResult2) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType2, emitResult2).log("Could not emit ACCEPTED after refresh.");
                return false;
            });
            Duration ofMillis = Duration.ofMillis(l3.longValue());
            this.lastRefreshInterval.set(ofMillis);
            this.durationSource.emitNext(ofMillis, (signalType3, emitResult3) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType3, emitResult3).addKeyValue("nextRefresh", ofMillis).log("Could not emit nextRefresh.");
                return false;
            });
        }, th3 -> {
            LOGGER.atError().addKeyValue("scopes", this.scopes).addKeyValue("audience", this.tokenAudience).log("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", th3);
            if (this.hasDisposed.getAndSet(true)) {
                return;
            }
            this.hasScheduled.set(false);
            this.durationSource.emitComplete((signalType2, emitResult2) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType2, emitResult2).log("Could not close durationSource.");
                return false;
            });
            this.authorizationResults.emitError(th3, (signalType3, emitResult3) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType3, emitResult3).log("Could not emit authorization error.", th3);
                return false;
            });
        });
    }
}
