package io.rsocket.loadbalance;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import kotlin.jvm.internal.LongCompanionObject;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/loadbalance/PooledRSocket.class */
public final class PooledRSocket extends ResolvingOperator<RSocket> implements CoreSubscriber<RSocket>, RSocket {
    /** Owning pool; {@code doCleanup} removes this instance from its {@code activeSockets} array. */
    final RSocketPool parent;
    /** Source that {@code doSubscribe()} subscribes to in order to obtain the underlying {@link RSocket}. */
    final Mono<RSocket> rSocketSource;
    /** Target associated with this socket; exposed through {@code target()}. */
    final LoadbalanceTarget loadbalanceTarget;
    /** Backs {@code onClose()}; completed or failed from {@code doCleanup} and {@code doOnDispose}. */
    final Sinks.Empty<Void> onCloseSink = Sinks.unsafe().empty();
    /** Subscription received in {@code onSubscribe}; written only through the {@link #S} updater. */
    volatile Subscription s;
    /** Atomic field updater for {@link #s}. */
    static final AtomicReferenceFieldUpdater<PooledRSocket, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(PooledRSocket.class, Subscription.class, "s");

    /* loaded from: input_file:io/rsocket/loadbalance/PooledRSocket$FluxInner.class */
    /**
     * Flux-returning request ({@code REQUEST_STREAM} or {@code REQUEST_CHANNEL}) issued against a
     * {@link PooledRSocket}. The actual call is made in {@link #accept(RSocket, Throwable)} once the
     * parent hands over a resolved {@link RSocket} or an error.
     *
     * @param <INPUT> type of the stored request input: a {@link Payload} for streams, a
     *     {@code Publisher<Payload>} for channels
     */
    static final class FluxInner<INPUT> extends FluxDeferredResolution<INPUT, RSocket> {
        FluxInner(PooledRSocket pooledRSocket, INPUT input, FrameType frameType) {
            super(pooledRSocket, input, frameType);
        }

        /**
         * Resolution callback.
         *
         * <ul>
         *   <li>Does nothing when this request is already terminated.
         *   <li>On error: releases the stored payload for {@code REQUEST_STREAM} and signals
         *       {@code onError}.
         *   <li>When neither a socket nor an error is supplied: passes itself to
         *       {@code parent.observe(this)} (presumably to wait for the next resolution).
         *   <li>Otherwise: issues the request on the resolved socket and subscribes itself to it.
         * </ul>
         *
         * @param rSocket resolved socket, or {@code null}
         * @param th resolution error, or {@code null}
         */
        @Override // java.util.function.BiConsumer
        @SuppressWarnings("unchecked")
        public void accept(RSocket rSocket, Throwable th) {
            if (isTerminated()) {
                return;
            }
            if (th != null) {
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease(this.fluxOrPayload);
                }
                onError(th);
                return;
            }
            if (rSocket == null) {
                this.parent.observe(this);
                return;
            }

