package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/TracingFluxOperator.class */
public final class TracingFluxOperator<T> extends BaseSubscriber<T> {
    private final CoreSubscriber<? super T> downstream;
    private final BiConsumer<T, Function<T, Throwable>> instrumentation;

    public static <T> Flux<T> create(Flux<T> flux, final ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation) {
        return (serviceBusReceiverInstrumentation.isEnabled() || !serviceBusReceiverInstrumentation.isAsyncReceiverInstrumentation()) ? new FluxOperator<T, T>(flux) { // from class: com.azure.messaging.servicebus.TracingFluxOperator.1
            @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
            public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
                Objects.requireNonNull(coreSubscriber, "'actual' cannot be null.");
                CorePublisher corePublisher = this.source;
                ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation2 = serviceBusReceiverInstrumentation;
                corePublisher.subscribe((CoreSubscriber) new TracingFluxOperator(coreSubscriber, (obj, function) -> {
                    if (obj instanceof Message) {
                        serviceBusReceiverInstrumentation2.instrumentProcess((Message) obj, ReceiverKind.ASYNC_RECEIVER, (Function<Message, Throwable>) function);
                    } else if (obj instanceof ServiceBusReceivedMessage) {
                        serviceBusReceiverInstrumentation2.instrumentProcess((ServiceBusReceivedMessage) obj, ReceiverKind.ASYNC_RECEIVER, (Function<ServiceBusReceivedMessage, Throwable>) function);
                    }
                }));
            }
        } : flux;
    }

    private TracingFluxOperator(CoreSubscriber<? super T> coreSubscriber, BiConsumer<T, Function<T, Throwable>> biConsumer) {
        this.downstream = coreSubscriber;
        this.instrumentation = biConsumer;
    }

    @Override // reactor.core.CoreSubscriber
    public Context currentContext() {
        return this.downstream.currentContext();
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnSubscribe(Subscription subscription) {
        this.downstream.onSubscribe(this);
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnNext(T t) {
        this.instrumentation.accept(t, obj -> {
            this.downstream.onNext(obj);
            return null;
        });
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnError(Throwable th) {
        this.downstream.onError(th);
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnComplete() {
        this.downstream.onComplete();
    }
}
