001 /*
002 * Copyright 2010 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package com.hs.mail.smtp.spool;
017
018 import java.io.File;
019 import java.util.List;
020
021 import javax.mail.MessagingException;
022
023 import org.apache.commons.lang.StringUtils;
024 import org.apache.log4j.Logger;
025 import org.springframework.beans.factory.InitializingBean;
026
027 import com.hs.mail.mailet.Mailet;
028 import com.hs.mail.mailet.MailetContext;
029 import com.hs.mail.smtp.message.DeliveryStatusNotifier;
030 import com.hs.mail.smtp.message.SmtpMessage;
031
032 /**
033 *
034 * @author Won Chul Doh
035 * @since Jun 5, 2010
036 *
037 */
038 public class SmtpMessageConsumer implements Consumer, InitializingBean {
039
040 static Logger logger = Logger.getLogger(SmtpMessageConsumer.class);
041
042 private static final long DEFAULT_DELAY_TIME = 120000; // 2*60*1000 millis (2 minutes)
043
044 private MailetContext context;
045 private List<Mailet> mailets;
046 private long retryDelayTime = DEFAULT_DELAY_TIME;
047
048 public void setRetryDelayTime(long delayTime) {
049 this.retryDelayTime = delayTime;
050 }
051
052 public void setMailetContext(MailetContext context) {
053 this.context = context;
054 }
055
056 public void setMailets(List<Mailet> mailets) {
057 this.mailets = mailets;
058 }
059
060 public void afterPropertiesSet() throws Exception {
061 for (Mailet aMailet : mailets) {
062 aMailet.init(context);
063 }
064 }
065
066 public int consume(Watcher watcher, Object stuffs) {
067 File trigger = (File) stuffs;
068 SmtpMessage message = SmtpMessage.readMessage(trigger.getName());
069
070 // Check if the message is ready for processing based on the delay time.
071 if (!accept(message)) {
072 // We are not ready to process this.
073 return Consumer.CONSUME_ERROR_KEEP;
074 }
075
076 // Save the original retry count for this message.
077 int retries = message.getRetryCount();
078
079 processMessage(message);
080
081 // This means that the message was processed successfully or permanent
082 // exception was caught while processing the message.
083 boolean error = false;
084 if (!StringUtils.isEmpty(message.getErrorMessage())) {
085 // There exist errors, bounce this mail to original sender.
086 dsnNotify(message);
087 error = true;
088 }
089
090 // See if the retry count was changed by the mailets.
091 if (message.getRetryCount() > retries) {
092 // This means temporary exception was caught while processing the
093 // message. Store this message back in spool and it will get picked
094 // up and processed later.
095 // We only tell the watcher to do not delete the message.
096 // The original message was "stored" by the mailet.
097 StringBuilder logBuffer = new StringBuilder(128)
098 .append("Storing message ")
099 .append(message.getName())
100 .append(" into spool after ")
101 .append(retries)
102 .append(" retries");
103 logger.info(logBuffer.toString());
104 return Consumer.CONSUME_ERROR_KEEP;
105 }
106
107 // OK, we made it through... remove message from the spool.
108 message.dispose();
109
110 return (error) ? Consumer.CONSUME_ERROR_FAIL
111 : Consumer.CONSUME_SUCCEEDED;
112 }
113
114 private boolean accept(SmtpMessage message) {
115 int retries = message.getRetryCount();
116 if (retries > 0) {
117 // Quadruples the delay with every attempt
118 long timeToProcess = message.getLastUpdate().getTime()
119 + (long) Math.pow(4, retries) * retryDelayTime;
120 if (System.currentTimeMillis() < timeToProcess) {
121 return false;
122 }
123 }
124 return true;
125 }
126
127 private void processMessage(SmtpMessage msg) {
128 for (Mailet aMailet : mailets) {
129 try {
130 if (aMailet.accept(msg.getRecipients(), msg)) {
131 if (logger.isDebugEnabled()) {
132 logger.debug("Processing " + msg.getName()
133 + " through " + aMailet.getClass().getName());
134 }
135 aMailet.service(msg.getRecipients(), msg);
136 }
137 } catch (Exception e) {
138 logger.error(e.getMessage(), e);
139 }
140 }
141 }
142
143 private void dsnNotify(SmtpMessage message) {
144 if (!message.isNotificationMessage()) {
145 try {
146 // Bounce message to the reverse-path
147 DeliveryStatusNotifier.dsnNotify(null, message.getFrom(),
148 message.getMimeMessage(), message.getErrorMessage());
149 } catch (MessagingException e) {
150 logger.error(e.getMessage(), e);
151 }
152 }
153 }
154
155 }