001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InterruptedIOException;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.Socket;
026 import java.net.SocketException;
027 import java.net.SocketTimeoutException;
028 import java.net.URI;
029 import java.net.UnknownHostException;
030 import java.util.HashMap;
031 import java.util.Map;
032 import java.util.concurrent.CountDownLatch;
033 import java.util.concurrent.SynchronousQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.ThreadPoolExecutor;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicReference;
038
039 import javax.net.SocketFactory;
040
041 import org.apache.activemq.Service;
042 import org.apache.activemq.transport.Transport;
043 import org.apache.activemq.transport.TransportLoggerFactory;
044 import org.apache.activemq.transport.TransportThreadSupport;
045 import org.apache.activemq.util.IntrospectionSupport;
046 import org.apache.activemq.util.ServiceStopper;
047 import org.apache.activemq.wireformat.WireFormat;
048 import org.apache.commons.logging.Log;
049 import org.apache.commons.logging.LogFactory;
050
051 /**
052 * An implementation of the {@link Transport} interface using raw tcp/ip
053 *
054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055 * @version $Revision$
056 */
057 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058 private static final Log LOG = LogFactory.getLog(TcpTransport.class);
059 private static final ThreadPoolExecutor SOCKET_CLOSE;
060 protected final URI remoteLocation;
061 protected final URI localLocation;
062 protected final WireFormat wireFormat;
063
064 protected int connectionTimeout = 30000;
065 protected int soTimeout;
066 protected int socketBufferSize = 64 * 1024;
067 protected int ioBufferSize = 8 * 1024;
068 protected boolean closeAsync=true;
069 protected Socket socket;
070 protected DataOutputStream dataOut;
071 protected DataInputStream dataIn;
072 protected TcpBufferedOutputStream buffOut = null;
073 /**
074 * trace=true -> the Transport stack where this TcpTransport
075 * object will be, will have a TransportLogger layer
076 * trace=false -> the Transport stack where this TcpTransport
077 * object will be, will NOT have a TransportLogger layer, and therefore
078 * will never be able to print logging messages.
079 * This parameter is most probably set in Connection or TransportConnector URIs.
080 */
081 protected boolean trace = false;
082 /**
083 * Name of the LogWriter implementation to use.
084 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
085 * This parameter is most probably set in Connection or TransportConnector URIs.
086 */
087 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
088 /**
089 * Specifies if the TransportLogger will be manageable by JMX or not.
090 * Also, as long as there is at least 1 TransportLogger which is manageable,
091 * a TransportLoggerControl MBean will me created.
092 */
093 protected boolean dynamicManagement = false;
094 /**
095 * startLogging=true -> the TransportLogger object of the Transport stack
096 * will initially write messages to the log.
097 * startLogging=false -> the TransportLogger object of the Transport stack
098 * will initially NOT write messages to the log.
099 * This parameter only has an effect if trace == true.
100 * This parameter is most probably set in Connection or TransportConnector URIs.
101 */
102 protected boolean startLogging = true;
103 /**
104 * Specifies the port that will be used by the JMX server to manage
105 * the TransportLoggers.
106 * This should only be set in an URI by a client (producer or consumer) since
107 * a broker will already create a JMX server.
108 * It is useful for people who test a broker and clients in the same machine
109 * and want to control both via JMX; a different port will be needed.
110 */
111 protected int jmxPort = 1099;
112 protected boolean useLocalHost = true;
113 protected int minmumWireFormatVersion;
114 protected SocketFactory socketFactory;
115 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
116
117 private Map<String, Object> socketOptions;
118 private Boolean keepAlive;
119 private Boolean tcpNoDelay;
120 private Thread runnerThread;
121
122 /**
123 * Connect to a remote Node - e.g. a Broker
124 *
125 * @param wireFormat
126 * @param socketFactory
127 * @param remoteLocation
128 * @param localLocation - e.g. local InetAddress and local port
129 * @throws IOException
130 * @throws UnknownHostException
131 */
132 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
133 URI localLocation) throws UnknownHostException, IOException {
134 this.wireFormat = wireFormat;
135 this.socketFactory = socketFactory;
136 try {
137 this.socket = socketFactory.createSocket();
138 } catch (SocketException e) {
139 this.socket = null;
140 }
141 this.remoteLocation = remoteLocation;
142 this.localLocation = localLocation;
143 setDaemon(false);
144 }
145
146 /**
147 * Initialize from a server Socket
148 *
149 * @param wireFormat
150 * @param socket
151 * @throws IOException
152 */
153 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
154 this.wireFormat = wireFormat;
155 this.socket = socket;
156 this.remoteLocation = null;
157 this.localLocation = null;
158 setDaemon(true);
159 }
160
161 /**
162 * A one way asynchronous send
163 */
164 public void oneway(Object command) throws IOException {
165 checkStarted();
166 wireFormat.marshal(command, dataOut);
167 dataOut.flush();
168 }
169
170 /**
171 * @return pretty print of 'this'
172 */
173 public String toString() {
174 return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
175 }
176
177 /**
178 * reads packets from a Socket
179 */
180 public void run() {
181 LOG.trace("TCP consumer thread for " + this + " starting");
182 this.runnerThread=Thread.currentThread();
183 try {
184 while (!isStopped()) {
185 doRun();
186 }
187 } catch (IOException e) {
188 stoppedLatch.get().countDown();
189 onException(e);
190 } catch (Throwable e){
191 stoppedLatch.get().countDown();
192 IOException ioe=new IOException("Unexpected error occured");
193 ioe.initCause(e);
194 onException(ioe);
195 }finally {
196 stoppedLatch.get().countDown();
197 }
198 }
199
200 protected void doRun() throws IOException {
201 try {
202 Object command = readCommand();
203 doConsume(command);
204 } catch (SocketTimeoutException e) {
205 } catch (InterruptedIOException e) {
206 }
207 }
208
209 protected Object readCommand() throws IOException {
210 return wireFormat.unmarshal(dataIn);
211 }
212
213 // Properties
214 // -------------------------------------------------------------------------
215
216 public boolean isTrace() {
217 return trace;
218 }
219
220 public void setTrace(boolean trace) {
221 this.trace = trace;
222 }
223
224 public String getLogWriterName() {
225 return logWriterName;
226 }
227
228 public void setLogWriterName(String logFormat) {
229 this.logWriterName = logFormat;
230 }
231
232 public boolean isDynamicManagement() {
233 return dynamicManagement;
234 }
235
236 public void setDynamicManagement(boolean useJmx) {
237 this.dynamicManagement = useJmx;
238 }
239
240 public boolean isStartLogging() {
241 return startLogging;
242 }
243
244 public void setStartLogging(boolean startLogging) {
245 this.startLogging = startLogging;
246 }
247
248 public int getJmxPort() {
249 return jmxPort;
250 }
251
252 public void setJmxPort(int jmxPort) {
253 this.jmxPort = jmxPort;
254 }
255
256 public int getMinmumWireFormatVersion() {
257 return minmumWireFormatVersion;
258 }
259
260 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
261 this.minmumWireFormatVersion = minmumWireFormatVersion;
262 }
263
264 public boolean isUseLocalHost() {
265 return useLocalHost;
266 }
267
268 /**
269 * Sets whether 'localhost' or the actual local host name should be used to
270 * make local connections. On some operating systems such as Macs its not
271 * possible to connect as the local host name so localhost is better.
272 */
273 public void setUseLocalHost(boolean useLocalHost) {
274 this.useLocalHost = useLocalHost;
275 }
276
277 public int getSocketBufferSize() {
278 return socketBufferSize;
279 }
280
281 /**
282 * Sets the buffer size to use on the socket
283 */
284 public void setSocketBufferSize(int socketBufferSize) {
285 this.socketBufferSize = socketBufferSize;
286 }
287
288 public int getSoTimeout() {
289 return soTimeout;
290 }
291
292 /**
293 * Sets the socket timeout
294 */
295 public void setSoTimeout(int soTimeout) {
296 this.soTimeout = soTimeout;
297 }
298
299 public int getConnectionTimeout() {
300 return connectionTimeout;
301 }
302
303 /**
304 * Sets the timeout used to connect to the socket
305 */
306 public void setConnectionTimeout(int connectionTimeout) {
307 this.connectionTimeout = connectionTimeout;
308 }
309
310 public Boolean getKeepAlive() {
311 return keepAlive;
312 }
313
314 /**
315 * Enable/disable TCP KEEP_ALIVE mode
316 */
317 public void setKeepAlive(Boolean keepAlive) {
318 this.keepAlive = keepAlive;
319 }
320
321 public Boolean getTcpNoDelay() {
322 return tcpNoDelay;
323 }
324
325 /**
326 * Enable/disable the TCP_NODELAY option on the socket
327 */
328 public void setTcpNoDelay(Boolean tcpNoDelay) {
329 this.tcpNoDelay = tcpNoDelay;
330 }
331
332 /**
333 * @return the ioBufferSize
334 */
335 public int getIoBufferSize() {
336 return this.ioBufferSize;
337 }
338
339 /**
340 * @param ioBufferSize the ioBufferSize to set
341 */
342 public void setIoBufferSize(int ioBufferSize) {
343 this.ioBufferSize = ioBufferSize;
344 }
345
346 /**
347 * @return the closeAsync
348 */
349 public boolean isCloseAsync() {
350 return closeAsync;
351 }
352
353 /**
354 * @param closeAsync the closeAsync to set
355 */
356 public void setCloseAsync(boolean closeAsync) {
357 this.closeAsync = closeAsync;
358 }
359
360 // Implementation methods
361 // -------------------------------------------------------------------------
362 protected String resolveHostName(String host) throws UnknownHostException {
363 String localName = InetAddress.getLocalHost().getHostName();
364 if (localName != null && isUseLocalHost()) {
365 if (localName.equals(host)) {
366 return "localhost";
367 }
368 }
369 return host;
370 }
371
372 /**
373 * Configures the socket for use
374 *
375 * @param sock
376 * @throws SocketException
377 */
378 protected void initialiseSocket(Socket sock) throws SocketException {
379 if (socketOptions != null) {
380 IntrospectionSupport.setProperties(socket, socketOptions);
381 }
382
383 try {
384 sock.setReceiveBufferSize(socketBufferSize);
385 sock.setSendBufferSize(socketBufferSize);
386 } catch (SocketException se) {
387 LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
388 LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
389 }
390 sock.setSoTimeout(soTimeout);
391
392 if (keepAlive != null) {
393 sock.setKeepAlive(keepAlive.booleanValue());
394 }
395 if (tcpNoDelay != null) {
396 sock.setTcpNoDelay(tcpNoDelay.booleanValue());
397 }
398 }
399
400 protected void doStart() throws Exception {
401 connect();
402 stoppedLatch.set(new CountDownLatch(1));
403 super.doStart();
404 }
405
406 protected void connect() throws Exception {
407
408 if (socket == null && socketFactory == null) {
409 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
410 }
411
412 InetSocketAddress localAddress = null;
413 InetSocketAddress remoteAddress = null;
414
415 if (localLocation != null) {
416 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
417 localLocation.getPort());
418 }
419
420 if (remoteLocation != null) {
421 String host = resolveHostName(remoteLocation.getHost());
422 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
423 }
424
425 if (socket != null) {
426
427 if (localAddress != null) {
428 socket.bind(localAddress);
429 }
430
431 // If it's a server accepted socket.. we don't need to connect it
432 // to a remote address.
433 if (remoteAddress != null) {
434 if (connectionTimeout >= 0) {
435 socket.connect(remoteAddress, connectionTimeout);
436 } else {
437 socket.connect(remoteAddress);
438 }
439 }
440
441 } else {
442 // For SSL sockets.. you can't create an unconnected socket :(
443 // This means the timout option are not supported either.
444 if (localAddress != null) {
445 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
446 localAddress.getAddress(), localAddress.getPort());
447 } else {
448 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
449 }
450 }
451
452 initialiseSocket(socket);
453 initializeStreams();
454 }
455
456 protected void doStop(ServiceStopper stopper) throws Exception {
457 if (LOG.isDebugEnabled()) {
458 LOG.debug("Stopping transport " + this);
459 }
460
461 // Closing the streams flush the sockets before closing.. if the socket
462 // is hung.. then this hangs the close.
463 // closeStreams();
464 if (socket != null) {
465 if (closeAsync) {
466 //closing the socket can hang also
467 final CountDownLatch latch = new CountDownLatch(1);
468
469 SOCKET_CLOSE.execute(new Runnable() {
470
471 public void run() {
472 try {
473 socket.close();
474 } catch (IOException e) {
475 LOG.debug("Caught exception closing socket",e);
476 }finally {
477 latch.countDown();
478 }
479 }
480
481 });
482 latch.await(1,TimeUnit.SECONDS);
483 }else {
484 try {
485 socket.close();
486 } catch (IOException e) {
487 LOG.debug("Caught exception closing socket",e);
488 }
489 }
490
491 }
492 }
493
494 /**
495 * Override so that stop() blocks until the run thread is no longer running.
496 */
497 @Override
498 public void stop() throws Exception {
499 super.stop();
500 CountDownLatch countDownLatch = stoppedLatch.get();
501 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
502 countDownLatch.await(1,TimeUnit.SECONDS);
503 }
504 }
505
506 protected void initializeStreams() throws Exception {
507 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
508 this.dataIn = new DataInputStream(buffIn);
509 buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
510 this.dataOut = new DataOutputStream(buffOut);
511 }
512
513 protected void closeStreams() throws IOException {
514 if (dataOut != null) {
515 dataOut.close();
516 }
517 if (dataIn != null) {
518 dataIn.close();
519 }
520 }
521
522 public void setSocketOptions(Map<String, Object> socketOptions) {
523 this.socketOptions = new HashMap<String, Object>(socketOptions);
524 }
525
526 public String getRemoteAddress() {
527 if (socket != null) {
528 return "" + socket.getRemoteSocketAddress();
529 }
530 return null;
531 }
532
533 @Override
534 public <T> T narrow(Class<T> target) {
535 if (target == Socket.class) {
536 return target.cast(socket);
537 } else if ( target == TcpBufferedOutputStream.class) {
538 return target.cast(buffOut);
539 }
540 return super.narrow(target);
541 }
542
543
544 static {
545 SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
546 public Thread newThread(Runnable runnable) {
547 Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
548 thread.setPriority(Thread.MAX_PRIORITY);
549 thread.setDaemon(true);
550 return thread;
551 }
552 });
553 }
554 }