package com.systematic.sitaware.tactical.comms.videoserver.internal.channeljoin;

import com.systematic.sitaware.tactical.comms.videoserver.internal.feedcontext.StreamStatus;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/systematic/sitaware/tactical/comms/videoserver/internal/channeljoin/ChannelJoiner.class */
public class ChannelJoiner {
    private static final Logger logger = LoggerFactory.getLogger(ChannelJoiner.class);
    private final ExecutorService threadPool;
    private final Queue<Registration> registerQueue = new ConcurrentLinkedQueue();
    private boolean shutdown = false;
    private final Selector selector = Selector.open();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/systematic/sitaware/tactical/comms/videoserver/internal/channeljoin/ChannelJoiner$Registration.class */
    public class Registration {
        private final SelectableChannel channel;
        private final int interestOps;
        private final JoinedChannels attachment;

        Registration(SelectableChannel selectableChannel, int i, JoinedChannels joinedChannels) {
            this.channel = selectableChannel;
            this.interestOps = i;
            this.attachment = joinedChannels;
        }
    }

    public ChannelJoiner(ExecutorService executorService) throws IOException {
        this.threadPool = executorService;
    }

    public void start() {
        this.threadPool.execute(this::doSelect);
    }

    public void shutdown() {
        this.shutdown = true;
    }

    public JoinedChannels join(SelectableChannel selectableChannel, SelectableChannel selectableChannel2, Consumer<StreamStatus> consumer) {
        JoinedChannels joinedChannels = new JoinedChannels(selectableChannel, selectableChannel2, consumer);
        try {
            selectableChannel.configureBlocking(false);
            selectableChannel2.configureBlocking(false);
            registerJoinedChannelRead(joinedChannels);
            return joinedChannels;
        } catch (Exception e) {
            logger.debug("Failed to configure feed for Non-blocking I/O: ", e);
            joinedChannels.performStatusCallback(StreamStatus.ERROR);
            return joinedChannels;
        }
    }

    public JoinedChannels join(SelectableChannel selectableChannel, WritableByteChannel writableByteChannel, Consumer<StreamStatus> consumer) {
        JoinedChannels joinedChannels = new JoinedChannels(selectableChannel, writableByteChannel, consumer);
        try {
            selectableChannel.configureBlocking(false);
            registerJoinedChannelRead(joinedChannels);
            return joinedChannels;
        } catch (Exception e) {
            logger.debug("Failed to configure feed for Non-blocking I/O", e);
            joinedChannels.performStatusCallback(StreamStatus.ERROR);
            return joinedChannels;
        }
    }

    public void unjoin(JoinedChannels joinedChannels) {
        Optional.ofNullable(joinedChannels.getSrcServer()).ifPresent(serverSocketChannel -> {
            this.registerQueue.add(new Registration(serverSocketChannel, 0, joinedChannels));
        });
        Optional.ofNullable(joinedChannels.getDstServer()).ifPresent(serverSocketChannel2 -> {
            this.registerQueue.add(new Registration(serverSocketChannel2, 0, joinedChannels));
        });
        Optional.ofNullable(joinedChannels.getSrcClientSelectable()).ifPresent(selectableChannel -> {
            this.registerQueue.add(new Registration(selectableChannel, 0, joinedChannels));
        });
        Optional.ofNullable(joinedChannels.getDstClientSelectable()).ifPresent(selectableChannel2 -> {
            this.registerQueue.add(new Registration(selectableChannel2, 0, joinedChannels));
        });
        this.selector.wakeup();
    }

    private void registerJoinedChannelRead(JoinedChannels joinedChannels) {
        SelectableChannel srcClientSelectable;
        int i = 16;
        if (!joinedChannels.dstClientConnected()) {
            srcClientSelectable = joinedChannels.getDstServer();
        } else if (joinedChannels.srcClientConnected()) {
            i = 1;
            srcClientSelectable = joinedChannels.getSrcClientSelectable();
        } else {
            srcClientSelectable = joinedChannels.getSrcServer();
        }
        registerChannel(srcClientSelectable, i, joinedChannels);
    }

