001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file.remote;
018
019 import org.apache.camel.Exchange;
020 import org.apache.camel.ServicePoolAware;
021 import org.apache.camel.component.file.GenericFileOperationFailedException;
022 import org.apache.camel.component.file.GenericFileProducer;
023 import org.apache.camel.util.ExchangeHelper;
024
025 /**
026 * Remote file producer. Handles connecting and disconnecting if we are not.
027 * Generic type F is the remote system implementation of a file.
028 */
029 public class RemoteFileProducer<T> extends GenericFileProducer<T> implements ServicePoolAware {
030
031 private boolean loggedIn;
032
033 protected RemoteFileProducer(RemoteFileEndpoint<T> endpoint, RemoteFileOperations<T> operations) {
034 super(endpoint, operations);
035 }
036
037 @Override
038 protected String getFileSeparator() {
039 return "/";
040 }
041
042 @Override
043 protected String normalizePath(String name) {
044 return name;
045 }
046
047 @SuppressWarnings("unchecked")
048 public void process(Exchange exchange) throws Exception {
049 Exchange remoteExchange = getEndpoint().createExchange(exchange);
050 processExchange(remoteExchange);
051 ExchangeHelper.copyResults(exchange, remoteExchange);
052 }
053
054 protected RemoteFileOperations getOperations() {
055 return (RemoteFileOperations) operations;
056 }
057
058 @Override
059 @SuppressWarnings("unchecked")
060 public RemoteFileEndpoint<T> getEndpoint() {
061 return (RemoteFileEndpoint<T>) super.getEndpoint();
062 }
063
064 /**
065 * The file could not be written. We need to disconnect from the remote server.
066 */
067 protected void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
068 loggedIn = false;
069 if (isStopping() || isStopped()) {
070 // if we are stopping then ignore any exception during a poll
071 log.debug("Exception occurred during stopping: " + exception.getMessage());
072 } else {
073 log.warn("Writing file failed with: " + exception.getMessage());
074 try {
075 disconnect();
076 } catch (Exception e) {
077 // ignore exception
078 log.debug("Ignored exception during disconnect: " + e.getMessage());
079 }
080 // rethrow the original exception*/
081 throw exception;
082 }
083 }
084
085 public void disconnect() throws GenericFileOperationFailedException {
086 loggedIn = false;
087 if (getOperations().isConnected()) {
088 if (log.isDebugEnabled()) {
089 log.debug("Disconnecting from: " + getEndpoint());
090 }
091 getOperations().disconnect();
092 }
093 }
094
095 @Override
096 protected void preWriteCheck() throws Exception {
097 // before writing send a noop to see if the connection is alive and works
098 boolean noop = false;
099 try {
100 connectIfNecessary();
101 if (loggedIn) {
102 noop = getOperations().sendNoop();
103 }
104 } catch (Exception e) {
105 // ignore as we will try to recover connection
106 noop = false;
107 }
108 if (log.isDebugEnabled()) {
109 log.debug("preWriteCheck send noop success: " + noop);
110 }
111
112 // if not alive then force a disconnect so we reconnect again
113 if (!noop) {
114 try {
115 if (log.isDebugEnabled()) {
116 log.debug("preWriteCheck forcing a disconnect as noop failed");
117 }
118 disconnect();
119 } catch (Exception e) {
120 // ignore for now as we will reconnect below
121 }
122 }
123
124 connectIfNecessary();
125 if (!loggedIn) {
126 // must be logged in to be able to upload the file
127 String message = "Cannot connect/login to: " + getEndpoint().remoteServerInformation();
128 throw new GenericFileOperationFailedException(message);
129 }
130 }
131
132 @Override
133 protected void postWriteCheck() {
134 try {
135 if (getEndpoint().isDisconnect()) {
136 if (log.isTraceEnabled()) {
137 log.trace("postWriteCheck disconnect from: " + getEndpoint());
138 }
139 disconnect();
140 }
141 } catch (GenericFileOperationFailedException e) {
142 // ignore just log a warning
143 log.warn("Exception occurred during disconnecting from: " + getEndpoint() + " " + e.getMessage());
144 }
145 }
146
147 @Override
148 protected void doStart() throws Exception {
149 log.debug("Starting");
150 // do not connect when component starts, just wait until we process as we will
151 // connect at that time if needed
152 super.doStart();
153 }
154
155 @Override
156 protected void doStop() throws Exception {
157 try {
158 disconnect();
159 } catch (Exception e) {
160 log.debug("Exception occurred during disconnecting from: " + getEndpoint() + " " + e.getMessage());
161 }
162 super.doStop();
163 }
164
165 protected void connectIfNecessary() throws GenericFileOperationFailedException {
166 if (!loggedIn) {
167 if (log.isDebugEnabled()) {
168 log.debug("Not already connected/logged in. Connecting to: " + getEndpoint());
169 }
170 RemoteFileConfiguration config = (RemoteFileConfiguration) getEndpoint().getConfiguration();
171 loggedIn = getOperations().connect(config);
172 if (!loggedIn) {
173 return;
174 }
175 log.info("Connected and logged in to: " + getEndpoint());
176 }
177 }
178
179 public boolean isSingleton() {
180 // this producer is stateful because the remote file operations is not thread safe
181 return false;
182 }
183
184 }