package com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection;

import com.systematic.sitaware.framework.utility.concurrent.ExecutorServiceFactory;
import com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.ErrorReporter;
import com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.model.messages.FcsMessageWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/systematic/sitaware/tactical/comms/service/fcs/proxy/internal/connection/AbstractFcsConnection.class */
public abstract class AbstractFcsConnection implements FcsConnection {
    private static final Logger logger = LoggerFactory.getLogger(AbstractFcsConnection.class);
    private static final int STX = 2;
    private static final int ETX = 3;
    private static final int EOF = -1;
    private ErrorReporter errorReporter;
    private OutputStream outputStream;
    private InputStream inputStream;
    private Consumer<byte[]> messageReceiver;
    private final int maxLength;
    private volatile Future<?> receiveFuture;
    private final Semaphore sendSemaphore = new Semaphore(1);
    private OutputStream dummyOutputStream = new OutputStream() { // from class: com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.AbstractFcsConnection.1
        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
        }
    };
    private ExecutorService service = ExecutorServiceFactory.getDedicatedSingleThreadExecutor("fcs-msg-receiver");

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractFcsConnection(int i) {
        this.maxLength = i;
        logger.debug("initialize");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void defineOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void undefineOutputStream() {
        this.outputStream = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void defineInputStream(InputStream inputStream) {
        this.inputStream = inputStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void undefineInputStream() {
        if (this.inputStream != null) {
            try {
                this.inputStream.close();
            } catch (IOException e) {
            }
        }
        this.inputStream = null;
    }

    public void setMessageReceiver(Consumer<byte[]> consumer) {
        this.messageReceiver = consumer;
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public void connect() throws IOException {
        if (this.messageReceiver == null) {
            throw new IllegalStateException("messageReceiver must be set");
        }
        stopMessageReceiving();
        startMessageReceiving();
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public void disconnect() {
        stopMessageReceiving();
    }

    private void stopMessageReceiving() {
        if (this.receiveFuture != null) {
            this.receiveFuture.cancel(true);
            this.receiveFuture = null;
        }
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public void setErrorReporter(ErrorReporter errorReporter) {
        logger.debug("setErrorReporter");
        this.errorReporter = errorReporter;
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public void send(FcsMessageWriter fcsMessageWriter) {
        logger.debug("send");
        sendAux(fcsMessageWriter, true);
    }

    public void startMessageReceiving() {
        this.receiveFuture = this.service.submit(() -> {
            while (true) {
                try {
                    this.messageReceiver.accept(receiveMessage());
                } catch (IOException e) {
                    this.errorReporter.disconnected("Failed to read");
                    return;
                }
            }
        });
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public byte[] receiveMessage() throws IOException {
        try {
            receiveUntilTagOrEof(STX, this.dummyOutputStream);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            receiveUntilTagOrEof(ETX, byteArrayOutputStream);
            return byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            reportError("IO error when receiving message.", e.getMessage());
            throw e;
        }
    }

    private void reportError(String str, String str2) {
        if (this.errorReporter != null) {
            this.errorReporter.reportError(true, str, str2);
        }
    }

    private void receiveUntilTagOrEof(int i, OutputStream outputStream) throws IOException {
        int i2;
        InputStream inputStream = this.inputStream;
        if (inputStream == null) {
            throw new IOException("input stream null.");
        }
        int i3 = 0;
        int read = inputStream.read();
        while (true) {
            i2 = read;
            if (i2 == i || i2 == EOF) {
                break;
            }
            int i4 = i3;
            i3++;
            if (i4 > this.maxLength) {
                throw new IOException("can not find tag " + i + " in " + this.maxLength + " bytes.");
            }
            outputStream.write(i2);
            read = inputStream.read();
        }
        if (i2 == EOF) {
            throw new IOException("can not find tag " + i + " before eof.");
        }
    }

    @Override // com.systematic.sitaware.tactical.comms.service.fcs.proxy.internal.connection.FcsConnection
    public boolean sendNoWait(FcsMessageWriter fcsMessageWriter) {
        logger.debug("sendNoWait");
        return sendAux(fcsMessageWriter, false);
    }

    private boolean sendAux(FcsMessageWriter fcsMessageWriter, boolean z) {
        try {
            if (z) {
                this.sendSemaphore.acquire();
            } else if (!this.sendSemaphore.tryAcquire(100L, TimeUnit.MILLISECONDS)) {
                return false;
            }
            try {
                try {
                    this.outputStream.write(STX);
                    fcsMessageWriter.write(this.outputStream);
                    this.outputStream.write(ETX);
                    this.outputStream.flush();
                    this.sendSemaphore.release();
                    return true;
                } catch (Throwable th) {
                    this.sendSemaphore.release();
                    throw th;
                }
            } catch (Exception e) {
                reportError("IO error when sending message.", e.getMessage());
                this.sendSemaphore.release();
                return false;
            }
        } catch (InterruptedException e2) {
            logger.info("Interrupted send: ", e2);
            return false;
        }
    }
}