    private void doSelect() {
        while (!this.shutdown) {
            try {
                this.selector.select();
            } catch (Exception e) {
                logger.debug("Caught exception while selecting", e);
            }
            processRegistrationQueue();
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey next = it.next();
                it.remove();
                if (next.attachment() instanceof JoinedChannels) {
                    JoinedChannels joinedChannels = (JoinedChannels) next.attachment();
                    if (next.isValid() && !joinedChannels.isCurrentlyHandled()) {
                        joinedChannels.setCurrentlyHandled(true);
                        int readyOps = next.readyOps();
                        this.threadPool.execute(this::doSelect);
                        handleEvent(next, readyOps, joinedChannels);
                        joinedChannels.setCurrentlyHandled(false);
                        return;
                    }
                } else {
                    logger.error("Faulty selector logic. JoinedChannels not attached to SelectorKey");
                    deregisterChannel(next.channel());
                }
            }
        }
    }

    private void processRegistrationQueue() {
        HashSet hashSet = null;
        while (!this.registerQueue.isEmpty()) {
            Registration remove = this.registerQueue.remove();
            try {
                if (remove.interestOps == 0) {
                    SelectionKey keyFor = remove.channel.keyFor(this.selector);
                    if (keyFor != null) {
                        if (hashSet == null) {
                            hashSet = new HashSet();
                        }
                        hashSet.add(keyFor);
                    }
                } else {
                    processNewRegistration(hashSet, remove);
                }
            } catch (Exception e) {
                logger.debug("Caught exception while registering for event with selector", e);
                onError(remove.attachment);
            }
        }
        processCancelledRegistrations(hashSet);
    }

    private void processNewRegistration(Set<SelectionKey> set, Registration registration) throws ClosedChannelException {
        SelectionKey register = registration.channel.register(this.selector, registration.interestOps, registration.attachment);
        if (set != null) {
            set.remove(register);
        }
    }

    private void processCancelledRegistrations(Set<SelectionKey> set) {
        if (set != null) {
            Iterator<SelectionKey> it = set.iterator();
            while (it.hasNext()) {
                try {
                    it.next().cancel();
                } catch (Exception e) {
                    logger.debug("Caught exception while registering for event with selector", e);
                }
            }
        }
    }

    private void handleEvent(SelectionKey selectionKey, int i, JoinedChannels joinedChannels) {
        if ((i & 16) != 0) {
            handleAcceptEvent(selectionKey, joinedChannels);
        } else if ((i & 1) != 0) {
            handleReadEvent(selectionKey, joinedChannels);
        } else if ((i & 4) != 0) {
            handleWriteEvent(selectionKey, joinedChannels);
        }
    }

    private void handleAcceptEvent(SelectionKey selectionKey, JoinedChannels joinedChannels) {
        if (selectionKey.channel() instanceof ServerSocketChannel) {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            deregisterChannel(serverSocketChannel);
            try {
                SocketChannel accept = serverSocketChannel.accept();
                accept.configureBlocking(false);
                if (serverSocketChannel == joinedChannels.getSrcServer()) {
                    joinedChannels.setSrcClient(accept);
                } else {
                    if (serverSocketChannel != joinedChannels.getDstServer()) {
                        logger.error("Faulty selector logic. Attachment not matching channel.");
                        return;
                    }
                    joinedChannels.setDstClient(accept);
                }
                registerJoinedChannelRead(joinedChannels);
                try {
                    serverSocketChannel.close();
                } catch (Exception e) {
                    logger.debug("Caught exception while closing server socket", e);
                }
            } catch (Exception e2) {
                logger.debug("Caught exception while accepting client", e2);
            }
        }
    }

    private void handleReadEvent(SelectionKey selectionKey, JoinedChannels joinedChannels) {
        if (!(selectionKey.channel() instanceof ReadableByteChannel)) {
            logger.error("Faulty selector logic. Read event on channel which is not ReadableByteChannel");
            deregisterChannel(selectionKey.channel());
            return;
        }
        readFromSourceChannelToBuffer(joinedChannels);
        joinedChannels.flipBufferToReadMode();
        writeFromBufferToDestinationChannel(joinedChannels);
        if (joinedChannels.getBuffer().remaining() == 0) {
            joinedChannels.clearBuffer();
        } else if (joinedChannels.getDstClientSelectable() == null) {
            joinedChannels.flipBufferToWriteMode();
        } else {
            deregisterChannel(selectionKey.channel());
            registerChannel(joinedChannels.getDstClientSelectable(), 4, joinedChannels);
        }
    }

    private void handleWriteEvent(SelectionKey selectionKey, JoinedChannels joinedChannels) {
        if (!(selectionKey.channel() instanceof WritableByteChannel)) {
            logger.error("Faulty selector logic. Write event on channel which is not WritableByteChannel");
            deregisterChannel(selectionKey.channel());
            return;
        }
        writeFromBufferToDestinationChannel(joinedChannels);
        if (joinedChannels.getBuffer().remaining() == 0) {
            deregisterChannel(selectionKey.channel());
            joinedChannels.clearBuffer();
            registerJoinedChannelRead(joinedChannels);
        }
    }

    private void writeFromBufferToDestinationChannel(JoinedChannels joinedChannels) {
        try {
            joinedChannels.getDstClient().write(joinedChannels.getBuffer());
        } catch (Exception e) {
            logger.debug("Caught exception while writing", e);
            onError(joinedChannels);
        }
    }

    private void readFromSourceChannelToBuffer(JoinedChannels joinedChannels) {
        int i = 1;
        while (i > 0) {
            try {
                if (!joinedChannels.getBuffer().hasRemaining()) {
                    break;
                } else if (joinedChannels.getSrcClient() instanceof DatagramChannel) {
                    i = ((DatagramChannel) joinedChannels.getSrcClient()).receive(joinedChannels.getBuffer()) == null ? 0 : 1;
                } else {
                    i = joinedChannels.getSrcClient().read(joinedChannels.getBuffer());
                }
            } catch (Exception e) {
                logger.debug("Caught exception while reading", e);
                onError(joinedChannels);
            }
        }
        if (i >= 0) {
            joinedChannels.performStatusCallback(StreamStatus.DATA_READ);
        } else {
            logger.debug("End-of-stream reached during read operation");
            onError(joinedChannels);
        }
    }

    private void onError(JoinedChannels joinedChannels) {
        unjoin(joinedChannels);
        joinedChannels.performStatusCallback(StreamStatus.ERROR);
    }

    private void deregisterChannel(SelectableChannel selectableChannel) {
        this.registerQueue.add(new Registration(selectableChannel, 0, null));
        this.selector.wakeup();
    }

    private void registerChannel(SelectableChannel selectableChannel, int i, JoinedChannels joinedChannels) {
        this.registerQueue.add(new Registration(selectableChannel, i, joinedChannels));
        this.selector.wakeup();
    }
}
