001 /*
002 * Copyright 2010 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package com.hs.mail.smtp.spool;
017
018 import java.io.File;
019 import java.io.IOException;
020 import java.util.Arrays;
021 import java.util.Comparator;
022 import java.util.List;
023
024 import org.apache.commons.io.FileUtils;
025 import org.apache.commons.lang.ArrayUtils;
026 import org.apache.log4j.Logger;
027
028 /**
029 * Class to watch files under some conditions
030 *
031 * @author Won Chul Doh
032 * @since Jun 3, 2010
033 *
034 */
035 public class FileWatcher implements Watcher {
036
037 static Logger logger = Logger.getLogger(FileWatcher.class);
038
039 private List<Consumer> consumers = null;
040 private MainJob mainJob = null;
041 private File targetDir = null;
042 private long watchInterval = 30000;
043 private long processInterval = 10;
044 private boolean includeDirectory = false;
045 private File failureDirFile = null;
046 private Comparator<File> fileComparator = null;
047
048 public long getWatchInterval() {
049 return watchInterval;
050 }
051
052 public void setWatchInterval(long millis) {
053 this.watchInterval = millis;
054 }
055
056 public List<Consumer> getConsumers() {
057 return consumers;
058 }
059
060 public void setConsumers(List<Consumer> consumers) {
061 this.consumers = consumers;
062 }
063
064 public void setTarget(String target) {
065 this.targetDir = new File(target);
066 }
067
068 public void setTargetDir(File targetDir) {
069 this.targetDir = targetDir;
070 }
071
072 public void setFailureDir(String failureDir) {
073 this.failureDirFile = new File(failureDir);
074 }
075
076 public void setFileComparator(Comparator<File> fileComparator) {
077 this.fileComparator = fileComparator;
078 }
079
080 public void start() {
081 if (targetDir != null) {
082 try {
083 if (null == failureDirFile) {
084 failureDirFile = new File(targetDir.getParent(), "failure");
085 }
086 FileUtils.forceMkdir(failureDirFile);
087 } catch (Exception e) {
088 logger.error("Cannot create failure directory "
089 + failureDirFile);
090 return;
091 }
092 mainJob = new MainJob(this);
093 new Thread(mainJob).start();
094 }
095 }
096
097 class MainJob implements Runnable {
098
099 private boolean stopRequested = false;
100 private Watcher watcher;
101
102 public MainJob (Watcher watcher) {
103 this.watcher = watcher;
104 }
105
106 public void stop() {
107 this.stopRequested = true;
108 }
109
110 public void run() {
111 while (!stopRequested) {
112 processMessage();
113 }
114 synchronized (this) {
115 try {
116 wait(watchInterval);
117 } catch (InterruptedException e) {
118 }
119 }
120 }
121
122 private void processMessage() {
123 long currentWatchInterval = watchInterval;
124 File [] files = null;
125
126 if (targetDir.isDirectory()) {
127 files = targetDir.listFiles();
128 }
129
130 if (!ArrayUtils.isEmpty(files)) {
131 if (fileComparator != null) {
132 Arrays.sort(files, fileComparator);
133 }
134 for (File file : files) {
135 if (!includeDirectory && file.isDirectory())
136 continue;
137
138 if (logger.isInfoEnabled())
139 logger.info("Watcher will process the working file: "
140 + file);
141
142 processWorkingFile(file);
143
144 // If the consumer script changes the watchInterval, the
145 // processing must be suspended.
146 if (watchInterval != currentWatchInterval) {
147 break;
148 }
149
150 // To relax CPU, wait 0.1 process interval after processing
151 // a file.
152 if (processInterval > 0) {
153 synchronized (this) {
154 try {
155 wait(processInterval);
156 } catch (Exception e) {
157 }
158 }
159 }
160 }
161 }
162 }
163
164 private void processWorkingFile(File workingFile) {
165 synchronized (workingFile) {
166 int sc = consumeFile(workingFile);
167
168 if (sc == Consumer.CONSUME_SUCCEEDED) {
169 try {
170 FileUtils.forceDelete(workingFile);
171 } catch (IOException e) {
172 logger.warn("Cannot delete " + workingFile);
173 }
174 } else if (sc == Consumer.CONSUME_ERROR_FAIL) {
175 try {
176 FileUtils.forceDelete(workingFile);
177 } catch (IOException e) {
178 logger.warn("Cannot delete " + workingFile);
179 }
180 } else if (sc == Consumer.CONSUME_ERROR_MOVE) {
181 try {
182 if (workingFile.isFile()) {
183 FileUtils.moveFile(workingFile, failureDirFile);
184 } else {
185 FileUtils.moveDirectory(workingFile, failureDirFile);
186 }
187 } catch (IOException e) {
188 logger.warn("Cannot move " + workingFile + " to " + failureDirFile);
189 }
190 }
191 }
192 }
193
194 private int consumeFile(File file) {
195 int sc = Consumer.CONSUME_SUCCEEDED;
196 for (Consumer consumer : consumers) {
197 try {
198 sc = consumer.consume(watcher, file);
199 } catch (Exception e) {
200 sc = Consumer.CONSUME_ERROR_KEEP;
201 logger.warn("Error during consuming: " + e.getMessage());
202 }
203 }
204 return sc;
205 }
206
207 }
208
209 }