package com.sanjiang.vantrue.internal.mqtt.handler.subscribe;

import android.util.Log;
import b2.a;
import b2.b;
import b2.c;
import b2.i;
import com.sanjiang.vantrue.internal.annotations.CallByThread;
import com.sanjiang.vantrue.internal.mqtt.MqttClientConfig;
import com.sanjiang.vantrue.internal.mqtt.MqttClientConnectionConfig;
import com.sanjiang.vantrue.internal.mqtt.datatypes.MqttUserPropertiesImpl;
import com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler;
import com.sanjiang.vantrue.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.sanjiang.vantrue.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlow;
import com.sanjiang.vantrue.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.sanjiang.vantrue.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.sanjiang.vantrue.internal.mqtt.handler.subscribe.MqttSubscriptionHandler;
import com.sanjiang.vantrue.internal.mqtt.ioc.ClientScope;
import com.sanjiang.vantrue.internal.mqtt.message.MqttCommonReasonCode;
import com.sanjiang.vantrue.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.sanjiang.vantrue.internal.mqtt.message.subscribe.MqttSubscribe;
import com.sanjiang.vantrue.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.sanjiang.vantrue.internal.mqtt.message.unsubscribe.MqttStatefulUnsubscribe;
import com.sanjiang.vantrue.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.sanjiang.vantrue.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck;
import com.sanjiang.vantrue.internal.mqtt.message.unsubscribe.unsuback.mqtt3.Mqtt3UnsubAckView;
import com.sanjiang.vantrue.internal.util.Ranges;
import com.sanjiang.vantrue.internal.util.collections.ImmutableList;
import com.sanjiang.vantrue.internal.util.collections.IntIndex;
import com.sanjiang.vantrue.internal.util.collections.NodeList;
import com.sanjiang.vantrue.mqtt.MqttClientState;
import com.sanjiang.vantrue.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.sanjiang.vantrue.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException;
import com.sanjiang.vantrue.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.sanjiang.vantrue.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import com.sanjiang.vantrue.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAckReasonCode;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.ToIntFunction;
import javax.inject.Inject;
import nc.l;
import nc.m;

@ClientScope
/* loaded from: classes4.dex */
public class MqttSubscriptionHandler extends MqttSessionAwareHandler implements Runnable {
    private static final IntIndex.Spec<b> INDEX_SPEC = new IntIndex.Spec<>(new ToIntFunction() { // from class: b2.h
        @Override // java.util.function.ToIntFunction
        public final int applyAsInt(Object obj) {
            int i10;
            i10 = ((b) obj).f3037a;
            return i10;
        }
    }, 4);
    public static final int MAX_SUB_PENDING = 10;

    @l
    public static final String NAME = "subscription";
    private static final String TAG = "MqttSubscriptionHandler";

    @l
    private final MqttClientConfig clientConfig;

    @m
    private b currentPending;

    @l
    private final MqttIncomingPublishFlows incomingPublishFlows;

    @m
    private b sendPending;
    private boolean subscriptionIdentifiersAvailable;

    @l
    private final NodeList<b> pending = new NodeList<>();
    private int nextSubscriptionIdentifier = 1;

    @l
    private final IntIndex<b> pendingIndex = new IntIndex<>(INDEX_SPEC);

    @l
    private final Ranges packetIdentifiers = new Ranges(65526, 65535);

    @Inject
    public MqttSubscriptionHandler(@l MqttClientConfig mqttClientConfig, @l MqttIncomingPublishFlows mqttIncomingPublishFlows) {
        this.clientConfig = mqttClientConfig;
        this.incomingPublishFlows = mqttIncomingPublishFlows;
    }

