001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.io.File;
020import java.io.FileNotFoundException;
021
022import org.apache.camel.Component;
023import org.apache.camel.Exchange;
024import org.apache.camel.Processor;
025import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
026import org.apache.camel.spi.Metadata;
027import org.apache.camel.spi.UriEndpoint;
028import org.apache.camel.spi.UriParam;
029import org.apache.camel.spi.UriPath;
030import org.apache.camel.util.FileUtil;
031import org.apache.camel.util.ObjectHelper;
032
033/**
034 * File endpoint.
035 */
036@UriEndpoint(scheme = "file", syntax = "file:directoryName", consumerClass = FileConsumer.class, label = "core,file")
037public class FileEndpoint extends GenericFileEndpoint<File> {
038
039    private final FileOperations operations = new FileOperations(this);
040
041    @UriPath(name = "directoryName") @Metadata(required = "true")
042    private File file;
043    @UriParam(defaultValue = "true")
044    private boolean copyAndDeleteOnRenameFail = true;
045    @UriParam
046    private boolean renameUsingCopy;
047    @UriParam(label = "producer", defaultValue = "true")
048    private boolean forceWrites = true;
049
050    public FileEndpoint() {
051        // use marker file as default exclusive read locks
052        this.readLock = "markerFile";
053    }
054
055    public FileEndpoint(String endpointUri, Component component) {
056        super(endpointUri, component);
057        // use marker file as default exclusive read locks
058        this.readLock = "markerFile";
059    }
060
061    public FileConsumer createConsumer(Processor processor) throws Exception {
062        ObjectHelper.notNull(operations, "operations");
063        ObjectHelper.notNull(file, "file");
064
065        // auto create starting directory if needed
066        if (!file.exists() && !file.isDirectory()) {
067            if (isAutoCreate()) {
068                log.debug("Creating non existing starting directory: {}", file);
069                boolean absolute = FileUtil.isAbsolute(file);
070                boolean created = operations.buildDirectory(file.getPath(), absolute);
071                if (!created) {
072                    log.warn("Cannot auto create starting directory: {}", file);
073                }
074            } else if (isStartingDirectoryMustExist()) {
075                throw new FileNotFoundException("Starting directory does not exist: " + file);
076            }
077        }
078
079        FileConsumer result = newFileConsumer(processor, operations);
080
081        if (isDelete() && getMove() != null) {
082            throw new IllegalArgumentException("You cannot set both delete=true and move options");
083        }
084
085        // if noop=true then idempotent should also be configured
086        if (isNoop() && !isIdempotentSet()) {
087            log.info("Endpoint is configured with noop=true so forcing endpoint to be idempotent as well");
088            setIdempotent(true);
089        }
090
091        // if idempotent and no repository set then create a default one
092        if (isIdempotentSet() && isIdempotent() && idempotentRepository == null) {
093            log.info("Using default memory based idempotent repository with cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
094            idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
095        }
096
097        // set max messages per poll
098        result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
099        result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
100
101        configureConsumer(result);
102        return result;
103    }
104
105    public GenericFileProducer<File> createProducer() throws Exception {
106        ObjectHelper.notNull(operations, "operations");
107
108        // you cannot use temp file and file exists append
109        if (getFileExist() == GenericFileExist.Append && ((getTempPrefix() != null) || (getTempFileName() != null))) {
110            throw new IllegalArgumentException("You cannot set both fileExist=Append and tempPrefix/tempFileName options");
111        }
112
113        // ensure fileExist and moveExisting is configured correctly if in use
114        if (getFileExist() == GenericFileExist.Move && getMoveExisting() == null) {
115            throw new IllegalArgumentException("You must configure moveExisting option when fileExist=Move");
116        } else if (getMoveExisting() != null && getFileExist() != GenericFileExist.Move) {
117            throw new IllegalArgumentException("You must configure fileExist=Move when moveExisting has been set");
118        }
119
120        return new GenericFileProducer<File>(this, operations);
121    }
122
123    public Exchange createExchange(GenericFile<File> file) {
124        Exchange exchange = createExchange();
125        if (file != null) {
126            file.bindToExchange(exchange);
127        }
128        return exchange;
129    }
130
131    /**
132     * Strategy to create a new {@link FileConsumer}
133     *
134     * @param processor  the given processor
135     * @param operations file operations
136     * @return the created consumer
137     */
138    protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
139        return new FileConsumer(this, processor, operations);
140    }
141
142    public File getFile() {
143        return file;
144    }
145
146    /**
147     * The starting directory
148     */
149    public void setFile(File file) {
150        this.file = file;
151        // update configuration as well
152        getConfiguration().setDirectory(FileUtil.isAbsolute(file) ? file.getAbsolutePath() : file.getPath());
153    }
154
155    @Override
156    public String getScheme() {
157        return "file";
158    }
159
160    @Override
161    protected String createEndpointUri() {
162        return getFile().toURI().toString();
163    }
164
165    @Override
166    public char getFileSeparator() {       
167        return File.separatorChar;
168    }
169
170    @Override
171    public boolean isAbsolute(String name) {
172        // relative or absolute path?
173        return FileUtil.isAbsolute(new File(name));
174    }
175
176    public boolean isCopyAndDeleteOnRenameFail() {
177        return copyAndDeleteOnRenameFail;
178    }
179
180    /**
181     * Whether to fallback and do a copy and delete file, in case the file could not be renamed directly. This option is not available for the FTP component.
182     */
183    public void setCopyAndDeleteOnRenameFail(boolean copyAndDeleteOnRenameFail) {
184        this.copyAndDeleteOnRenameFail = copyAndDeleteOnRenameFail;
185    }
186
187    public boolean isRenameUsingCopy() {
188        return renameUsingCopy;
189    }
190
191    /**
192     * Perform rename operations using a copy and delete strategy.
193     * This is primarily used in environments where the regular rename operation is unreliable (e.g. across different file systems or networks).
194     * This option takes precedence over the copyAndDeleteOnRenameFail parameter that will automatically fall back to the copy and delete strategy,
195     * but only after additional delays.
196     */
197    public void setRenameUsingCopy(boolean renameUsingCopy) {
198        this.renameUsingCopy = renameUsingCopy;
199    }
200
201    public boolean isForceWrites() {
202        return forceWrites;
203    }
204
205    /**
206     * Whether to force syncing writes to the file system.
207     * You can turn this off if you do not want this level of guarantee, for example if writing to logs / audit logs etc; this would yield better performance.
208     */
209    public void setForceWrites(boolean forceWrites) {
210        this.forceWrites = forceWrites;
211    }
212}