package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Stream;
import kotlin.jvm.internal.LongCompanionObject;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* loaded from: input_file:io/rsocket/internal/UnboundedProcessor.class */
public final class UnboundedProcessor extends Flux<ByteBuf> implements Scannable, Disposable, CoreSubscriber<ByteBuf>, Fuseable.QueueSubscription<ByteBuf>, Fuseable {
    final Queue<ByteBuf> queue;
    final Queue<ByteBuf> priorityQueue;
    final Runnable onFinalizedHook;
    boolean cancelled;
    boolean done;
    Throwable error;
    CoreSubscriber<? super ByteBuf> actual;
    static final long FLAG_FINALIZED = Long.MIN_VALUE;
    static final long FLAG_DISPOSED = 4611686018427387904L;
    static final long FLAG_TERMINATED = 2305843009213693952L;
    static final long FLAG_CANCELLED = 1152921504606846976L;
    static final long FLAG_HAS_VALUE = 576460752303423488L;
    static final long FLAG_HAS_REQUEST = 288230376151711744L;
    static final long FLAG_SUBSCRIBER_READY = 144115188075855872L;
    static final long FLAG_SUBSCRIBED_ONCE = 72057594037927936L;
    static final long MAX_WIP_VALUE = 72057594037927935L;
    volatile long state;
    volatile int discardGuard;
    volatile long requested;
    boolean outputFused;
    static final AtomicLongFieldUpdater<UnboundedProcessor> STATE = AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "state");
    static final AtomicIntegerFieldUpdater<UnboundedProcessor> DISCARD_GUARD = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "discardGuard");
    static final AtomicLongFieldUpdater<UnboundedProcessor> REQUESTED = AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");

    public UnboundedProcessor() {
        this(() -> {
        });
    }

    public UnboundedProcessor(Runnable runnable) {
        this.onFinalizedHook = runnable;
        this.queue = new MpscUnboundedArrayQueue(Queues.SMALL_BUFFER_SIZE);
        this.priorityQueue = new MpscUnboundedArrayQueue(Queues.SMALL_BUFFER_SIZE);
    }

    @Override // reactor.core.Scannable
    public Stream<Scannable> inners() {
        return hasDownstreams() ? Stream.of(Scannable.from(this.actual)) : Stream.empty();
    }

    @Override // reactor.core.Scannable
    public Object scanUnsafe(Scannable.Attr attr) {
        if (Scannable.Attr.ACTUAL == attr) {
            if (isSubscriberReady(this.state)) {
                return this.actual;
            }
            return null;
        }
        if (Scannable.Attr.BUFFERED == attr) {
            return Integer.valueOf(this.queue.size() + this.priorityQueue.size());
        }
        if (Scannable.Attr.PREFETCH == attr) {
            return Integer.MAX_VALUE;
        }
        if (Scannable.Attr.CANCELLED != attr) {
            return null;
        }
        long j = this.state;
        return Boolean.valueOf(isCancelled(j) || isDisposed(j));
    }

    public void onNextPrioritized(ByteBuf byteBuf) {
        if (this.done || this.cancelled) {
            release(byteBuf);
            return;
        }
        if (!this.priorityQueue.offer(byteBuf)) {
            onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), byteBuf, currentContext()));
            release(byteBuf);
            return;
        }
        long markValueAdded = markValueAdded(this);
        if (isFinalized(markValueAdded)) {
            clearSafely();
            return;
        }
        if (isSubscriberReady(markValueAdded)) {
            if (this.outputFused) {
                this.actual.onNext(null);
            } else if (!isWorkInProgress(markValueAdded) && hasRequest(markValueAdded)) {
                drainRegular(markValueAdded);
            }
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(ByteBuf byteBuf) {
        if (this.done || this.cancelled) {
            release(byteBuf);
            return;
        }
        if (!this.queue.offer(byteBuf)) {
            onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), byteBuf, currentContext()));
            release(byteBuf);
            return;
        }
        long markValueAdded = markValueAdded(this);
        if (isFinalized(markValueAdded)) {
            clearSafely();
            return;
        }
        if (isSubscriberReady(markValueAdded)) {
            if (this.outputFused) {
                this.actual.onNext(null);
            } else if (!isWorkInProgress(markValueAdded) && hasRequest(markValueAdded)) {
                drainRegular(markValueAdded);
            }
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (this.done || this.cancelled) {
            Operators.onErrorDropped(th, currentContext());
            return;
        }
        this.error = th;
        this.done = true;
        long markTerminatedOrFinalized = markTerminatedOrFinalized(this);
        if (isFinalized(markTerminatedOrFinalized) || isDisposed(markTerminatedOrFinalized) || isCancelled(markTerminatedOrFinalized) || isTerminated(markTerminatedOrFinalized)) {
            Operators.onErrorDropped(th, currentContext());
            return;
        }
        if (isSubscriberReady(markTerminatedOrFinalized)) {
            if (this.outputFused) {
                this.actual.onError(th);
                return;
            }
            if (isWorkInProgress(markTerminatedOrFinalized)) {
                return;
            }
            if (!hasValue(markTerminatedOrFinalized)) {
                this.actual.onError(th);
            } else if (hasRequest(markTerminatedOrFinalized)) {
                drainRegular(markTerminatedOrFinalized);
            }
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.done || this.cancelled) {
            return;
        }
        this.done = true;
        long markTerminatedOrFinalized = markTerminatedOrFinalized(this);
        if (isFinalized(markTerminatedOrFinalized) || isDisposed(markTerminatedOrFinalized) || isCancelled(markTerminatedOrFinalized) || isTerminated(markTerminatedOrFinalized) || !isSubscriberReady(markTerminatedOrFinalized)) {
            return;
        }
        if (this.outputFused) {
            this.actual.onComplete();
            return;
        }
        if (isWorkInProgress(markTerminatedOrFinalized)) {
            return;
        }
        if (!hasValue(markTerminatedOrFinalized)) {
            this.actual.onComplete();
        } else if (hasRequest(markTerminatedOrFinalized)) {
            drainRegular(markTerminatedOrFinalized);
        }
    }

    void drainRegular(long j) {
        CoreSubscriber<? super ByteBuf> coreSubscriber = this.actual;
        Queue<ByteBuf> queue = this.queue;
        Queue<ByteBuf> queue2 = this.priorityQueue;
        long j2 = j + 1;
        do {
            long j3 = this.requested;
            long j4 = 0;
            boolean z = false;
            while (j3 != j4) {
                boolean z2 = this.done;
                ByteBuf poll = queue2.poll();
                z = poll == null;
                if (z) {
                    poll = queue.poll();
                    z = poll == null;
                }
                if (checkTerminated(z2, z, coreSubscriber)) {
                    if (z) {
                        return;
                    }
                    release(poll);
                    return;
                } else {
                    if (z) {
                        break;
                    }
                    coreSubscriber.onNext(poll);
                    j4++;
                }
            }
            if (j3 == j4) {
                boolean z3 = this.done;
                z = queue.isEmpty() && queue2.isEmpty();
                if (checkTerminated(z3, z, coreSubscriber)) {
                    return;
                }
            }
            if (j4 != 0 && j3 != LongCompanionObject.MAX_VALUE) {
                j3 = REQUESTED.addAndGet(this, -j4);
            }
            j2 = markWorkDone(this, j2, j3 > 0, !z);
            if (isCancelled(j2)) {
                clearAndFinalize(this);
                return;
            } else if (isDisposed(j2)) {
                clearAndFinalize(this);
                coreSubscriber.onError(new CancellationException("Disposed"));
                return;
            }
        } while (isWorkInProgress(j2));
    }

    boolean checkTerminated(boolean z, boolean z2, CoreSubscriber<? super ByteBuf> coreSubscriber) {
        long j = this.state;
        if (isCancelled(j)) {
            clearAndFinalize(this);
            return true;
        }
        if (isDisposed(j)) {
            clearAndFinalize(this);
            coreSubscriber.onError(new CancellationException("Disposed"));
            return true;
        }
        if (!z || !z2) {
            return false;
        }
        clearAndFinalize(this);
        Throwable th = this.error;
        if (th != null) {
            coreSubscriber.onError(th);
            return true;
        }
        coreSubscriber.onComplete();
        return true;
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        long j = this.state;
        if (isFinalized(j) || isTerminated(j) || isCancelled(j) || isDisposed(j)) {
            subscription.cancel();
        } else {
            subscription.request(LongCompanionObject.MAX_VALUE);
        }
    }

    @Override // reactor.core.publisher.Flux
    public int getPrefetch() {
        return Integer.MAX_VALUE;
    }

    @Override // reactor.core.CoreSubscriber
    public Context currentContext() {
        return isSubscriberReady(this.state) ? this.actual.currentContext() : Context.empty();
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "subscribe");
        long markSubscribedOnce = markSubscribedOnce(this);
        if (isSubscribedOnce(markSubscribedOnce)) {
            Operators.error(coreSubscriber, new IllegalStateException("UnboundedProcessor allows only a single Subscriber"));
            return;
        }
        if (isDisposed(markSubscribedOnce)) {
            Operators.error(coreSubscriber, new CancellationException("Disposed"));
            return;
        }
        coreSubscriber.onSubscribe(this);
        this.actual = coreSubscriber;
        long markSubscriberReady = markSubscriberReady(this);
        if (this.outputFused) {
            if (isCancelled(markSubscriberReady)) {
                return;
            }
            if (isDisposed(markSubscriberReady)) {
                coreSubscriber.onError(new CancellationException("Disposed"));
                return;
            }
            if (hasValue(markSubscriberReady)) {
                coreSubscriber.onNext(null);
            }
            if (isTerminated(markSubscriberReady)) {
                Throwable th = this.error;
                if (th != null) {
                    coreSubscriber.onError(th);
                    return;
                } else {
                    coreSubscriber.onComplete();
                    return;
                }
            }
            return;
        }
        if (isCancelled(markSubscriberReady)) {
            clearAndFinalize(this);
            return;
        }
        if (isDisposed(markSubscriberReady)) {
            clearAndFinalize(this);
            coreSubscriber.onError(new CancellationException("Disposed"));
            return;
        }
        if (hasValue(markSubscriberReady)) {
            if (hasRequest(markSubscriberReady)) {
                drainRegular(markSubscriberReady);
            }
        } else if (isTerminated(markSubscriberReady)) {
            clearAndFinalize(this);
            Throwable th2 = this.error;
            if (th2 != null) {
                coreSubscriber.onError(th2);
            } else {
                coreSubscriber.onComplete();
            }
        }
    }

    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (Operators.validate(j)) {
            if (this.outputFused) {
                if (isSubscriberReady(this.state)) {
                    this.actual.onNext(null);
                    return;
                }
                return;
            }
            Operators.addCap(REQUESTED, this, j);
            long markRequestAdded = markRequestAdded(this);
            if (isWorkInProgress(markRequestAdded) || isFinalized(markRequestAdded) || isCancelled(markRequestAdded) || isDisposed(markRequestAdded) || !isSubscriberReady(markRequestAdded) || !hasValue(markRequestAdded)) {
                return;
            }
            drainRegular(markRequestAdded);
        }
    }

    @Override // org.reactivestreams.Subscription
    public void cancel() {
        this.cancelled = true;
        long markCancelled = markCancelled(this);
        if (isWorkInProgress(markCancelled) || isFinalized(markCancelled) || isCancelled(markCancelled) || isDisposed(markCancelled)) {
            return;
        }
        if (isSubscribedOnce(markCancelled) && this.outputFused) {
            return;
        }
        clearAndFinalize(this);
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        this.cancelled = true;
        long markDisposed = markDisposed(this);
        if (isWorkInProgress(markDisposed) || isFinalized(markDisposed) || isCancelled(markDisposed) || isDisposed(markDisposed)) {
            return;
        }
        if (!isSubscribedOnce(markDisposed)) {
            clearAndFinalize(this);
            return;
        }
        if (isSubscriberReady(markDisposed)) {
            if (!this.outputFused) {
                clearAndFinalize(this);
                this.actual.onError(new CancellationException("Disposed"));
            } else {
                if (isTerminated(markDisposed)) {
                    return;
                }
                this.actual.onError(new CancellationException("Disposed"));
            }
        }
    }

    @Override // java.util.Queue
    @Nullable
    public ByteBuf poll() {
        ByteBuf poll = this.priorityQueue.poll();
        return poll != null ? poll : this.queue.poll();
    }

    @Override // java.util.Collection
    public int size() {
        return this.priorityQueue.size() + this.queue.size();
    }

    @Override // java.util.Collection
    public boolean isEmpty() {
        return this.priorityQueue.isEmpty() && this.queue.isEmpty();
    }

    @Override // java.util.Collection
    public void clear() {
        clearAndFinalize(this);
    }

    void clearSafely() {
        if (DISCARD_GUARD.getAndIncrement(this) != 0) {
            return;
        }
        int i = 1;
        do {
            clearUnsafely();
            i = DISCARD_GUARD.addAndGet(this, -i);
        } while (i != 0);
    }

    void clearUnsafely() {
        Queue<ByteBuf> queue = this.queue;
        Queue<ByteBuf> queue2 = this.priorityQueue;
        while (true) {
            ByteBuf poll = queue.poll();
            if (poll == null) {
                break;
            } else {
                release(poll);
            }
        }
        while (true) {
            ByteBuf poll2 = queue2.poll();
            if (poll2 == null) {
                return;
            } else {
                release(poll2);
            }
        }
    }

    @Override // reactor.core.Fuseable.QueueSubscription
    public int requestFusion(int i) {
        if ((i & 2) == 0) {
            return 0;
        }
        this.outputFused = true;
        return 2;
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return isFinalized(this.state);
    }

    boolean hasDownstreams() {
        long j = this.state;
        return !isTerminated(j) && isSubscriberReady(j);
    }

    static void release(ByteBuf byteBuf) {
        if (byteBuf.refCnt() > 0) {
            try {
                byteBuf.release();
            } catch (Throwable th) {
            }
        }
    }

    static long markSubscribedOnce(UnboundedProcessor unboundedProcessor) {
        long j;
        do {
            j = unboundedProcessor.state;
            if (isSubscribedOnce(j)) {
                return j;
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, j | FLAG_SUBSCRIBED_ONCE));
        return j;
    }

    static long markSubscriberReady(UnboundedProcessor unboundedProcessor) {
        long j;
        long j2;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j) || isCancelled(j) || isDisposed(j)) {
                return j;
            }
            j2 = j;
            if (!unboundedProcessor.outputFused && ((!hasValue(j) && isTerminated(j)) || (hasRequest(j) && hasValue(j)))) {
                j2 = addWork(j);
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, j2 | FLAG_SUBSCRIBER_READY));
        return j;
    }

    static long markRequestAdded(UnboundedProcessor unboundedProcessor) {
        long j;
        long j2;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j) || isCancelled(j) || isDisposed(j)) {
                return j;
            }
            j2 = j;
            if (isSubscriberReady(j) && hasValue(j)) {
                j2 = addWork(j);
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, j2 | FLAG_HAS_REQUEST));
        return j;
    }

    static long markValueAdded(UnboundedProcessor unboundedProcessor) {
        long j;
        long j2;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j)) {
                return j;
            }
            j2 = j;
            if (isWorkInProgress(j)) {
                j2 = addWork(j);
            } else if (isSubscriberReady(j)) {
                if (unboundedProcessor.outputFused) {
                    return j;
                }
                if (hasRequest(j)) {
                    j2 = addWork(j);
                }
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, j2 | FLAG_HAS_VALUE));
        return j;
    }

    static long markTerminatedOrFinalized(UnboundedProcessor unboundedProcessor) {
        long j;
        long j2;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j) || isTerminated(j) || isCancelled(j) || isDisposed(j)) {
                return j;
            }
            j2 = j;
            if (isSubscriberReady(j) && !unboundedProcessor.outputFused) {
                if (!hasValue(j)) {
                    j2 = Long.MIN_VALUE;
                } else if (hasRequest(j)) {
                    j2 = addWork(j);
                }
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, j2 | 2305843009213693952L));
        if (isFinalized(j2)) {
            unboundedProcessor.onFinalizedHook.run();
        }
        return j;
    }

    static long markCancelled(UnboundedProcessor unboundedProcessor) {
        long j;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j) || isCancelled(j)) {
                return j;
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, addWork(j) | 1152921504606846976L));
        return j;
    }

    static long markDisposed(UnboundedProcessor unboundedProcessor) {
        long j;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j) || isCancelled(j) || isDisposed(j)) {
                return j;
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, addWork(j) | 4611686018427387904L));
        return j;
    }

    static long addWork(long j) {
        return (j & MAX_WIP_VALUE) == MAX_WIP_VALUE ? j : j + 1;
    }

    static long markWorkDone(UnboundedProcessor unboundedProcessor, long j, boolean z, boolean z2) {
        long j2;
        long j3;
        long j4 = j & MAX_WIP_VALUE;
        do {
            j2 = unboundedProcessor.state;
            if ((j2 & MAX_WIP_VALUE) != j4) {
                return j2;
            }
            if (isFinalized(j2) || isCancelled(j2) || isDisposed(j2)) {
                return j2;
            }
            j3 = j2 - j4;
        } while (!STATE.compareAndSet(unboundedProcessor, j2, (j3 ^ (z ? 0L : FLAG_HAS_REQUEST)) ^ (z2 ? 0L : FLAG_HAS_VALUE)));
        return j3;
    }

    static void clearAndFinalize(UnboundedProcessor unboundedProcessor) {
        long j;
        do {
            j = unboundedProcessor.state;
            if (isFinalized(j)) {
                unboundedProcessor.clearSafely();
                return;
            } else if (isSubscriberReady(j) && unboundedProcessor.outputFused) {
                unboundedProcessor.clearUnsafely();
            } else {
                unboundedProcessor.clearSafely();
            }
        } while (!STATE.compareAndSet(unboundedProcessor, j, (j & (-72057594037927936L) & (-576460752303423489L)) | Long.MIN_VALUE));
        unboundedProcessor.onFinalizedHook.run();
    }

    static boolean hasValue(long j) {
        return (j & FLAG_HAS_VALUE) == FLAG_HAS_VALUE;
    }

    static boolean hasRequest(long j) {
        return (j & FLAG_HAS_REQUEST) == FLAG_HAS_REQUEST;
    }

    static boolean isCancelled(long j) {
        return (j & 1152921504606846976L) == 1152921504606846976L;
    }

    static boolean isDisposed(long j) {
        return (j & 4611686018427387904L) == 4611686018427387904L;
    }

    static boolean isWorkInProgress(long j) {
        return (j & MAX_WIP_VALUE) != 0;
    }

    static boolean isTerminated(long j) {
        return (j & 2305843009213693952L) == 2305843009213693952L;
    }

    static boolean isFinalized(long j) {
        return (j & Long.MIN_VALUE) == Long.MIN_VALUE;
    }

    static boolean isSubscriberReady(long j) {
        return (j & FLAG_SUBSCRIBER_READY) == FLAG_SUBSCRIBER_READY;
    }

    static boolean isSubscribedOnce(long j) {
        return (j & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE;
    }
}
