package com.sanjiang.vantrue.internal.mqtt.handler.publish.outgoing;

import android.util.Log;
import com.sanjiang.vantrue.internal.annotations.CallByThread;
import com.sanjiang.vantrue.internal.mqtt.MqttClientConfig;
import com.sanjiang.vantrue.internal.mqtt.MqttClientConnectionConfig;
import com.sanjiang.vantrue.internal.mqtt.advanced.interceptor.MqttClientInterceptors;
import com.sanjiang.vantrue.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler;
import com.sanjiang.vantrue.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.sanjiang.vantrue.internal.mqtt.handler.publish.outgoing.f;
import com.sanjiang.vantrue.internal.mqtt.ioc.ClientScope;
import com.sanjiang.vantrue.internal.mqtt.message.publish.MqttPublish;
import com.sanjiang.vantrue.internal.mqtt.message.publish.MqttPublishResult;
import com.sanjiang.vantrue.internal.mqtt.message.publish.MqttStatefulPublish;
import com.sanjiang.vantrue.internal.mqtt.message.publish.puback.MqttPubAck;
import com.sanjiang.vantrue.internal.mqtt.message.publish.pubcomp.MqttPubComp;
import com.sanjiang.vantrue.internal.mqtt.message.publish.pubrec.MqttPubRec;
import com.sanjiang.vantrue.internal.mqtt.message.publish.pubrel.MqttPubRel;
import com.sanjiang.vantrue.internal.mqtt.message.publish.pubrel.MqttPubRelBuilder;
import com.sanjiang.vantrue.internal.netty.ContextFuture;
import com.sanjiang.vantrue.internal.netty.DefaultContextPromise;
import com.sanjiang.vantrue.internal.util.Ranges;
import com.sanjiang.vantrue.internal.util.collections.IntIndex;
import com.sanjiang.vantrue.internal.util.collections.NodeList;
import com.sanjiang.vantrue.mqtt.MqttClientState;
import com.sanjiang.vantrue.mqtt.datatypes.MqttQos;
import com.sanjiang.vantrue.mqtt.exceptions.ConnectionClosedException;
import com.sanjiang.vantrue.mqtt.mqtt5.advanced.interceptor.qos1.Mqtt5OutgoingQos1Interceptor;
import com.sanjiang.vantrue.mqtt.mqtt5.advanced.interceptor.qos2.Mqtt5OutgoingQos2Interceptor;
import com.sanjiang.vantrue.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.sanjiang.vantrue.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.sanjiang.vantrue.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.sanjiang.vantrue.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.sanjiang.vantrue.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.rxjava3.core.o;
import io.reactivex.rxjava3.core.t;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
import javax.inject.Inject;
import nc.l;
import nc.m;
import org.jctools.queues.SpscUnboundedArrayQueue;
import pc.u;
import pc.w;

@ClientScope
/* loaded from: classes4.dex */
public class MqttOutgoingQosHandler extends MqttSessionAwareHandler implements t<h>, Runnable, ContextFuture.Listener<h> {
    static final /* synthetic */ boolean $assertionsDisabled = false;
    private static final IntIndex.Spec<e> INDEX_SPEC = new IntIndex.Spec<>(new ToIntFunction() { // from class: com.sanjiang.vantrue.internal.mqtt.handler.publish.outgoing.d
        @Override // java.util.function.ToIntFunction
        public final int applyAsInt(Object obj) {
            int i10;
            i10 = ((e) obj).f18478b;
            return i10;
        }
    });
    private static final int MAX_CONCURRENT_PUBLISH_FLOWABLES = 64;

    @l
    public static final String NAME = "qos.outgoing";
    private static final boolean QOS_2_COMPLETE_RESULT = false;
    private static final String TAG = "MqttOutgoingQosHandler";

    @l
    private final MqttClientConfig clientConfig;

    @m
    private h currentPending;

    @m
    private e resendPending;
    private int sendMaximum;
    private int shrinkRequests;

    @m
    private w subscription;

