package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/LockRenewalOperation.class */
/**
 * Keeps a message lock or session lock alive by repeatedly invoking a caller-supplied renewal function until a
 * maximum renewal duration elapses, the operation is closed, or a renewal fails.
 *
 * <p>Renewal starts as soon as the instance is constructed. Progress is observable through {@link #getStatus()},
 * {@link #getLockedUntil()}, {@link #getThrowable()} and {@link #getCompletionOperation()}.</p>
 */
public class LockRenewalOperation implements AutoCloseable, Disposable {
    private final ClientLogger logger;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<OffsetDateTime> lockedUntil = new AtomicReference<>();
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();
    private final AtomicReference<LockRenewalStatus> status = new AtomicReference<>(LockRenewalStatus.RUNNING);
    // Completing this processor is the signal that stops the renewal pipeline.
    private final MonoProcessor<Void> cancellationProcessor = MonoProcessor.create();
    private final Mono<Void> completionMono;
    private final String lockToken;
    private final boolean isSession;
    private final Function<String, Mono<OffsetDateTime>> renewalOperation;
    private final Disposable subscription;

    /**
     * Creates a renewal operation whose initial lock expiration is taken to be the current time.
     *
     * @param lockToken Lock token, or the session id when {@code isSession} is true.
     * @param maxLockRenewalDuration Maximum time to keep renewing the lock. Zero completes immediately.
     * @param isSession Whether {@code lockToken} identifies a session rather than a message lock.
     * @param renewalOperation Function invoked with {@code lockToken} to renew the lock; emits the new expiration.
     * @throws NullPointerException if {@code lockToken}, {@code renewalOperation} or
     *     {@code maxLockRenewalDuration} is null.
     * @throws IllegalArgumentException if {@code maxLockRenewalDuration} is negative.
     */
    public LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession,
        Function<String, Mono<OffsetDateTime>> renewalOperation) {
        this(lockToken, maxLockRenewalDuration, isSession, renewalOperation, OffsetDateTime.now());
    }

    /**
     * Creates a renewal operation and immediately subscribes to the renewal pipeline.
     *
     * @param lockToken Lock token, or the session id when {@code isSession} is true.
     * @param maxLockRenewalDuration Maximum time to keep renewing the lock. Zero completes immediately.
     * @param isSession Whether {@code lockToken} identifies a session rather than a message lock.
     * @param renewalOperation Function invoked with {@code lockToken} to renew the lock; emits the new expiration.
     * @param tokenLockedUntil Time at which the lock currently expires; used to schedule the first renewal.
     * @throws NullPointerException if any reference argument is null.
     * @throws IllegalArgumentException if {@code maxLockRenewalDuration} is negative.
     */
    public LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession,
        Function<String, Mono<OffsetDateTime>> renewalOperation, OffsetDateTime tokenLockedUntil) {
        this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null.");
        this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null.");
        this.isSession = isSession;
        Objects.requireNonNull(tokenLockedUntil, "'lockedUntil' cannot be null.");
        Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null.");

        // Context attached to every log entry written through this logger.
        final HashMap<String, Object> loggingContext = new HashMap<>(2);
        loggingContext.put("lockToken", lockToken);
        loggingContext.put("isSession", isSession);
        this.logger = new ClientLogger(LockRenewalOperation.class, loggingContext);

        if (maxLockRenewalDuration.isNegative()) {
            throw this.logger.logExceptionAsError(
                new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }

        this.lockedUntil.set(tokenLockedUntil);

        final Flux<OffsetDateTime> renewals = getRenewLockOperation(tokenLockedUntil, maxLockRenewalDuration)
            .takeUntilOther(this.cancellationProcessor)
            .cache(Duration.ofMinutes(2L));

        this.completionMono = renewals.then();
        this.subscription = renewals.subscribe(this.lockedUntil::set, error -> {
            this.logger.error("Error occurred while renewing lock token.", error);
            // Publish the cause before the FAILED status so that a caller observing FAILED through getStatus()
            // never reads a null from getThrowable().
            this.throwable.set(error);
            this.status.set(LockRenewalStatus.FAILED);
            this.cancellationProcessor.onComplete();
        }, () -> {
            // Only a still-running operation transitions to COMPLETE; CANCELLED and FAILED are preserved.
            if (this.status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
                this.logger.verbose("Renewing lock task completed.");
            }
            this.cancellationProcessor.onComplete();
        });
    }

    /**
     * Gets a mono that terminates when the renewal pipeline terminates.
     *
     * @return The completion mono derived from the cached renewal flux.
     */
    public Mono<Void> getCompletionOperation() {
        return this.completionMono;
    }

    /**
     * Gets the most recently recorded lock expiration.
     *
     * @return The initial expiration, or the value emitted by the latest successful renewal.
     */
    OffsetDateTime getLockedUntil() {
        return this.lockedUntil.get();
    }

    /**
     * Gets the message lock token.
     *
     * @return The lock token, or {@code null} if this operation renews a session lock.
     */
    public String getLockToken() {
        return this.isSession ? null : this.lockToken;
    }

    /**
     * Gets the session id.
     *
     * @return The session id, or {@code null} if this operation renews a message lock.
     */
    public String getSessionId() {
        return this.isSession ? this.lockToken : null;
    }

    /**
     * Gets the current status of the renewal operation.
     *
     * @return The current status.
     */
    public LockRenewalStatus getStatus() {
        return this.status.get();
    }

    /**
     * Gets the error that terminated the renewal pipeline.
     *
     * @return The error, or {@code null} if no error has been recorded.
     */
    public Throwable getThrowable() {
        return this.throwable.get();
    }

    /**
     * Cancels the renewal operation. Only the first invocation has any effect.
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        // A status that already left RUNNING (COMPLETE or FAILED) is left untouched.
        if (this.status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.CANCELLED)) {
            this.logger.verbose("Cancelled operation.");
        }

        this.cancellationProcessor.onComplete();
        this.subscription.dispose();
    }

    /**
     * Cancels the renewal operation; equivalent to {@link #close()}.
     */
    @Override
    public void dispose() {
        close();
    }

    /**
     * Gets whether {@link #close()} or {@link #dispose()} has been invoked.
     *
     * @return {@code true} if this operation has been closed.
     */
    @Override
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /**
     * Builds the flux that renews the lock repeatedly, emitting each new expiration time.
     *
     * @param initialLockedUntil Expiration used to schedule the first renewal.
     * @param maxLockRenewalDuration Maximum time to keep renewing. Zero marks the operation complete immediately.
     * @return A flux of renewed expiration times, or an empty flux when the duration is zero.
     */
    private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedUntil,
        Duration maxLockRenewalDuration) {
        if (maxLockRenewalDuration.isZero()) {
            this.status.set(LockRenewalStatus.COMPLETE);
            return Flux.empty();
        }

        // Each value pushed into this processor is the delay to wait before the next renewal attempt.
        final EmitterProcessor<Duration> emitterProcessor = EmitterProcessor.create();
        final FluxSink<Duration> sink = emitterProcessor.sink();

        sink.next(calculateRenewalDelay(initialLockedUntil));

        // switchOnNext subscribes to the newest delay timer; when a timer fires, one element flows downstream
        // and triggers a renewal. The element's value itself is not used.
        return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval)
            .thenReturn(Flux.create(s -> s.next(interval)))))
            .takeUntilOther(Flux.first(this.cancellationProcessor, Mono.delay(maxLockRenewalDuration)))
            .flatMap(delay -> {
                this.logger.info("Starting lock renewal.");
                return this.renewalOperation.apply(this.lockToken);
            })
            .map(renewedUntil -> {
                // NOTE(review): this message text duplicates the pre-renewal log even though it is emitted after
                // the renewal returned; kept as-is so existing log consumers are unaffected.
                this.logger.atInfo()
                    .addKeyValue("nextExpiration", renewedUntil)
                    .addKeyValue("next", Duration.between(OffsetDateTime.now(), renewedUntil))
                    .log("Starting lock renewal.");

                // Schedule the following renewal based on the new expiration.
                sink.next(calculateRenewalDelay(renewedUntil));
                return renewedUntil;
            });
    }

    /**
     * Computes how long to wait before the next renewal so that it happens ahead of the lock's expiration.
     *
     * @param initialLockedUntil Time at which the lock expires.
     * @return {@link Duration#ZERO} when less than 400ms remain; otherwise the remaining time minus a buffer of
     *     half the remaining time, capped at {@code ServiceBusConstants.MAX_RENEWAL_BUFFER_DURATION}.
     */
    private Duration calculateRenewalDelay(OffsetDateTime initialLockedUntil) {
        final Duration remaining = Duration.between(OffsetDateTime.now(), initialLockedUntil);
        final long remainingMillis = remaining.toMillis();

        if (remainingMillis < 400) {
            this.logger.atInfo()
                .addKeyValue("lockedUntil", initialLockedUntil)
                .log("Duration was less than 400ms.");
            return Duration.ZERO;
        }

        final long bufferMillis = Math.min(remainingMillis / 2,
            ServiceBusConstants.MAX_RENEWAL_BUFFER_DURATION.toMillis());
        final Duration renewAfter = Duration.ofMillis(remainingMillis - bufferMillis);

        // Defensive check: the buffer is at most half of the remaining time, so this is not expected to trigger.
        if (renewAfter.isNegative()) {
            this.logger.atInfo()
                .addKeyValue("renewAfter", remainingMillis)
                .addKeyValue("buffer", bufferMillis)
                .log("Adjusted duration is negative.");
        }

        return renewAfter;
    }
}
