001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.InterruptedIOException;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.Socket;
026    import java.net.SocketException;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.net.UnknownHostException;
030    import java.util.HashMap;
031    import java.util.Map;
032    import java.util.concurrent.CountDownLatch;
033    import java.util.concurrent.SynchronousQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    import javax.net.SocketFactory;
040    
041    import org.apache.activemq.Service;
042    import org.apache.activemq.transport.Transport;
043    import org.apache.activemq.transport.TransportLoggerFactory;
044    import org.apache.activemq.transport.TransportThreadSupport;
045    import org.apache.activemq.util.IntrospectionSupport;
046    import org.apache.activemq.util.ServiceStopper;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.apache.commons.logging.Log;
049    import org.apache.commons.logging.LogFactory;
050    
051    /**
052     * An implementation of the {@link Transport} interface using raw tcp/ip
053     * 
054     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055     * @version $Revision$
056     */
057    public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058        private static final Log LOG = LogFactory.getLog(TcpTransport.class);
059        private static final ThreadPoolExecutor SOCKET_CLOSE;
060        protected final URI remoteLocation;
061        protected final URI localLocation;
062        protected final WireFormat wireFormat;
063    
064        protected int connectionTimeout = 30000;
065        protected int soTimeout;
066        protected int socketBufferSize = 64 * 1024;
067        protected int ioBufferSize = 8 * 1024;
068        protected boolean closeAsync=true;
069        protected Socket socket;
070        protected DataOutputStream dataOut;
071        protected DataInputStream dataIn;
072        protected TcpBufferedOutputStream buffOut = null;
073        /**
074         * trace=true -> the Transport stack where this TcpTransport
075         * object will be, will have a TransportLogger layer
076         * trace=false -> the Transport stack where this TcpTransport
077         * object will be, will NOT have a TransportLogger layer, and therefore
078         * will never be able to print logging messages.
079         * This parameter is most probably set in Connection or TransportConnector URIs.
080         */
081        protected boolean trace = false;
082        /**
083         * Name of the LogWriter implementation to use.
084         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
085         * This parameter is most probably set in Connection or TransportConnector URIs.
086         */
087        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
088        /**
089         * Specifies if the TransportLogger will be manageable by JMX or not.
090         * Also, as long as there is at least 1 TransportLogger which is manageable,
091         * a TransportLoggerControl MBean will be created.
092         */
093        protected boolean dynamicManagement = false;
094        /**
095         * startLogging=true -> the TransportLogger object of the Transport stack
096         * will initially write messages to the log.
097         * startLogging=false -> the TransportLogger object of the Transport stack
098         * will initially NOT write messages to the log.
099         * This parameter only has an effect if trace == true.
100         * This parameter is most probably set in Connection or TransportConnector URIs.
101         */
102        protected boolean startLogging = true;
103        /**
104         * Specifies the port that will be used by the JMX server to manage
105         * the TransportLoggers.
106         * This should only be set in an URI by a client (producer or consumer) since
107         * a broker will already create a JMX server.
108         * It is useful for people who test a broker and clients in the same machine
109         * and want to control both via JMX; a different port will be needed.
110         */
111        protected int jmxPort = 1099;
112        protected boolean useLocalHost = true;
113        protected int minmumWireFormatVersion;
114        protected SocketFactory socketFactory;
115        protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
116    
117        private Map<String, Object> socketOptions;
118        private Boolean keepAlive;
119        private Boolean tcpNoDelay;
120        private Thread runnerThread;
121    
122        /**
123         * Connect to a remote Node - e.g. a Broker
124         * 
125         * @param wireFormat
126         * @param socketFactory
127         * @param remoteLocation
128         * @param localLocation - e.g. local InetAddress and local port
129         * @throws IOException
130         * @throws UnknownHostException
131         */
132        public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
133                            URI localLocation) throws UnknownHostException, IOException {
134            this.wireFormat = wireFormat;
135            this.socketFactory = socketFactory;
136            try {
137                this.socket = socketFactory.createSocket();
138            } catch (SocketException e) {
139                this.socket = null;
140            }
141            this.remoteLocation = remoteLocation;
142            this.localLocation = localLocation;
143            setDaemon(false);
144        }
145    
146        /**
147         * Initialize from a server Socket
148         * 
149         * @param wireFormat
150         * @param socket
151         * @throws IOException
152         */
153        public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
154            this.wireFormat = wireFormat;
155            this.socket = socket;
156            this.remoteLocation = null;
157            this.localLocation = null;
158            setDaemon(true);
159        }
160    
161        /**
162         * A one way asynchronous send
163         */
164        public void oneway(Object command) throws IOException {
165            checkStarted();
166            wireFormat.marshal(command, dataOut);
167            dataOut.flush();
168        }
169    
170        /**
171         * @return pretty print of 'this'
172         */
173        public String toString() {
174            return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
175        }
176    
177        /**
178         * reads packets from a Socket
179         */
180        public void run() {
181            LOG.trace("TCP consumer thread for " + this + " starting");
182            this.runnerThread=Thread.currentThread();
183            try {
184                while (!isStopped()) {
185                    doRun();
186                }
187            } catch (IOException e) {
188                stoppedLatch.get().countDown();
189                onException(e);
190            } catch (Throwable e){
191                stoppedLatch.get().countDown();
192                IOException ioe=new IOException("Unexpected error occured");
193                ioe.initCause(e);
194                onException(ioe);
195            }finally {
196                stoppedLatch.get().countDown();
197            }
198        }
199    
200        protected void doRun() throws IOException {
201            try {
202                Object command = readCommand();
203                doConsume(command);
204            } catch (SocketTimeoutException e) {
205            } catch (InterruptedIOException e) {
206            }
207        }
208    
209        protected Object readCommand() throws IOException {
210            return wireFormat.unmarshal(dataIn);
211        }
212    
213        // Properties
214        // -------------------------------------------------------------------------
215    
216        public boolean isTrace() {
217            return trace;
218        }
219    
220        public void setTrace(boolean trace) {
221            this.trace = trace;
222        }
223        
224        public String getLogWriterName() {
225            return logWriterName;
226        }
227    
228        public void setLogWriterName(String logFormat) {
229            this.logWriterName = logFormat;
230        }
231    
232        public boolean isDynamicManagement() {
233            return dynamicManagement;
234        }
235    
236        public void setDynamicManagement(boolean useJmx) {
237            this.dynamicManagement = useJmx;
238        }
239    
240        public boolean isStartLogging() {
241            return startLogging;
242        }
243    
244        public void setStartLogging(boolean startLogging) {
245            this.startLogging = startLogging;
246        }
247    
248        public int getJmxPort() {
249            return jmxPort;
250        }
251    
252        public void setJmxPort(int jmxPort) {
253            this.jmxPort = jmxPort;
254        }
255        
256        public int getMinmumWireFormatVersion() {
257            return minmumWireFormatVersion;
258        }
259    
260        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
261            this.minmumWireFormatVersion = minmumWireFormatVersion;
262        }
263    
264        public boolean isUseLocalHost() {
265            return useLocalHost;
266        }
267    
268        /**
269         * Sets whether 'localhost' or the actual local host name should be used to
270         * make local connections. On some operating systems such as Macs its not
271         * possible to connect as the local host name so localhost is better.
272         */
273        public void setUseLocalHost(boolean useLocalHost) {
274            this.useLocalHost = useLocalHost;
275        }
276    
277        public int getSocketBufferSize() {
278            return socketBufferSize;
279        }
280    
281        /**
282         * Sets the buffer size to use on the socket
283         */
284        public void setSocketBufferSize(int socketBufferSize) {
285            this.socketBufferSize = socketBufferSize;
286        }
287    
288        public int getSoTimeout() {
289            return soTimeout;
290        }
291    
292        /**
293         * Sets the socket timeout
294         */
295        public void setSoTimeout(int soTimeout) {
296            this.soTimeout = soTimeout;
297        }
298    
299        public int getConnectionTimeout() {
300            return connectionTimeout;
301        }
302    
303        /**
304         * Sets the timeout used to connect to the socket
305         */
306        public void setConnectionTimeout(int connectionTimeout) {
307            this.connectionTimeout = connectionTimeout;
308        }
309    
310        public Boolean getKeepAlive() {
311            return keepAlive;
312        }
313    
314        /**
315         * Enable/disable TCP KEEP_ALIVE mode
316         */
317        public void setKeepAlive(Boolean keepAlive) {
318            this.keepAlive = keepAlive;
319        }
320    
321        public Boolean getTcpNoDelay() {
322            return tcpNoDelay;
323        }
324    
325        /**
326         * Enable/disable the TCP_NODELAY option on the socket
327         */
328        public void setTcpNoDelay(Boolean tcpNoDelay) {
329            this.tcpNoDelay = tcpNoDelay;
330        }
331    
332        /**
333         * @return the ioBufferSize
334         */
335        public int getIoBufferSize() {
336            return this.ioBufferSize;
337        }
338    
339        /**
340         * @param ioBufferSize the ioBufferSize to set
341         */
342        public void setIoBufferSize(int ioBufferSize) {
343            this.ioBufferSize = ioBufferSize;
344        }
345        
346        /**
347         * @return the closeAsync
348         */
349        public boolean isCloseAsync() {
350            return closeAsync;
351        }
352    
353        /**
354         * @param closeAsync the closeAsync to set
355         */
356        public void setCloseAsync(boolean closeAsync) {
357            this.closeAsync = closeAsync;
358        }
359    
360        // Implementation methods
361        // -------------------------------------------------------------------------
362        protected String resolveHostName(String host) throws UnknownHostException {
363            String localName = InetAddress.getLocalHost().getHostName();
364            if (localName != null && isUseLocalHost()) {
365                if (localName.equals(host)) {
366                    return "localhost";
367                }
368            }
369            return host;
370        }
371    
372        /**
373         * Configures the socket for use
374         * 
375         * @param sock
376         * @throws SocketException
377         */
378        protected void initialiseSocket(Socket sock) throws SocketException {
379            if (socketOptions != null) {
380                IntrospectionSupport.setProperties(socket, socketOptions);
381            }
382    
383            try {
384                sock.setReceiveBufferSize(socketBufferSize);
385                sock.setSendBufferSize(socketBufferSize);
386            } catch (SocketException se) {
387                LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
388                LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
389            }
390            sock.setSoTimeout(soTimeout);
391    
392            if (keepAlive != null) {
393                sock.setKeepAlive(keepAlive.booleanValue());
394            }
395            if (tcpNoDelay != null) {
396                sock.setTcpNoDelay(tcpNoDelay.booleanValue());
397            }
398        }
399    
400        protected void doStart() throws Exception {
401            connect();
402            stoppedLatch.set(new CountDownLatch(1));
403            super.doStart();
404        }
405    
406        protected void connect() throws Exception {
407    
408            if (socket == null && socketFactory == null) {
409                throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
410            }
411    
412            InetSocketAddress localAddress = null;
413            InetSocketAddress remoteAddress = null;
414    
415            if (localLocation != null) {
416                localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
417                                                     localLocation.getPort());
418            }
419    
420            if (remoteLocation != null) {
421                String host = resolveHostName(remoteLocation.getHost());
422                remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
423            }
424    
425            if (socket != null) {
426    
427                if (localAddress != null) {
428                    socket.bind(localAddress);
429                }
430    
431                // If it's a server accepted socket.. we don't need to connect it
432                // to a remote address.
433                if (remoteAddress != null) {
434                    if (connectionTimeout >= 0) {
435                        socket.connect(remoteAddress, connectionTimeout);
436                    } else {
437                        socket.connect(remoteAddress);
438                    }
439                }
440    
441            } else {
442                // For SSL sockets.. you can't create an unconnected socket :(
443                // This means the timout option are not supported either.
444                if (localAddress != null) {
445                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
446                                                        localAddress.getAddress(), localAddress.getPort());
447                } else {
448                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
449                }
450            }
451    
452            initialiseSocket(socket);
453            initializeStreams();
454        }
455    
456        protected void doStop(ServiceStopper stopper) throws Exception {
457            if (LOG.isDebugEnabled()) {
458                LOG.debug("Stopping transport " + this);
459            }
460    
461            // Closing the streams flush the sockets before closing.. if the socket
462            // is hung.. then this hangs the close.
463            // closeStreams();
464            if (socket != null) {
465                if (closeAsync) {
466                    //closing the socket can hang also 
467                    final CountDownLatch latch = new CountDownLatch(1);
468                    
469                    SOCKET_CLOSE.execute(new Runnable() {
470        
471                        public void run() {
472                            try {
473                                socket.close();
474                            } catch (IOException e) {
475                                LOG.debug("Caught exception closing socket",e);
476                            }finally {
477                                latch.countDown();
478                            }
479                        }
480                        
481                    });
482                    latch.await(1,TimeUnit.SECONDS);
483                }else {
484                    try {
485                        socket.close();
486                    } catch (IOException e) {
487                        LOG.debug("Caught exception closing socket",e);
488                    }
489                }
490               
491            }
492        }
493    
494        /**
495         * Override so that stop() blocks until the run thread is no longer running.
496         */
497        @Override
498        public void stop() throws Exception {
499            super.stop();
500            CountDownLatch countDownLatch = stoppedLatch.get();
501            if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
502                countDownLatch.await(1,TimeUnit.SECONDS);
503            }
504        }
505    
506        protected void initializeStreams() throws Exception {
507            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
508            this.dataIn = new DataInputStream(buffIn);
509            buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
510            this.dataOut = new DataOutputStream(buffOut);
511        }
512    
513        protected void closeStreams() throws IOException {
514            if (dataOut != null) {
515                dataOut.close();
516            }
517            if (dataIn != null) {
518                dataIn.close();
519            }
520        }
521    
522        public void setSocketOptions(Map<String, Object> socketOptions) {
523            this.socketOptions = new HashMap<String, Object>(socketOptions);
524        }
525    
526        public String getRemoteAddress() {
527            if (socket != null) {
528                return "" + socket.getRemoteSocketAddress();
529            }
530            return null;
531        }
532        
533        @Override
534        public <T> T narrow(Class<T> target) {
535            if (target == Socket.class) {
536                return target.cast(socket);
537            } else if ( target == TcpBufferedOutputStream.class) {
538                return target.cast(buffOut);
539            }
540            return super.narrow(target);
541        }
542        
543    
544        static {
545            SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
546                public Thread newThread(Runnable runnable) {
547                    Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
548                    thread.setPriority(Thread.MAX_PRIORITY);
549                    thread.setDaemon(true);
550                    return thread;
551                }
552            });
553        }
554    }