001 /* 002 * Copyright 2010 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package com.hs.mail.smtp.spool; 017 018 import java.io.File; 019 import java.io.IOException; 020 import java.util.Arrays; 021 import java.util.Comparator; 022 import java.util.List; 023 024 import org.apache.commons.io.FileUtils; 025 import org.apache.commons.lang.ArrayUtils; 026 import org.apache.log4j.Logger; 027 028 /** 029 * Class to watch files under some conditions 030 * 031 * @author Won Chul Doh 032 * @since Jun 3, 2010 033 * 034 */ 035 public class FileWatcher implements Watcher { 036 037 static Logger logger = Logger.getLogger(FileWatcher.class); 038 039 private List<Consumer> consumers = null; 040 private MainJob mainJob = null; 041 private File targetDir = null; 042 private long watchInterval = 30000; 043 private long processInterval = 10; 044 private boolean includeDirectory = false; 045 private File failureDirFile = null; 046 private Comparator<File> fileComparator = null; 047 048 public long getWatchInterval() { 049 return watchInterval; 050 } 051 052 public void setWatchInterval(long millis) { 053 this.watchInterval = millis; 054 } 055 056 public List<Consumer> getConsumers() { 057 return consumers; 058 } 059 060 public void setConsumers(List<Consumer> consumers) { 061 this.consumers = consumers; 062 } 063 064 public void setTarget(String target) { 065 this.targetDir = new File(target); 066 } 067 068 public void setTargetDir(File targetDir) { 069 this.targetDir = targetDir; 070 } 071 072 public void setFailureDir(String failureDir) { 073 this.failureDirFile = new File(failureDir); 074 } 075 076 public void setFileComparator(Comparator<File> fileComparator) { 077 this.fileComparator = fileComparator; 078 } 079 080 public void start() { 081 if (targetDir != null) { 082 try { 083 if (null == failureDirFile) { 084 failureDirFile = new File(targetDir.getParent(), "failure"); 085 } 086 FileUtils.forceMkdir(failureDirFile); 087 } catch (Exception e) { 088 logger.error("Cannot create failure directory " 089 + failureDirFile); 090 return; 091 } 092 mainJob = new MainJob(this); 093 new Thread(mainJob).start(); 094 } 095 } 096 097 class MainJob implements Runnable { 098 099 private boolean stopRequested = false; 100 private Watcher watcher; 101 102 public MainJob (Watcher watcher) { 103 this.watcher = watcher; 104 } 105 106 public void stop() { 107 this.stopRequested = true; 108 } 109 110 public void run() { 111 while (!stopRequested) { 112 processMessage(); 113 } 114 synchronized (this) { 115 try { 116 wait(watchInterval); 117 } catch (InterruptedException e) { 118 } 119 } 120 } 121 122 private void processMessage() { 123 long currentWatchInterval = watchInterval; 124 File [] files = null; 125 126 if (targetDir.isDirectory()) { 127 files = targetDir.listFiles(); 128 } 129 130 if (!ArrayUtils.isEmpty(files)) { 131 if (fileComparator != null) { 132 Arrays.sort(files, fileComparator); 133 } 134 for (File file : files) { 135 if (!includeDirectory && file.isDirectory()) 136 continue; 137 138 if (logger.isInfoEnabled()) 139 logger.info("Watcher will process the working file: " 140 + file); 141 142 processWorkingFile(file); 143 144 // If the consumer script changes the watchInterval, the 145 // processing must be suspended. 146 if (watchInterval != currentWatchInterval) { 147 break; 148 } 149 150 // To relax CPU, wait 0.1 process interval after processing 151 // a file. 152 if (processInterval > 0) { 153 synchronized (this) { 154 try { 155 wait(processInterval); 156 } catch (Exception e) { 157 } 158 } 159 } 160 } 161 } 162 } 163 164 private void processWorkingFile(File workingFile) { 165 synchronized (workingFile) { 166 int sc = consumeFile(workingFile); 167 168 if (sc == Consumer.CONSUME_SUCCEEDED) { 169 try { 170 FileUtils.forceDelete(workingFile); 171 } catch (IOException e) { 172 logger.warn("Cannot delete " + workingFile); 173 } 174 } else if (sc == Consumer.CONSUME_ERROR_FAIL) { 175 try { 176 FileUtils.forceDelete(workingFile); 177 } catch (IOException e) { 178 logger.warn("Cannot delete " + workingFile); 179 } 180 } else if (sc == Consumer.CONSUME_ERROR_MOVE) { 181 try { 182 if (workingFile.isFile()) { 183 FileUtils.moveFile(workingFile, failureDirFile); 184 } else { 185 FileUtils.moveDirectory(workingFile, failureDirFile); 186 } 187 } catch (IOException e) { 188 logger.warn("Cannot move " + workingFile + " to " + failureDirFile); 189 } 190 } 191 } 192 } 193 194 private int consumeFile(File file) { 195 int sc = Consumer.CONSUME_SUCCEEDED; 196 for (Consumer consumer : consumers) { 197 try { 198 sc = consumer.consume(watcher, file); 199 } catch (Exception e) { 200 sc = Consumer.CONSUME_ERROR_KEEP; 201 logger.warn("Error during consuming: " + e.getMessage()); 202 } 203 } 204 return sc; 205 } 206 207 } 208 209 }