001package jmri.jmrix.dccpp.dccppovertcp;
002
003import java.io.BufferedReader;
004import java.io.IOException;
005import java.io.InputStreamReader;
006import java.io.OutputStream;
007import java.net.Socket;
008import java.util.LinkedList;
009import jmri.InstanceManager;
010import jmri.jmrix.dccpp.DCCppListener;
011import jmri.jmrix.dccpp.DCCppMessage;
012import jmri.jmrix.dccpp.DCCppReply;
013import jmri.jmrix.dccpp.DCCppSystemConnectionMemo;
014import org.slf4j.Logger;
015import org.slf4j.LoggerFactory;
016
017import javax.annotation.concurrent.GuardedBy;
018
019/**
020 * Implementation of the DCCppOverTcp LbServer Server Protocol.
021 *
022 * @author Alex Shepherd Copyright (C) 2006
023 * @author Mark Underwood Copyright (C) 2015
024 */
025public final class ClientRxHandler extends Thread implements DCCppListener {
026
027    Socket clientSocket;
028    BufferedReader inStream;
029    OutputStream outStream;
030    @GuardedBy ("replyQueue")
031    final LinkedList<DCCppReply> replyQueue = new LinkedList<>(); // Init before Rx and Tx
032
033    Thread txThread;
034    String inString;
035    String remoteAddress;
036    DCCppMessage lastSentMessage;
037    private static final String oldSendPrefix = "SEND"; // lack of space is correct for legacy code
038    private static final String oldReceivePrefix = "RECEIVE "; // presence of space is correct for legacy code
039    private static final String sendPrefix = "<";
040    private static final String oldServerVersionString = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility
041    private static final String newServerVersionString = "VERSION DCC++ Server ";
042    boolean useOldPrefix = false;
043
044    public ClientRxHandler(String newRemoteAddress, Socket newSocket) {
045        clientSocket = newSocket;
046        setDaemon(true);
047        setPriority(Thread.MAX_PRIORITY);
048        remoteAddress = newRemoteAddress;
049        setName("ClientRxHandler:" + remoteAddress);
050        lastSentMessage = null;
051        start();
052    }
053
054    @Override
055    public void run() {
056
057        DCCppSystemConnectionMemo memo = InstanceManager.getDefault(DCCppSystemConnectionMemo.class);
058
059        try {
060            txThread = new Thread(new ClientTxHandler(this));
061            txThread.setDaemon(true);
062            txThread.setPriority(Thread.MAX_PRIORITY);
063            txThread.setName("ClientTxHandler:" + remoteAddress);
064
065            inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
066            outStream = clientSocket.getOutputStream();
067
068            memo.getDCCppTrafficController().addDCCppListener(~0, this);
069
070            txThread.start();
071
072            while (!isInterrupted()) {
073                inString = inStream.readLine();
074                if (inString == null) {
075                    log.debug("ClientRxHandler: Remote Connection Closed");
076                    interrupt();
077                } else {
078                    log.debug("ClientRxHandler: Received: {}", inString);
079
080                    // Check for the old server version string.  If present,
081                    // append the old-style prefixes to transmissions.
082                    // Not sure this ever happens. Only the client sends
083                    // the version string.
084                    if (inString.startsWith(oldServerVersionString)) {
085                        useOldPrefix = true;
086                    }
087                    // Legacy support: If the old prefix is there, delete it.
088                    // Also, set the flag so we will start sending old-style
089                    // prefixes.
090                    if (inString.startsWith(oldSendPrefix)) {
091                        useOldPrefix = true;
092                        final int trim = oldSendPrefix.length();
093                        inString = inString.substring(trim);
094                        log.debug("Adapted String: {}", inString);
095                    }
096                    // Check for the opening bracket
097                    if (!inString.startsWith(sendPrefix)) {
098                        log.debug("Invalid packet format: {}", inString);
099                        continue;
100                    }
101
102                    DCCppMessage msg = new DCCppMessage(inString.substring(inString.indexOf('<') + 1,
103                            inString.lastIndexOf('>')));
104
105                    memo.getDCCppTrafficController().sendDCCppMessage(msg, null);
106                    // Keep the message we just sent so we can ACK it when we hear
107                    // the echo from the LocoBuffer
108                    lastSentMessage = msg;
109                }
110            }
111        } catch (IOException ex) {
112            log.debug("ClientRxHandler: IO Exception: ", ex);
113        }
114
115        memo.getDCCppTrafficController().removeDCCppListener(~0, this);
116        txThread.interrupt();
117
118        txThread = null;
119        inStream = null;
120        outStream = null;
121        synchronized (replyQueue) {
122            replyQueue.clear();
123        }
124
125        try {
126            clientSocket.close();
127        } catch (IOException ex1) {
128            log.trace("Exception while closing client socket",ex1);
129        }
130
131        InstanceManager.getDefault(Server.class).removeClient(this);
132        log.info("ClientRxHandler: Exiting");
133    }
134
135    public void close() {
136        try {
137            clientSocket.close();
138        } catch (IOException ex1) {
139            log.error("close, which closing clientSocket", ex1);
140        }
141    }
142
143    class ClientTxHandler implements Runnable {
144
145        DCCppReply msg;
146        StringBuilder outBuf;
147        Thread parentThread;
148
149        ClientTxHandler(Thread creator) {
150            parentThread = creator;
151        }
152
153        @Override
154        public void run() {
155
156            try {
157                outBuf = new StringBuilder(newServerVersionString);
158                outBuf.append(jmri.Version.name()).append("\r\n");
159                outStream.write(outBuf.toString().getBytes());
160
161                while (!isInterrupted()) {
162                    msg = null;
163
164                    synchronized (replyQueue) {
165                        if (replyQueue.isEmpty()) {
166                            replyQueue.wait();
167                        }
168
169                        if (!replyQueue.isEmpty()) {
170                            msg = replyQueue.removeFirst();
171                            log.debug("Prepping to send message: {}", msg);
172                        }
173                    }
174
175                    if (msg != null) {
176                        outBuf.setLength(0);
177                        if (useOldPrefix) {
178                            outBuf.append(oldReceivePrefix);
179                        }
180                        outBuf.append("<");
181                        outBuf.append(msg.toString());
182                        outBuf.append(">");
183                        log.debug("ClientTxHandler: Send: {}", outBuf);
184                        outBuf.append("\r\n");
185                        outStream.write(outBuf.toString().getBytes());
186                        outStream.flush();
187                    }
188                }
189            } catch (IOException ex) {
190                log.error("ClientTxHandler: IO Exception");
191            } catch (InterruptedException ex) {
192                Thread.currentThread().interrupt(); // retain if needed later
193                log.debug("ClientTxHandler: Interrupted Exception");
194            }
195            // Interrupt the Parent to let it know we are exiting for some reason
196            parentThread.interrupt();
197
198            parentThread = null;
199            msg = null;
200            outBuf = null;
201            log.info("ClientTxHandler: Exiting");
202        }
203    }
204
205    @Override
206    public void message(DCCppMessage msg) {
207        // no need to handle outgoing messages
208    }
209
210    @Override
211    public void message(DCCppReply msg) {
212        synchronized (replyQueue) {
213            replyQueue.add(msg);
214            replyQueue.notifyAll();
215        }
216        log.debug("Message added to queue: {}", msg);
217    }
218
219    @Override
220    public void notifyTimeout(DCCppMessage m) {
221        // ToDo : handle timeouts
222    }
223
224    private static final Logger log = LoggerFactory.getLogger(ClientRxHandler.class);
225
226}