package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import kotlin.jvm.internal.LongCompanionObject;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/core/SetupHandlingDuplexConnection.class */
public class SetupHandlingDuplexConnection extends Flux<ByteBuf> implements DuplexConnection, CoreSubscriber<ByteBuf>, Subscription {
    final DuplexConnection source;
    final MonoSink<Tuple2<ByteBuf, DuplexConnection>> sink;
    Subscription s;
    boolean firstFrameReceived = false;
    CoreSubscriber<? super ByteBuf> actual;
    boolean done;
    Throwable t;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SetupHandlingDuplexConnection(DuplexConnection duplexConnection, MonoSink<Tuple2<ByteBuf, DuplexConnection>> monoSink) {
        this.source = duplexConnection;
        this.sink = monoSink;
        duplexConnection.receive().subscribe((CoreSubscriber<? super ByteBuf>) this);
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        this.source.dispose();
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    @Override // io.rsocket.DuplexConnection
    public void sendFrame(int i, ByteBuf byteBuf) {
        this.source.sendFrame(i, byteBuf);
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this;
    }

    @Override // io.rsocket.DuplexConnection
    public SocketAddress remoteAddress() {
        return this.source.remoteAddress();
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        if (!this.done) {
            this.actual = coreSubscriber;
            coreSubscriber.onSubscribe(this);
            return;
        }
        Throwable th = this.t;
        if (th == null) {
            Operators.complete(coreSubscriber);
        } else {
            Operators.error(coreSubscriber, th);
        }
    }

    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (j != LongCompanionObject.MAX_VALUE) {
            this.actual.onError(new IllegalArgumentException("Only unbounded request is allowed"));
        } else {
            this.s.request(LongCompanionObject.MAX_VALUE);
        }
    }

    @Override // org.reactivestreams.Subscription
    public void cancel() {
        this.source.dispose();
        this.s.cancel();
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.s, subscription)) {
            this.s = subscription;
            subscription.request(1L);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(ByteBuf byteBuf) {
        if (this.firstFrameReceived) {
            this.actual.onNext(byteBuf);
        } else {
            this.firstFrameReceived = true;
            this.sink.success(Tuples.of(byteBuf, this));
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (this.done) {
            Operators.onErrorDropped(th, Context.empty());
            return;
        }
        this.done = true;
        this.t = th;
        if (!this.firstFrameReceived) {
            this.sink.error(th);
            return;
        }
        CoreSubscriber<? super ByteBuf> coreSubscriber = this.actual;
        if (coreSubscriber != null) {
            coreSubscriber.onError(th);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        if (!this.firstFrameReceived) {
            this.sink.error(new ClosedChannelException());
            return;
        }
        CoreSubscriber<? super ByteBuf> coreSubscriber = this.actual;
        if (coreSubscriber != null) {
            coreSubscriber.onComplete();
        }
    }

    @Override // io.rsocket.DuplexConnection
    public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
        this.source.sendErrorAndClose(rSocketErrorException);
    }

    @Override // io.rsocket.DuplexConnection
    public ByteBufAllocator alloc() {
        return this.source.alloc();
    }
}
