001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.net.URI;
021
022 import org.apache.activemq.util.ServiceSupport;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026 /**
027 * A useful base class for transport implementations.
028 *
029 * @version $Revision: 1.1 $
030 */
031 public abstract class TransportSupport extends ServiceSupport implements Transport {
032 private static final Log LOG = LogFactory.getLog(TransportSupport.class);
033
034 TransportListener transportListener;
035
036 /**
037 * Returns the current transport listener
038 */
039 public TransportListener getTransportListener() {
040 return transportListener;
041 }
042
043 /**
044 * Registers an inbound command listener
045 *
046 * @param commandListener
047 */
048 public void setTransportListener(TransportListener commandListener) {
049 this.transportListener = commandListener;
050 }
051
052 /**
053 * narrow acceptance
054 *
055 * @param target
056 * @return 'this' if assignable
057 */
058 public <T> T narrow(Class<T> target) {
059 boolean assignableFrom = target.isAssignableFrom(getClass());
060 if (assignableFrom) {
061 return target.cast(this);
062 }
063 return null;
064 }
065
066 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
067 throw new AssertionError("Unsupported Method");
068 }
069
070 public Object request(Object command) throws IOException {
071 throw new AssertionError("Unsupported Method");
072 }
073
074 public Object request(Object command, int timeout) throws IOException {
075 throw new AssertionError("Unsupported Method");
076 }
077
078 /**
079 * Process the inbound command
080 */
081 public void doConsume(Object command) {
082 if (command != null) {
083 if (transportListener != null) {
084 transportListener.onCommand(command);
085 } else {
086 LOG.error("No transportListener available to process inbound command: " + command);
087 }
088 }
089 }
090
091 /**
092 * Passes any IO exceptions into the transport listener
093 */
094 public void onException(IOException e) {
095 if (transportListener != null) {
096 transportListener.onException(e);
097 }
098 }
099
100 protected void checkStarted() throws IOException {
101 if (!isStarted()) {
102 throw new IOException("The transport is not running.");
103 }
104 }
105
106 public boolean isFaultTolerant() {
107 return false;
108 }
109
110
111 public void reconnect(URI uri) throws IOException {
112 throw new IOException("Not supported");
113 }
114
115 public boolean isDisposed() {
116 return isStopped();
117 }
118
119 public boolean isConnected() {
120 return isStarted();
121 }
122
123 }