            final Flux<Payload> source;
            switch (this.requestType) {
                case REQUEST_STREAM:
                    source = rSocket.requestStream((Payload) this.fluxOrPayload);
                    break;
                case REQUEST_CHANNEL:
                    // RSocket#requestChannel accepts any Publisher<Payload>; casting to Flux would
                    // throw ClassCastException for callers that passed a non-Flux publisher.
                    source = rSocket.requestChannel((Publisher<Payload>) this.fluxOrPayload);
                    break;
                default:
                    Operators.error(this.actual, new IllegalStateException("Should never happen"));
                    return;
            }
            source.subscribe((CoreSubscriber<? super Payload>) this);
        }
    }

    /* loaded from: input_file:io/rsocket/loadbalance/PooledRSocket$MonoInner.class */
    /**
     * Mono-returning request ({@code REQUEST_FNF}, {@code REQUEST_RESPONSE} or
     * {@code METADATA_PUSH}) issued against a {@link PooledRSocket}. The actual call is made in
     * {@link #accept(RSocket, Throwable)} once the parent hands over a resolved {@link RSocket} or
     * an error.
     *
     * @param <RESULT> element type of the resulting {@code Mono}
     */
    static final class MonoInner<RESULT> extends MonoDeferredResolution<RESULT, RSocket> {
        MonoInner(PooledRSocket pooledRSocket, Payload payload, FrameType frameType) {
            super(pooledRSocket, payload, frameType);
        }

        /**
         * Resolution callback.
         *
         * <ul>
         *   <li>Does nothing when this request is already terminated.
         *   <li>On error: releases the stored payload and signals {@code onError}.
         *   <li>When neither a socket nor an error is supplied: passes itself to
         *       {@code parent.observe(this)} (presumably to wait for the next resolution).
         *   <li>Otherwise: issues the request on the resolved socket and subscribes itself to it.
         * </ul>
         *
         * @param rSocket resolved socket, or {@code null}
         * @param th resolution error, or {@code null}
         */
        @Override // java.util.function.BiConsumer
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void accept(RSocket rSocket, Throwable th) {
            if (isTerminated()) {
                return;
            }
            if (th != null) {
                ReferenceCountUtil.safeRelease(this.payload);
                onError(th);
                return;
            }
            if (rSocket == null) {
                this.parent.observe(this);
                return;
            }

            // The three request kinds return Mono<Void> or Mono<Payload>, so the common
            // type is Mono<?>; a Mono<Void> local cannot hold the requestResponse result.
            final Mono<?> source;
            switch (this.requestType) {
                case REQUEST_FNF:
                    source = rSocket.fireAndForget(this.payload);
                    break;
                case REQUEST_RESPONSE:
                    source = rSocket.requestResponse(this.payload);
                    break;
                case METADATA_PUSH:
                    source = rSocket.metadataPush(this.payload);
                    break;
                default:
                    Operators.error(this.actual, new IllegalStateException("Should never happen"));
                    return;
            }
            // Raw cast: the element type is only known per request kind at runtime.
            source.subscribe((CoreSubscriber) this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a pooled socket bound to a pool, a socket source and a load-balance target.
     * Nothing is subscribed here; the source is only subscribed to in {@code doSubscribe()}.
     *
     * @param rSocketPool owning pool
     * @param mono source of the underlying {@link RSocket}
     * @param loadbalanceTarget target returned by {@code target()}
     */
    public PooledRSocket(RSocketPool rSocketPool, Mono<RSocket> mono, LoadbalanceTarget loadbalanceTarget) {
        this.loadbalanceTarget = loadbalanceTarget;
        this.rSocketSource = mono;
        this.parent = rSocketPool;
    }

    /**
     * Stores the upstream subscription (only once, via {@link Operators#setOnce}) and, when it was
     * accepted, requests unbounded demand.
     *
     * @param subscription subscription handed over by the socket source
     */
    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(S, this, subscription)) {
            // JDK constant; avoids depending on the Kotlin runtime's LongCompanionObject.
            subscription.request(Long.MAX_VALUE);
        }
    }

    /**
     * Handles completion of the socket source.
     *
     * <p>If the subscription was already cancelled, or it changed concurrently so that it cannot be
     * swapped to {@code null}, only {@code doFinally()} runs. Otherwise the operator is completed
     * with the received socket, or terminated with an {@link IllegalStateException} when the source
     * completed without emitting one.
     */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        // Both fields are read before the CAS so the decision is made on one snapshot.
        final Subscription current = this.s;
        final RSocket resolved = (RSocket) this.value;

        final boolean cancelled = current == Operators.cancelledSubscription();
        if (cancelled || !S.compareAndSet(this, current, null)) {
            doFinally();
            return;
        }

        if (resolved != null) {
            complete(resolved);
            return;
        }
        terminate(new IllegalStateException("Source completed empty"));
    }

    /**
     * Handles an error from the socket source.
     *
     * <p>The subscription is atomically marked as cancelled. If it already was cancelled, the error
     * is dropped through {@link Operators#onErrorDropped}; otherwise it is passed to
     * {@code doCleanup}. {@code doFinally()} runs in both cases before either action.
     *
     * @param th error signalled by the source
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        final Subscription cancelledMarker = Operators.cancelledSubscription();
        // Short-circuit: the getAndSet only happens when the plain read did not see cancellation.
        final boolean alreadyCancelled =
                this.s == cancelledMarker || S.getAndSet(this, cancelledMarker) == cancelledMarker;

        doFinally();

        if (alreadyCancelled) {
            Operators.onErrorDropped(th, Context.empty());
            return;
        }
        doCleanup(th);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Receives the socket emitted by the source.
     *
     * <p>When the subscription is already cancelled the socket is passed to
     * {@code doOnValueExpired}; otherwise it is stored as the operator's value and
     * {@code doFinally()} is invoked.
     *
     * @param rSocket socket emitted by the source
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(RSocket rSocket) {
        final boolean cancelled = this.s == Operators.cancelledSubscription();
        if (!cancelled) {
            this.value = rSocket;
            doFinally();
            return;
        }
        doOnValueExpired(rSocket);
    }

    /** Subscribes this instance, as a {@link CoreSubscriber}, to the socket source. */
    @Override // io.rsocket.loadbalance.ResolvingOperator
    protected void doSubscribe() {
        final CoreSubscriber<? super RSocket> self = this;
        this.rSocketSource.subscribe(self);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hooks cleanup onto the resolved socket's {@code onClose()} signal: an error is forwarded to
     * {@code doCleanup} as-is, a normal completion is forwarded as {@code ON_DISPOSE}.
     *
     * @param rSocket the socket that has just been resolved
     */
    @Override // io.rsocket.loadbalance.ResolvingOperator
    public void doOnValueResolved(RSocket rSocket) {
        final Mono<Void> closed = rSocket.onClose();
        closed.subscribe(null, failure -> doCleanup(failure), () -> doCleanup(ON_DISPOSE));
    }

    /**
     * Terminates this operator with the given cause, removes this instance from the parent pool's
     * {@code activeSockets} array, and signals {@code onCloseSink}.
     *
     * <p>Does nothing when this instance is already disposed. The removal is a compare-and-set
     * retry loop over a copied array; it stops as soon as this instance is no longer present.
     *
     * @param th termination cause; {@code ON_DISPOSE} results in an empty completion of
     *     {@code onCloseSink}, any other value in an error
     */
    void doCleanup(Throwable th) {
        if (isDisposed()) {
            return;
        }
        terminate(th);

        final RSocketPool pool = this.parent;
        for (;;) {
            final PooledRSocket[] snapshot = pool.activeSockets;
            final int count = snapshot.length;

            // Locate this instance by identity.
            int position = -1;
            for (int k = 0; k < count; k++) {
                if (snapshot[k] == this) {
                    position = k;
                    break;
                }
            }
            if (position < 0) {
                // Not (or no longer) registered in the pool: nothing to remove.
                break;
            }

            final PooledRSocket[] replacement;
            if (count == 1) {
                replacement = RSocketPool.EMPTY;
            } else {
                final int last = count - 1;
                replacement = new PooledRSocket[last];
                if (position > 0) {
                    System.arraycopy(snapshot, 0, replacement, 0, position);
                }
                if (position < last) {
                    System.arraycopy(snapshot, position + 1, replacement, position, last - position);
                }
            }

            // Retry from a fresh snapshot when another thread changed the array meanwhile.
            if (RSocketPool.ACTIVE_SOCKETS.compareAndSet(pool, snapshot, replacement)) {
                break;
            }
        }

        if (th != ON_DISPOSE) {
            this.onCloseSink.tryEmitError(th);
        } else {
            this.onCloseSink.tryEmitEmpty();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Disposes the given socket. Within this class it is called from {@code onNext} when a socket
     * arrives after the subscription was cancelled.
     *
     * @param rSocket socket to dispose
     */
    @Override // io.rsocket.loadbalance.ResolvingOperator
    public void doOnValueExpired(RSocket rSocket) {
        rSocket.dispose();
    }

    /**
     * Dispose hook: cancels the upstream subscription and wires {@code onCloseSink} to the outcome.
     *
     * <p>Without a resolved socket the sink is completed immediately; otherwise the sink mirrors
     * the socket's own {@code onClose()} signal (error or completion).
     */
    @Override // io.rsocket.loadbalance.ResolvingOperator
    protected void doOnDispose() {
        Operators.terminate(S, this);

        final RSocket resolved = (RSocket) this.value;
        if (resolved != null) {
            resolved.onClose().subscribe(null, this.onCloseSink::tryEmitError, this.onCloseSink::tryEmitEmpty);
            return;
        }
        this.onCloseSink.tryEmitEmpty();
    }

    /**
     * Returns a {@link MonoInner} that carries the payload with frame type {@code REQUEST_FNF};
     * the call on the underlying socket is made in {@code MonoInner.accept}.
     *
     * @param payload request payload
     * @return the resulting {@code Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        // Diamond instead of a raw type so the result is type-checked.
        return new MonoInner<>(this, payload, FrameType.REQUEST_FNF);
    }

    /**
     * Returns a {@link MonoInner} that carries the payload with frame type
     * {@code REQUEST_RESPONSE}; the call on the underlying socket is made in
     * {@code MonoInner.accept}.
     *
     * @param payload request payload
     * @return the resulting {@code Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        // Diamond instead of a raw type so the result is type-checked.
        return new MonoInner<>(this, payload, FrameType.REQUEST_RESPONSE);
    }

    /**
     * Returns a {@link FluxInner} that carries the payload with frame type {@code REQUEST_STREAM};
     * the call on the underlying socket is made in {@code FluxInner.accept}.
     *
     * @param payload request payload
     * @return the resulting {@code Flux}
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        // Diamond instead of a raw type so the result is type-checked.
        return new FluxInner<>(this, payload, FrameType.REQUEST_STREAM);
    }

    /**
     * Returns a {@link FluxInner} that carries the publisher with frame type
     * {@code REQUEST_CHANNEL}; the call on the underlying socket is made in
     * {@code FluxInner.accept}.
     *
     * @param publisher outbound payloads
     * @return the resulting {@code Flux}
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        // Diamond instead of a raw type so the result is type-checked.
        return new FluxInner<>(this, publisher, FrameType.REQUEST_CHANNEL);
    }

    /**
     * Returns a {@link MonoInner} that carries the payload with frame type {@code METADATA_PUSH};
     * the call on the underlying socket is made in {@code MonoInner.accept}.
     *
     * @param payload metadata payload
     * @return the resulting {@code Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        // Diamond instead of a raw type so the result is type-checked.
        return new MonoInner<>(this, payload, FrameType.METADATA_PUSH);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the load-balance target this socket was created with.
     *
     * @return the {@link LoadbalanceTarget} passed to the constructor
     */
    public LoadbalanceTarget target() {
        return this.loadbalanceTarget;
    }

    /**
     * Returns a {@code Mono} view of {@code onCloseSink}, which is completed or failed from
     * {@code doCleanup} and {@code doOnDispose}.
     *
     * @return the close signal of this pooled socket
     */
    @Override // io.rsocket.RSocket, io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.onCloseSink.asMono();
    }

    /**
     * Reports the availability of the underlying socket.
     *
     * @return the resolved socket's availability, or {@code 0.0} when {@code valueIfResolved()}
     *     returns {@code null}
     */
    @Override // io.rsocket.RSocket, io.rsocket.Availability
    public double availability() {
        final RSocket resolved = valueIfResolved();
        return resolved == null ? 0.0d : resolved.availability();
    }
}