    @m
    private MqttTopicAliasMapping topicAliasMapping;

    @l
    private final SpscUnboundedArrayQueue<h> queue = new SpscUnboundedArrayQueue<>(32);

    @l
    private final AtomicInteger queuedCounter = new AtomicInteger();

    @l
    private final NodeList<e> pending = new NodeList<>();

    @l
    private final Ranges packetIdentifiers = new Ranges(1, 0);

    @l
    private final IntIndex<e> pendingIndex = new IntIndex<>(INDEX_SPEC);

    @l
    private final MqttPublishFlowables publishFlowables = new MqttPublishFlowables();

    @Inject
    public MqttOutgoingQosHandler(@l MqttClientConfig mqttClientConfig) {
        this.clientConfig = mqttClientConfig;
    }

    public static /* synthetic */ u a(o oVar) {
        return oVar;
    }

    @l
    private MqttPubRel buildPubRel(@l MqttPublish mqttPublish, @l MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttPubRelBuilder mqttPubRelBuilder = new MqttPubRelBuilder(mqttPubRec);
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors != null && (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) != null) {
            outgoingQos2Interceptor.onPubRec(this.clientConfig, mqttPublish, mqttPubRec, mqttPubRelBuilder);
        }
        return mqttPubRelBuilder.build();
    }

    private void clearQueued(@l Throwable th) {
        int i10;
        do {
            i10 = 0;
            while (true) {
                h hVar = (h) this.queue.poll();
                if (hVar == null) {
                    break;
                }
                hVar.a().e(new MqttPublishResult(hVar.b(), th));
                i10++;
            }
        } while (this.queuedCounter.addAndGet(-i10) != 0);
    }

    private void completePending(@l ChannelHandlerContext channelHandlerContext, @l e eVar) {
        this.pending.remove(eVar);
        int i10 = eVar.f18478b;
        this.packetIdentifiers.returnId(i10);
        int i11 = this.sendMaximum;
        if (i10 > i11) {
            this.packetIdentifiers.resize(i11);
        }
        if (this.resendPending != null) {
            channelHandlerContext.channel().eventLoop().execute(this);
        }
    }

    private static void error(@l ChannelHandlerContext channelHandlerContext, @l String str) {
        MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, str);
    }

    private boolean isRepublishIfSessionExpired() {
        return this.clientConfig.isRepublishIfSessionExpired() && this.clientConfig.getState() != MqttClientState.DISCONNECTED;
    }

    private void onPubAck(@l MqttPublish mqttPublish, @l MqttPubAck mqttPubAck) {
        Mqtt5OutgoingQos1Interceptor outgoingQos1Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos1Interceptor = interceptors.getOutgoingQos1Interceptor()) == null) {
            return;
        }
        outgoingQos1Interceptor.onPubAck(this.clientConfig, mqttPublish, mqttPubAck);
    }

    private void onPubComp(@l MqttPubRel mqttPubRel, @l MqttPubComp mqttPubComp) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubComp(this.clientConfig, mqttPubRel, mqttPubComp);
    }

    private void onPubRecError(@l MqttPublish mqttPublish, @l MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubRecError(this.clientConfig, mqttPublish, mqttPubRec);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readPubAck(@l ChannelHandlerContext channelHandlerContext, @l MqttPubAck mqttPubAck) {
        e remove = this.pendingIndex.remove(mqttPubAck.getPacketIdentifier());
        if (remove == null) {
            error(channelHandlerContext, "PUBACK contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof h)) {
            this.pendingIndex.put(remove);
            error(channelHandlerContext, "PUBACK must not be received for a PUBREL");
            return;
        }
        h hVar = (h) remove;
        MqttPublish b10 = hVar.b();
        if (b10.getQos() != MqttQos.AT_LEAST_ONCE) {
            this.pendingIndex.put(remove);
            error(channelHandlerContext, "PUBACK must not be received for a QoS 2 PUBLISH");
        } else {
            completePending(channelHandlerContext, hVar);
            onPubAck(b10, mqttPubAck);
            hVar.a().e(new MqttPublishResult.MqttQos1Result(b10, ((Mqtt5PubAckReasonCode) mqttPubAck.getReasonCode()).isError() ? new Mqtt5PubAckException(mqttPubAck, "PUBACK contained an Error Code") : null, mqttPubAck));
        }
    }

    private void readPubComp(@l ChannelHandlerContext channelHandlerContext, @l MqttPubComp mqttPubComp) {
        e remove = this.pendingIndex.remove(mqttPubComp.getPacketIdentifier());
        if (remove == null) {
            error(channelHandlerContext, "PUBCOMP contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof f)) {
            this.pendingIndex.put(remove);
            if (((h) remove).b().getQos() == MqttQos.AT_LEAST_ONCE) {
                error(channelHandlerContext, "PUBCOMP must not be received for a QoS 1 PUBLISH");
                return;
            } else {
                error(channelHandlerContext, "PUBCOMP must not be received when the PUBREL has not been sent yet");
                return;
            }
        }
        f fVar = (f) remove;
        MqttPubRel b10 = fVar.b();
        a a10 = fVar.a();
        completePending(channelHandlerContext, fVar);
        onPubComp(b10, mqttPubComp);
        if (((f.b) fVar).getAsBoolean()) {
            a10.d(1L);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readPubRec(@l ChannelHandlerContext channelHandlerContext, @l MqttPubRec mqttPubRec) {
        int packetIdentifier = mqttPubRec.getPacketIdentifier();
        e eVar = this.pendingIndex.get(packetIdentifier);
        if (eVar == null) {
            error(channelHandlerContext, "PUBREC contained unknown packet identifier");
            return;
        }
        if (!(eVar instanceof h)) {
            error(channelHandlerContext, "PUBREC must not be received when the PUBREL has already been sent");
            return;
        }
        h hVar = (h) eVar;
        MqttPublish b10 = hVar.b();
        if (b10.getQos() != MqttQos.EXACTLY_ONCE) {
            error(channelHandlerContext, "PUBREC must not be received for a QoS 1 PUBLISH");
            return;
        }
        a a10 = hVar.a();
        if (((Mqtt5PubRecReasonCode) mqttPubRec.getReasonCode()).isError()) {
            this.pendingIndex.remove(packetIdentifier);
            completePending(channelHandlerContext, hVar);
            onPubRecError(b10, mqttPubRec);
            a10.e(new MqttPublishResult.MqttQos2Result(b10, new Mqtt5PubRecException(mqttPubRec, "PUBREC contained an Error Code"), mqttPubRec));
            return;
        }
        MqttPubRel buildPubRel = buildPubRel(b10, mqttPubRec);
        f.b bVar = new f.b(buildPubRel, a10);
        replacePending(hVar, bVar);
        a10.e(new MqttPublishResult.MqttQos2IntermediateResult(b10, mqttPubRec, bVar));
        writePubRel(channelHandlerContext, buildPubRel);
        channelHandlerContext.flush();
    }

    private void replacePending(@l h hVar, @l f fVar) {
        fVar.f18478b = hVar.f18478b;
        this.pendingIndex.put(fVar);
        this.pending.replace(hVar, fVar);
    }

    private void resend(@l ChannelHandlerContext channelHandlerContext, @l e eVar) {
        this.pendingIndex.put(eVar);
        if (!(eVar instanceof h)) {
            writePubRel(channelHandlerContext, ((f) eVar).b());
        } else {
            h hVar = (h) eVar;
            writeQos1Or2Publish(channelHandlerContext, hVar.b().createStateful(hVar.f18478b, true, this.topicAliasMapping), hVar);
        }
    }

    private void writePubRel(@l ChannelHandlerContext channelHandlerContext, @l MqttPubRel mqttPubRel) {
        channelHandlerContext.write(mqttPubRel, channelHandlerContext.voidPromise());
    }

    private void writePublish(@l ChannelHandlerContext channelHandlerContext, @l h hVar) {
        if (hVar.b().getQos() == MqttQos.AT_MOST_ONCE) {
            writeQos0Publish(channelHandlerContext, hVar);
        } else {
            writeQos1Or2Publish(channelHandlerContext, hVar);
        }
    }

    private void writeQos0Publish(@l ChannelHandlerContext channelHandlerContext, @l h hVar) {
        channelHandlerContext.write(hVar.b().createStateful(-1, false, this.topicAliasMapping), new DefaultContextPromise(channelHandlerContext.channel(), hVar)).addListener((GenericFutureListener<? extends Future<? super Void>>) this);
    }

    private void writeQos1Or2Publish(@l ChannelHandlerContext channelHandlerContext, @l h hVar) {
        int id = this.packetIdentifiers.getId();
        if (id < 0) {
            Log.e(TAG, "No Packet Identifier available for QoS 1 or 2 PUBLISH. This must not happen and is a bug.");
            return;
        }
        hVar.f18478b = id;
        this.pendingIndex.put(hVar);
        this.pending.add(hVar);
        writeQos1Or2Publish(channelHandlerContext, hVar.b().createStateful(id, false, this.topicAliasMapping), hVar);
    }

    private void writeQos1Or2Publish(@l ChannelHandlerContext channelHandlerContext, @l MqttStatefulPublish mqttStatefulPublish, @l h hVar) {
        this.currentPending = hVar;
        channelHandlerContext.write(mqttStatefulPublish, channelHandlerContext.voidPromise());
        this.currentPending = null;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(@l ChannelHandlerContext channelHandlerContext, @l Object obj) {
        if (obj instanceof MqttPubAck) {
            readPubAck(channelHandlerContext, (MqttPubAck) obj);
            return;
        }
        if (obj instanceof MqttPubRec) {
            readPubRec(channelHandlerContext, (MqttPubRec) obj);
        } else if (obj instanceof MqttPubComp) {
            readPubComp(channelHandlerContext, (MqttPubComp) obj);
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelWritabilityChanged(@l ChannelHandlerContext channelHandlerContext) {
        Channel channel = channelHandlerContext.channel();
        if (channel.isWritable()) {
            channel.eventLoop().execute(this);
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(@l ChannelHandlerContext channelHandlerContext, @l Throwable th) {
        h hVar;
        if ((th instanceof IOException) || (hVar = this.currentPending) == null) {
            channelHandlerContext.fireExceptionCaught(th);
            return;
        }
        this.pendingIndex.remove(hVar.f18478b);
        this.currentPending.a().e(new MqttPublishResult(this.currentPending.b(), th));
        completePending(channelHandlerContext, this.currentPending);
        this.currentPending = null;
    }

    @l
    public MqttPublishFlowables getPublishFlowables() {
        return this.publishFlowables;
    }

    @Override // pc.v
    public void onComplete() {
        Log.e(TAG, "MqttPublishFlowables is global and must never complete. This must not happen and is a bug.");
    }

    @Override // pc.v
    public void onError(@l Throwable th) {
        Log.e(TAG, "MqttPublishFlowables is global and must never error. This must not happen and is a bug.", th);
    }

    @Override // pc.v
    public void onNext(@l h hVar) {
        this.queue.offer(hVar);
        if (this.queuedCounter.getAndIncrement() == 0) {
            hVar.a().getEventLoop().execute(this);
        }
    }

    @Override // com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(@l Throwable th) {
        super.onSessionEnd(th);
        this.pendingIndex.clear();
        this.resendPending = null;
        if (isRepublishIfSessionExpired()) {
            return;
        }
        for (e first = this.pending.getFirst(); first != null; first = first.getNext()) {
            this.packetIdentifiers.returnId(first.f18478b);
            if (first instanceof h) {
                first.a().e(new MqttPublishResult(((h) first).b(), th));
            } else {
                f.b bVar = (f.b) first;
                if (bVar.getAsBoolean()) {
                    bVar.a().d(1L);
                }
            }
        }
        this.pending.clear();
        clearQueued(th);
    }

    @Override // com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(@l MqttClientConnectionConfig mqttClientConnectionConfig, @l EventLoop eventLoop) {
        int i10 = this.sendMaximum;
        int min = Math.min(mqttClientConnectionConfig.getSendMaximum(), 65525);
        this.sendMaximum = min;
        this.packetIdentifiers.resize(min);
        if (i10 == 0) {
            this.publishFlowables.flatMap(new r5.o() { // from class: com.sanjiang.vantrue.internal.mqtt.handler.publish.outgoing.c
                @Override // r5.o
                public final Object apply(Object obj) {
                    return MqttOutgoingQosHandler.a((o) obj);
                }
            }, true, 64, Math.min(min, o.bufferSize())).subscribe((t<? super R>) this);
            this.subscription.request(min);
        } else {
            int i11 = (min - i10) - this.shrinkRequests;
            if (i11 > 0) {
                this.shrinkRequests = 0;
                this.subscription.request(i11);
            } else {
                this.shrinkRequests = -i11;
            }
        }
        this.topicAliasMapping = mqttClientConnectionConfig.getSendTopicAliasMapping();
        this.pendingIndex.clear();
        e first = this.pending.getFirst();
        this.resendPending = first;
        if (first != null || this.queuedCounter.get() > 0) {
            eventLoop.execute(this);
        }
        super.onSessionStartOrResume(mqttClientConnectionConfig, eventLoop);
    }

    @Override // io.reactivex.rxjava3.core.t, pc.v
    public void onSubscribe(@l w wVar) {
        this.subscription = wVar;
    }

    @Override // io.netty.util.concurrent.GenericFutureListener
    public void operationComplete(@l ContextFuture<? extends h> contextFuture) {
        h context = contextFuture.getContext();
        MqttPublish b10 = context.b();
        a a10 = context.a();
        Throwable cause = contextFuture.cause();
        if (!(cause instanceof IOException)) {
            a10.e(new MqttPublishResult(b10, cause));
        } else {
            a10.e(new MqttPublishResult(b10, new ConnectionClosedException(cause)));
            contextFuture.channel().pipeline().fireExceptionCaught(cause);
        }
    }

    @CallByThread("Netty EventLoop")
    public void request(long j10) {
        int i10 = this.shrinkRequests;
        if (i10 == 0) {
            this.subscription.request(j10);
            return;
        }
        long j11 = i10;
        if (j10 <= j11) {
            this.shrinkRequests = (int) (j11 - j10);
        } else {
            this.shrinkRequests = 0;
            this.subscription.request(j10 - j11);
        }
    }

    @Override // java.lang.Runnable
    @CallByThread("Netty EventLoop")
    public void run() {
        if (!this.hasSession) {
            if (isRepublishIfSessionExpired()) {
                return;
            }
            clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        ChannelHandlerContext channelHandlerContext = this.ctx;
        if (channelHandlerContext == null) {
            return;
        }
        Channel channel = channelHandlerContext.channel();
        int size = this.sendMaximum - this.pendingIndex.size();
        e eVar = this.resendPending;
        int i10 = 0;
        int i11 = 0;
        while (eVar != null && i11 < size && channel.isWritable()) {
            resend(channelHandlerContext, eVar);
            i11++;
            eVar = eVar.getNext();
            this.resendPending = eVar;
        }
        while (i11 < size && channel.isWritable()) {
            h hVar = (h) this.queue.poll();
            if (hVar == null) {
                break;
            }
            writePublish(channelHandlerContext, hVar);
            i11++;
            i10++;
        }
        if (i11 > 0) {
            boolean isWritable = channel.isWritable();
            channelHandlerContext.flush();
            if (i10 <= 0 || this.queuedCounter.addAndGet(-i10) <= 0 || !isWritable) {
                return;
            }
            channel.eventLoop().execute(this);
        }
    }
}