    private void completePending(@l b bVar) {
        this.pending.remove(bVar);
        this.packetIdentifiers.returnId(bVar.f3037a);
        run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$onSessionStartOrResume$1(Integer num, List list) {
        this.pending.addFirst(new c(new MqttSubscribe(ImmutableList.copyOf((Collection) list), MqttUserPropertiesImpl.NO_USER_PROPERTIES), num.intValue(), null));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$subscribe$2(MqttSubscriptionFlow mqttSubscriptionFlow, MqttSubscribe mqttSubscribe) {
        if (mqttSubscriptionFlow.init()) {
            int i10 = this.nextSubscriptionIdentifier;
            this.nextSubscriptionIdentifier = i10 + 1;
            this.incomingPublishFlows.subscribe(mqttSubscribe, i10, mqttSubscriptionFlow instanceof MqttSubscribedPublishFlow ? (MqttSubscribedPublishFlow) mqttSubscriptionFlow : null);
            queue(new c(mqttSubscribe, i10, mqttSubscriptionFlow));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$subscribeGlobal$4(MqttGlobalIncomingPublishFlow mqttGlobalIncomingPublishFlow) {
        if (mqttGlobalIncomingPublishFlow.init()) {
            this.incomingPublishFlows.subscribeGlobal(mqttGlobalIncomingPublishFlow);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$unsubscribe$3(a aVar, MqttUnsubscribe mqttUnsubscribe) {
        if (aVar.init()) {
            queue(new i(mqttUnsubscribe, aVar));
        }
    }

    private void queue(@l b bVar) {
        this.pending.add(bVar);
        if (this.sendPending == null) {
            this.sendPending = bVar;
            run();
        }
    }

    private void readSubAck(@l ChannelHandlerContext channelHandlerContext, @l MqttSubAck mqttSubAck) {
        b remove = this.pendingIndex.remove(mqttSubAck.getPacketIdentifier());
        if (remove == null) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for SUBACK");
            return;
        }
        if (!(remove instanceof c)) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "SUBACK received for an UNSUBSCRIBE");
            return;
        }
        c cVar = (c) remove;
        MqttSubscriptionFlow<MqttSubAck> a10 = cVar.a();
        ImmutableList<Mqtt5SubAckReasonCode> reasonCodes = mqttSubAck.getReasonCodes();
        boolean z10 = cVar.f3038b.getSubscriptions().size() != reasonCodes.size();
        boolean allErrors = MqttCommonReasonCode.allErrors(mqttSubAck.getReasonCodes());
        this.incomingPublishFlows.subAck(cVar.f3038b, cVar.f3039c, reasonCodes);
        if (a10 != null) {
            if (z10 || allErrors) {
                String str = z10 ? "Count of Reason Codes in SUBACK does not match count of subscriptions in SUBSCRIBE" : "SUBACK contains only Error Codes";
                if (a10.isCancelled()) {
                    Log.w(TAG, str.concat(" but the SubAck flow has been cancelled"));
                } else {
                    a10.onError(new Mqtt5SubAckException(mqttSubAck, str));
                }
            } else if (a10.isCancelled()) {
                Log.w(TAG, "Subscribe was successful but the SubAck flow has been cancelled");
            } else {
                a10.onSuccess(mqttSubAck);
            }
        }
        completePending(cVar);
    }

    private void readUnsubAck(@l ChannelHandlerContext channelHandlerContext, @l MqttUnsubAck mqttUnsubAck) {
        b remove = this.pendingIndex.remove(mqttUnsubAck.getPacketIdentifier());
        if (remove == null) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for UNSUBACK");
            return;
        }
        if (!(remove instanceof i)) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "UNSUBACK received for a SUBSCRIBE");
            return;
        }
        i iVar = (i) remove;
        a<MqttUnsubAck> a10 = iVar.a();
        ImmutableList<Mqtt5UnsubAckReasonCode> reasonCodes = mqttUnsubAck.getReasonCodes();
        boolean z10 = iVar.f3050b.getTopicFilters().size() != reasonCodes.size();
        boolean allErrors = MqttCommonReasonCode.allErrors(mqttUnsubAck.getReasonCodes());
        if (reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS || !(z10 || allErrors)) {
            this.incomingPublishFlows.unsubscribe(iVar.f3050b, reasonCodes);
            if (a10.isCancelled()) {
                Log.w(TAG, "Unsubscribe was successful but the UnsubAck flow has been cancelled");
            } else {
                a10.onSuccess(mqttUnsubAck);
            }
        } else {
            String str = z10 ? "Count of Reason Codes in UNSUBACK does not match count of Topic Filters in UNSUBSCRIBE" : "UNSUBACK contains only Error Codes";
            if (a10.isCancelled()) {
                Log.w(TAG, str.concat(" but the UnsubAck flow has been cancelled"));
            } else {
                a10.onError(new Mqtt5UnsubAckException(mqttUnsubAck, str));
            }
        }
        completePending(iVar);
    }

    private void writeSubscribe(@l ChannelHandlerContext channelHandlerContext, @l c cVar) {
        MqttStatefulSubscribe createStateful = cVar.f3038b.createStateful(cVar.f3037a, this.subscriptionIdentifiersAvailable ? cVar.f3039c : -1);
        this.currentPending = cVar;
        channelHandlerContext.write(createStateful, channelHandlerContext.voidPromise());
        this.currentPending = null;
    }

