package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import kotlin.jvm.internal.LongCompanionObject;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/* loaded from: input_file:io/rsocket/core/ClientServerInputMultiplexer.class */
class ClientServerInputMultiplexer implements CoreSubscriber<ByteBuf>, Closeable {
    private final InternalDuplexConnection serverReceiver;
    private final InternalDuplexConnection clientReceiver;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;
    private final boolean isClient;
    private Subscription s;
    private Throwable t;
    private volatile int state;
    private static final AtomicIntegerFieldUpdater<ClientServerInputMultiplexer> STATE = AtomicIntegerFieldUpdater.newUpdater(ClientServerInputMultiplexer.class, "state");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rsocket/core/ClientServerInputMultiplexer$InternalDuplexConnection.class */
    public static class InternalDuplexConnection extends Flux<ByteBuf> implements Subscription, DuplexConnection {
        private final ClientServerInputMultiplexer clientServerInputMultiplexer;
        private final DuplexConnection source;
        private volatile int state;
        static final AtomicIntegerFieldUpdater<InternalDuplexConnection> STATE = AtomicIntegerFieldUpdater.newUpdater(InternalDuplexConnection.class, "state");
        CoreSubscriber<? super ByteBuf> actual;

        public InternalDuplexConnection(ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection duplexConnection) {
            this.clientServerInputMultiplexer = clientServerInputMultiplexer;
            this.source = duplexConnection;
        }

        @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
        public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
            if (this.state != 0 || !STATE.compareAndSet(this, 0, 1)) {
                Operators.error(coreSubscriber, new IllegalStateException("InternalDuplexConnection allows only single subscription"));
            } else {
                this.actual = coreSubscriber;
                coreSubscriber.onSubscribe(this);
            }
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (this.state == 1 && STATE.compareAndSet(this, 1, 2)) {
                ClientServerInputMultiplexer clientServerInputMultiplexer = this.clientServerInputMultiplexer;
                if (clientServerInputMultiplexer.notifyRequested()) {
                    return;
                }
                Throwable th = clientServerInputMultiplexer.t;
                if (th != null) {
                    this.actual.onError(th);
                } else {
                    this.actual.onComplete();
                }
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
        }

        void onNext(ByteBuf byteBuf) {
            this.actual.onNext(byteBuf);
        }

        void onComplete() {
            this.actual.onComplete();
        }

        void onError(Throwable th) {
            this.actual.onError(th);
        }

        @Override // io.rsocket.DuplexConnection
        public void sendFrame(int i, ByteBuf byteBuf) {
            this.source.sendFrame(i, byteBuf);
        }

        @Override // io.rsocket.DuplexConnection
        public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
            this.source.sendErrorAndClose(rSocketErrorException);
        }

        @Override // io.rsocket.DuplexConnection
        public Flux<ByteBuf> receive() {
            return this;
        }

        @Override // io.rsocket.DuplexConnection
        public ByteBufAllocator alloc() {
            return this.source.alloc();
        }

        @Override // io.rsocket.DuplexConnection
        public SocketAddress remoteAddress() {
            return this.source.remoteAddress();
        }

        @Override // reactor.core.Disposable
        public void dispose() {
            this.source.dispose();
        }

        @Override // reactor.core.Disposable
        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        public boolean isSubscribed() {
            return this.state != 0;
        }

        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
        public double availability() {
            return this.source.availability();
        }
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection, InitializingInterceptorRegistry initializingInterceptorRegistry, boolean z) {
        this.source = duplexConnection;
        this.isClient = z;
        this.serverReceiver = new InternalDuplexConnection(this, duplexConnection);
        this.clientReceiver = new InternalDuplexConnection(this, duplexConnection);
        this.serverConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.SERVER, this.serverReceiver);
        this.clientConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.CLIENT, this.clientReceiver);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        this.source.dispose();
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.s, subscription)) {
            this.s = subscription;
            subscription.request(LongCompanionObject.MAX_VALUE);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(ByteBuf byteBuf) {
        DuplexConnectionInterceptor.Type type;
        int streamId = FrameHeaderCodec.streamId(byteBuf);
        if (streamId == 0) {
            switch (FrameHeaderCodec.frameType(byteBuf)) {
                case LEASE:
                case KEEPALIVE:
                case ERROR:
                    type = this.isClient ? DuplexConnectionInterceptor.Type.CLIENT : DuplexConnectionInterceptor.Type.SERVER;
                    break;
                default:
                    type = this.isClient ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
                    break;
            }
        } else {
            type = (streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
        }
        switch (type) {
            case CLIENT:
                this.clientReceiver.onNext(byteBuf);
                return;
            case SERVER:
                this.serverReceiver.onNext(byteBuf);
                return;
            default:
                return;
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        int andSet = STATE.getAndSet(this, Integer.MIN_VALUE);
        if (andSet == Integer.MIN_VALUE || andSet == 0) {
            return;
        }
        if (this.clientReceiver.isSubscribed()) {
            this.clientReceiver.onComplete();
        }
        if (this.serverReceiver.isSubscribed()) {
            this.serverReceiver.onComplete();
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        this.t = th;
        int andSet = STATE.getAndSet(this, Integer.MIN_VALUE);
        if (andSet == Integer.MIN_VALUE || andSet == 0) {
            return;
        }
        if (this.clientReceiver.isSubscribed()) {
            this.clientReceiver.onError(th);
        }
        if (this.serverReceiver.isSubscribed()) {
            this.serverReceiver.onError(th);
        }
    }

    boolean notifyRequested() {
        int incrementAndGetCheckingState = incrementAndGetCheckingState();
        if (incrementAndGetCheckingState == Integer.MIN_VALUE) {
            return false;
        }
        if (incrementAndGetCheckingState != 2) {
            return true;
        }
        this.source.receive().subscribe((CoreSubscriber<? super ByteBuf>) this);
        return true;
    }

    int incrementAndGetCheckingState() {
        int i;
        int i2;
        do {
            i = this.state;
            if (i == Integer.MIN_VALUE) {
                return i;
            }
            i2 = i + 1;
        } while (!STATE.compareAndSet(this, i, i2));
        return i2;
    }
}
