001    /*
002     * Copyright 2010 the original author or authors.
003     * 
004     *  Licensed under the Apache License, Version 2.0 (the "License");
005     *  you may not use this file except in compliance with the License.
006     *  You may obtain a copy of the License at
007     *
008     *      http://www.apache.org/licenses/LICENSE-2.0
009     *
010     *  Unless required by applicable law or agreed to in writing, software
011     *  distributed under the License is distributed on an "AS IS" BASIS,
012     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     *  See the License for the specific language governing permissions and
014     *  limitations under the License.
015     */
016    package com.hs.mail.smtp.spool;
017    
018    import java.io.File;
019    import java.io.IOException;
020    import java.util.Arrays;
021    import java.util.Comparator;
022    import java.util.List;
023    
024    import org.apache.commons.io.FileUtils;
025    import org.apache.commons.lang.ArrayUtils;
026    import org.apache.log4j.Logger;
027    
028    /**
029     * Periodically scans a target directory and passes each entry found to the configured consumers.
030     * 
031     * @author Won Chul Doh
032     * @since Jun 3, 2010
033     * 
034     */
035    public class FileWatcher implements Watcher {
036            
037            static Logger logger = Logger.getLogger(FileWatcher.class);
038            
039        private List<Consumer> consumers = null;
040        private MainJob mainJob = null;
041        private File targetDir = null;
042        private long watchInterval = 30000;
043        private long processInterval = 10;
044        private boolean includeDirectory = false;
045        private File failureDirFile = null;
046        private Comparator<File> fileComparator = null;
047    
048            public long getWatchInterval() {
049                    return watchInterval;
050            }
051    
052            public void setWatchInterval(long millis) {
053                    this.watchInterval = millis;
054            }
055    
056            public List<Consumer> getConsumers() {
057                    return consumers;
058            }
059    
060            public void setConsumers(List<Consumer> consumers) {
061                    this.consumers = consumers;
062            }
063    
064            public void setTarget(String target) {
065                    this.targetDir = new File(target);
066            }
067            
068            public void setTargetDir(File targetDir) {
069                    this.targetDir = targetDir;
070            }
071            
072        public void setFailureDir(String failureDir) {
073            this.failureDirFile = new File(failureDir);
074        }
075        
076            public void setFileComparator(Comparator<File> fileComparator) {
077                    this.fileComparator = fileComparator;
078            }
079    
080            public void start() {
081                    if (targetDir != null) {
082                            try {
083                                    if (null == failureDirFile) {
084                                            failureDirFile = new File(targetDir.getParent(), "failure");
085                                    }
086                                    FileUtils.forceMkdir(failureDirFile);
087                            } catch (Exception e) {
088                                    logger.error("Cannot create failure directory "
089                                                    + failureDirFile);
090                                    return;
091                            }
092                            mainJob = new MainJob(this);
093                            new Thread(mainJob).start();
094                    }
095            }
096    
097        class MainJob implements Runnable {
098    
099            private boolean stopRequested = false;
100            private Watcher watcher;
101    
102            public MainJob (Watcher watcher) {
103                    this.watcher = watcher;
104            }
105            
106            public void stop() {
107                    this.stopRequested = true;
108            }
109            
110            public void run() {
111                            while (!stopRequested) {
112                                    processMessage();
113                            }
114                            synchronized (this) {
115                                    try {
116                                            wait(watchInterval);
117                                    } catch (InterruptedException e) {
118                                    }
119                            }
120                    }
121            
122            private void processMessage() {
123                long currentWatchInterval = watchInterval;
124                File [] files = null;
125    
126                    if (targetDir.isDirectory()) {
127                            files = targetDir.listFiles();
128                    }
129                
130                            if (!ArrayUtils.isEmpty(files)) {
131                                    if (fileComparator != null) {
132                                            Arrays.sort(files, fileComparator);
133                                    }
134                                    for (File file : files) {
135                            if (!includeDirectory && file.isDirectory())
136                                continue;
137            
138                                            if (logger.isInfoEnabled())
139                                                    logger.info("Watcher will process the working file: "
140                                                                    + file);
141    
142                                            processWorkingFile(file);
143                            
144                                            // If the consumer script changes the watchInterval, the
145                                            // processing must be suspended.
146                                            if (watchInterval != currentWatchInterval) {
147                                                    break;
148                                            }
149                            
150                                            // To relax CPU, wait 0.1 process interval after processing
151                                            // a file.
152                                            if (processInterval > 0) {
153                                                    synchronized (this) {
154                                                            try {
155                                                                    wait(processInterval);
156                                                            } catch (Exception e) {
157                                                            }
158                                                    }
159                            }
160                                    }
161                            }
162            }
163            
164            private void processWorkingFile(File workingFile) {
165                    synchronized (workingFile) {
166                            int sc = consumeFile(workingFile);
167    
168                                    if (sc == Consumer.CONSUME_SUCCEEDED) {
169                                            try {
170                                                    FileUtils.forceDelete(workingFile);
171                                            } catch (IOException e) {
172                                                    logger.warn("Cannot delete " + workingFile);
173                                            }
174                                    } else if (sc == Consumer.CONSUME_ERROR_FAIL) {
175                                            try {
176                                                    FileUtils.forceDelete(workingFile);
177                                            } catch (IOException e) {
178                                                    logger.warn("Cannot delete " + workingFile);
179                                            }
180                                    } else if (sc == Consumer.CONSUME_ERROR_MOVE) {
181                                            try {
182                                                    if (workingFile.isFile()) {
183                                                            FileUtils.moveFile(workingFile, failureDirFile);
184                                                    } else {
185                                                            FileUtils.moveDirectory(workingFile, failureDirFile);
186                                                    }
187                                            } catch (IOException e) {
188                                                    logger.warn("Cannot move " + workingFile + " to " + failureDirFile);
189                                            }
190                                    }
191                            }
192            }
193            
194                    private int consumeFile(File file) {
195                            int sc = Consumer.CONSUME_SUCCEEDED;
196                            for (Consumer consumer : consumers) {
197                                    try {
198                                            sc = consumer.consume(watcher, file);
199                                    } catch (Exception e) {
200                                            sc = Consumer.CONSUME_ERROR_KEEP;
201                                            logger.warn("Error during consuming: " + e.getMessage());
202                                    }
203                            }
204                            return sc;
205                    }
206    
207        }
208            
209    }