package io.moquette.broker;

import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps the {@link Session} registered for each client id together with the per-client message
 * queues, and binds incoming connections either to a freshly created session or to the one
 * already stored for the same client id.
 *
 * <p>This source was recovered from a decompiled build; several members are {@code public} only
 * because the decompiler widened their access, and are kept that way for compatibility.
 */
public class SessionRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SessionRegistry.class);
    private final IQueueRepository queueRepository;
    private final ISubscriptionsDirectory subscriptionsDirectory;
    /** Sessions indexed by client id. */
    private final ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();
    /** Message queues indexed by client id; entries are created lazily when a session is created. */
    private final ConcurrentMap<String, Queue<EnqueuedMessage>> queues = new ConcurrentHashMap<>();

    /** Base type of the elements held in a session queue. */
    public static abstract class EnqueuedMessage {
    }

    /** Follow-up step that {@link #bindToSession} performs after the connect acknowledge was sent. */
    public enum PostConnectAction {
        /** Nothing else to do. */
        NONE,
        /** Ask the stored session to send the messages it queued while offline. */
        SEND_STORED_MESSAGES;

        /**
         * Alias of {@link #values()} introduced by the decompiler (renamed to avoid clashing with
         * the implicit enum method); kept so that any existing caller keeps compiling.
         *
         * @return a fresh array holding every constant in declaration order
         */
        public static PostConnectAction[] valuesCustom() {
            // values() already hands out a private copy, so no second copy is needed.
            return values();
        }
    }

    /** Queue element that carries no data of its own. */
    public static final class PubRelMarker extends EnqueuedMessage {
    }

    /** Queue element bundling the topic, QoS and payload of a published message. */
    public static class PublishedMessage extends EnqueuedMessage {
        final ByteBuf payload;
        final MqttQoS publishingQos;
        final Topic topic;

        /**
         * @param topic    topic of the message
         * @param mqttQoS  QoS the message was published with
         * @param byteBuf  message payload; stored as-is, not copied
         */
        public PublishedMessage(Topic topic, MqttQoS mqttQoS, ByteBuf byteBuf) {
            this.topic = topic;
            this.publishingQos = mqttQoS;
            this.payload = byteBuf;
        }
    }

    /**
     * @param iSubscriptionsDirectory directory from which subscriptions of dropped sessions are removed
     * @param iQueueRepository        factory for the per-client message queues
     */
    public SessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, IQueueRepository iQueueRepository) {
        this.subscriptionsDirectory = iSubscriptionsDirectory;
        this.queueRepository = iQueueRepository;
    }

    /**
     * Binds a connection to the session already stored in the pool for {@code str}.
     *
     * <ul>
     *   <li>case 2 - clean connect, stored session disconnected: queues and subscriptions are
     *       dropped and the stored session is reconfigured and reused;</li>
     *   <li>case 3 - non-clean connect, stored session disconnected: the stored session is reused
     *       and its offline messages must be sent afterwards;</li>
     *   <li>case 4 - stored session still connected: it is closed and replaced by {@code session};</li>
     *   <li>otherwise nothing is changed.</li>
     * </ul>
     *
     * @param mQTTConnection     connection to bind
     * @param mqttConnectMessage connect message that triggered the bind
     * @param str                client id
     * @param session            newly created session, used only when the stored one gets replaced
     * @return {@link PostConnectAction#SEND_STORED_MESSAGES} for case 3, otherwise
     *         {@link PostConnectAction#NONE}
     * @throws SessionCorruptedException if a state transition or the pool replacement fails
     */
    private PostConnectAction bindToExistingSession(MQTTConnection mQTTConnection, MqttConnectMessage mqttConnectMessage, String str, Session session) {
        final boolean cleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        // NOTE(review): this lookup is not guarded against null; remove() running between the
        // caller's check and this point would surface here as a NullPointerException - confirm.
        final Session oldSession = this.pool.get(str);

        if (cleanSession && oldSession.disconnected()) {
            dropQueuesForClient(str);
            unsubscribe(oldSession);
            if (!oldSession.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING)) {
                throw new SessionCorruptedException("old session was already changed state");
            }
            copySessionConfig(mqttConnectMessage, oldSession);
            oldSession.bind(mQTTConnection);
            if (!oldSession.assignState(Session.SessionStatus.CONNECTING, Session.SessionStatus.CONNECTED)) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            // Replacing the value with itself fails only when the mapping is gone or changed.
            if (!this.pool.replace(str, oldSession, oldSession)) {
                throw new SessionCorruptedException("old session was already removed");
            }
            LOG.trace("case 2, oldSession with same CId {} disconnected", str);
            return PostConnectAction.NONE;
        }

        if (!cleanSession && oldSession.disconnected()) {
            reactivateSubscriptions(oldSession);
            if (!oldSession.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING)) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            oldSession.bind(mQTTConnection);
            if (!oldSession.assignState(Session.SessionStatus.CONNECTING, Session.SessionStatus.CONNECTED)) {
                throw new SessionCorruptedException("old session moved in other state state by other thread");
            }
            if (!this.pool.replace(str, oldSession, oldSession)) {
                throw new SessionCorruptedException("old session was already removed");
            }
            LOG.trace("case 3, oldSession with same CId {} disconnected", str);
            return PostConnectAction.SEND_STORED_MESSAGES;
        }

        if (oldSession.connected()) {
            LOG.trace("case 4, oldSession with same CId {} still connected, force to close", str);
            oldSession.closeImmediately();
            if (!this.pool.replace(str, oldSession, session)) {
                throw new SessionCorruptedException("old session was already removed");
            }
            return PostConnectAction.NONE;
        }

        // Stored session is neither connected nor disconnected: leave everything untouched.
        return PostConnectAction.NONE;
    }

    /**
     * Applies the clean-session flag and the will (or {@code null} when the will flag is not set)
     * of the connect message to an existing session.
     */
    private void copySessionConfig(MqttConnectMessage mqttConnectMessage, Session session) {
        session.update(mqttConnectMessage.variableHeader().isCleanSession(), mqttConnectMessage.variableHeader().isWillFlag() ? createWill(mqttConnectMessage) : null);
    }

    /**
     * Creates a session for {@code str}, marks it connected and binds it to the connection.
     * The message queue registered for the client id is reused, or created through the queue
     * repository when none is registered yet.
     *
     * @return the new session; it is NOT added to the pool by this method
     */
    private Session createNewSession(MQTTConnection mQTTConnection, MqttConnectMessage mqttConnectMessage, String str) {
        final boolean cleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        final Queue<EnqueuedMessage> sessionQueue =
                this.queues.computeIfAbsent(str, clientId -> this.queueRepository.createQueue(clientId, cleanSession));
        final Session session;
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            session = new Session(str, cleanSession, createWill(mqttConnectMessage), sessionQueue);
        } else {
            session = new Session(cleanSession, str, sessionQueue);
        }
        session.markConnected();
        session.bind(mQTTConnection);
        return session;
    }

    /**
     * Builds the will described by the connect message. The will payload is copied into a new
     * buffer rather than referencing the bytes of the message.
     */
    private Session.Will createWill(MqttConnectMessage mqttConnectMessage) {
        final ByteBuf willPayload = Unpooled.copiedBuffer(mqttConnectMessage.payload().willMessageInBytes());
        final MqttQoS willQos = MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos());
        return new Session.Will(mqttConnectMessage.payload().willTopic(), willPayload, willQos, mqttConnectMessage.variableHeader().isWillRetain());
    }

    /** Forgets the message queue registered for the given client id, if any. */
    private void dropQueuesForClient(String str) {
        this.queues.remove(str);
    }

    /**
     * Currently a no-op placeholder: the session's subscriptions are iterated but nothing is done
     * with them.
     */
    private void reactivateSubscriptions(Session session) {
        for (Subscription subscription : session.getSubscriptions()) {
            // intentionally empty - no per-subscription work is performed
        }
    }

    /** Removes every subscription of the session from the subscriptions directory. */
    private void unsubscribe(Session session) {
        for (Subscription subscription : session.getSubscriptions()) {
            this.subscriptionsDirectory.removeSubscription(subscription.getTopicFilter(), session.getClientID());
        }
    }

    /**
     * Binds a connection to the session of the given client id, creating and registering a new
     * session when none is stored, then sends the connect acknowledge and, when required, the
     * messages the stored session queued while offline.
     *
     * @param mQTTConnection     connection that issued the connect
     * @param mqttConnectMessage the connect message
     * @param str                client id
     * @throws SessionCorruptedException if binding to an already stored session fails
     */
    public void bindToSession(MQTTConnection mQTTConnection, MqttConnectMessage mqttConnectMessage, String str) {
        PostConnectAction postConnectAction = PostConnectAction.NONE;
        boolean sessionAlreadyStored = true;

        if (!this.pool.containsKey(str)) {
            final Session newSession = createNewSession(mQTTConnection, mqttConnectMessage, str);
            if (this.pool.putIfAbsent(str, newSession) == null) {
                LOG.trace("case 1, not existing session with CId {}", str);
                sessionAlreadyStored = false;
            } else {
                // Another session was registered between the containsKey check and the put.
                postConnectAction = bindToExistingSession(mQTTConnection, mqttConnectMessage, str, newSession);
            }
        } else {
            final Session newSession = createNewSession(mQTTConnection, mqttConnectMessage, str);
            postConnectAction = bindToExistingSession(mQTTConnection, mqttConnectMessage, str, newSession);
        }

        // The acknowledge flag is set only for a non-clean connect that found a stored session.
        final boolean cleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        mQTTConnection.sendConnAck(!cleanSession && sessionAlreadyStored);

        if (postConnectAction == PostConnectAction.SEND_STORED_MESSAGES) {
            this.pool.get(str).sendQueuedMessagesWhileOffline();
        }
    }

    /**
     * Disconnects the session stored for the given client id; only logs when no session is stored.
     */
    public void disconnect(String str) {
        final Session session = retrieve(str);
        if (session == null) {
            LOG.debug("Some other thread already removed the session CId={}", str);
            return;
        }
        session.disconnect();
    }

    /**
     * Compiler-generated lambda body (queue factory used when creating a session). Retained with
     * its original name and visibility in case a generated lambda class still references it.
     *
     * @param z   clean-session flag forwarded to the queue repository
     * @param str client id
     * @return the queue created by the queue repository
     */
    public Queue<EnqueuedMessage> lambda$0$SessionRegistry(boolean z, String str) {
        return this.queueRepository.createQueue(str, z);
    }

    /** Removes the session stored for the given client id, if any. */
    public void remove(String str) {
        this.pool.remove(str);
    }

    /**
     * @return the session stored for the given client id, or {@code null} when there is none
     */
    public Session retrieve(String str) {
        return this.pool.get(str);
    }
}