    private void writeUnsubscribe(@l ChannelHandlerContext channelHandlerContext, @l i iVar) {
        MqttStatefulUnsubscribe createStateful = iVar.f3050b.createStateful(iVar.f3037a);
        this.currentPending = iVar;
        channelHandlerContext.write(createStateful, channelHandlerContext.voidPromise());
        this.currentPending = null;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(@l ChannelHandlerContext channelHandlerContext, @l Object obj) {
        if (obj instanceof MqttSubAck) {
            readSubAck(channelHandlerContext, (MqttSubAck) obj);
        } else if (obj instanceof MqttUnsubAck) {
            readUnsubAck(channelHandlerContext, (MqttUnsubAck) obj);
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(@l ChannelHandlerContext channelHandlerContext, @l Throwable th) {
        b bVar;
        if ((th instanceof IOException) || (bVar = this.currentPending) == null) {
            channelHandlerContext.fireExceptionCaught(th);
            return;
        }
        this.pending.remove(bVar);
        this.packetIdentifiers.returnId(this.currentPending.f3037a);
        this.pendingIndex.remove(this.currentPending.f3037a);
        MqttSubscriptionFlow<?> a10 = this.currentPending.a();
        if (a10 != null) {
            a10.onError(th);
        }
        b bVar2 = this.currentPending;
        if (bVar2 instanceof c) {
            c cVar = (c) bVar2;
            this.incomingPublishFlows.subAck(cVar.f3038b, cVar.f3039c, ImmutableList.of(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR));
        }
        this.currentPending = null;
    }

    @Override // com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(@l Throwable th) {
        super.onSessionEnd(th);
        this.pendingIndex.clear();
        this.sendPending = null;
        for (b first = this.pending.getFirst(); first != null; first = first.getNext()) {
            int i10 = first.f3037a;
            if (i10 == 0) {
                break;
            }
            this.packetIdentifiers.returnId(i10);
            first.f3037a = 0;
        }
        if (!this.clientConfig.isResubscribeIfSessionExpired() || this.clientConfig.getState() == MqttClientState.DISCONNECTED) {
            this.incomingPublishFlows.clear(th);
            for (b first2 = this.pending.getFirst(); first2 != null; first2 = first2.getNext()) {
                MqttSubscriptionFlow<?> a10 = first2.a();
                if (a10 != null) {
                    a10.onError(th);
                }
            }
            this.pending.clear();
            this.nextSubscriptionIdentifier = 1;
        }
    }

    @Override // com.sanjiang.vantrue.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(@l MqttClientConnectionConfig mqttClientConnectionConfig, @l EventLoop eventLoop) {
        this.subscriptionIdentifiersAvailable = mqttClientConnectionConfig.areSubscriptionIdentifiersAvailable();
        if (!this.hasSession) {
            this.incomingPublishFlows.getSubscriptions().forEach(new BiConsumer() { // from class: b2.g
                @Override // java.util.function.BiConsumer
                public final void accept(Object obj, Object obj2) {
                    MqttSubscriptionHandler.this.lambda$onSessionStartOrResume$1((Integer) obj, (List) obj2);
                }
            });
        }
        this.pendingIndex.clear();
        b first = this.pending.getFirst();
        this.sendPending = first;
        if (first != null) {
            eventLoop.execute(this);
        }
        super.onSessionStartOrResume(mqttClientConnectionConfig, eventLoop);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v0, types: [b2.b] */
    /* JADX WARN: Type inference failed for: r1v3, types: [b2.b] */
    @Override // java.lang.Runnable
    @CallByThread("Netty EventLoop")
    public void run() {
        ChannelHandlerContext channelHandlerContext = this.ctx;
        if (channelHandlerContext == null) {
            return;
        }
        int i10 = 0;
        i iVar = this.sendPending;
        while (iVar != null && this.pendingIndex.size() < 10) {
            if (iVar.f3037a == 0) {
                int id = this.packetIdentifiers.getId();
                if (id == -1) {
                    Log.e(TAG, "No Packet Identifier available for (UN)SUBSCRIBE. This must not happen and is a bug.");
                    return;
                }
                iVar.f3037a = id;
            }
            this.pendingIndex.put(iVar);
            if (this.sendPending instanceof c) {
                writeSubscribe(channelHandlerContext, iVar);
            } else {
                writeUnsubscribe(channelHandlerContext, iVar);
            }
            i10++;
            b next = iVar.getNext();
            this.sendPending = next;
            iVar = next;
        }
        if (i10 > 0) {
            channelHandlerContext.flush();
        }
    }

    public void subscribe(@l final MqttSubscribe mqttSubscribe, @l final MqttSubscriptionFlow<MqttSubAck> mqttSubscriptionFlow) {
        mqttSubscriptionFlow.getEventLoop().execute(new Runnable() { // from class: b2.f
            @Override // java.lang.Runnable
            public final void run() {
                MqttSubscriptionHandler.this.lambda$subscribe$2(mqttSubscriptionFlow, mqttSubscribe);
            }
        });
    }

    public void subscribeGlobal(@l final MqttGlobalIncomingPublishFlow mqttGlobalIncomingPublishFlow) {
        mqttGlobalIncomingPublishFlow.getEventLoop().execute(new Runnable() { // from class: b2.e
            @Override // java.lang.Runnable
            public final void run() {
                MqttSubscriptionHandler.this.lambda$subscribeGlobal$4(mqttGlobalIncomingPublishFlow);
            }
        });
    }

    public void unsubscribe(@l final MqttUnsubscribe mqttUnsubscribe, @l final a<MqttUnsubAck> aVar) {
        aVar.getEventLoop().execute(new Runnable() { // from class: b2.d
            @Override // java.lang.Runnable
            public final void run() {
                MqttSubscriptionHandler.this.lambda$unsubscribe$3(aVar, mqttUnsubscribe);
            }
        });
    }
}
