001    /*
002     * Copyright 2010 the original author or authors.
003     * 
004     *  Licensed under the Apache License, Version 2.0 (the "License");
005     *  you may not use this file except in compliance with the License.
006     *  You may obtain a copy of the License at
007     *
008     *      http://www.apache.org/licenses/LICENSE-2.0
009     *
010     *  Unless required by applicable law or agreed to in writing, software
011     *  distributed under the License is distributed on an "AS IS" BASIS,
012     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     *  See the License for the specific language governing permissions and
014     *  limitations under the License.
015     */
016    package com.hs.mail.smtp.spool;
017    
018    import java.io.File;
019    import java.util.List;
020    
021    import javax.mail.MessagingException;
022    
023    import org.apache.commons.lang.StringUtils;
024    import org.apache.log4j.Logger;
025    import org.springframework.beans.factory.InitializingBean;
026    
027    import com.hs.mail.mailet.Mailet;
028    import com.hs.mail.mailet.MailetContext;
029    import com.hs.mail.smtp.message.DeliveryStatusNotifier;
030    import com.hs.mail.smtp.message.SmtpMessage;
031    
032    /**
033     * 
034     * @author Won Chul Doh
035     * @since Jun 5, 2010
036     * 
037     */
038    public class SmtpMessageConsumer implements Consumer, InitializingBean {
039            
040            static Logger logger = Logger.getLogger(SmtpMessageConsumer.class);
041            
042        private static final long DEFAULT_DELAY_TIME = 120000; // 2*60*1000 millis (2 minutes)
043            
044        private MailetContext context;
045            private List<Mailet> mailets;
046            private long retryDelayTime = DEFAULT_DELAY_TIME;
047            
048            public void setRetryDelayTime(long delayTime) {
049                    this.retryDelayTime = delayTime;
050            }
051            
052            public void setMailetContext(MailetContext context) {
053                    this.context = context;
054            }
055    
056            public void setMailets(List<Mailet> mailets) {
057                    this.mailets = mailets;
058            }
059    
060            public void afterPropertiesSet() throws Exception {
061                    for (Mailet aMailet : mailets) {
062                            aMailet.init(context);
063                    }
064            }
065            
066            public int consume(Watcher watcher, Object stuffs) {
067                    File trigger = (File) stuffs;
068                    SmtpMessage message = SmtpMessage.readMessage(trigger.getName());
069                    
070                    // Check if the message is ready for processing based on the delay time.
071                    if (!accept(message)) {
072                            // We are not ready to process this.
073                            return Consumer.CONSUME_ERROR_KEEP;
074                    }
075    
076                    // Save the original retry count for this message. 
077                    int retries = message.getRetryCount();
078                    
079                    processMessage(message);
080                    
081                    // This means that the message was processed successfully or permanent
082                    // exception was caught while processing the message.
083                    boolean error = false;
084                    if (!StringUtils.isEmpty(message.getErrorMessage())) {
085                            // There exist errors, bounce this mail to original sender.
086                            dsnNotify(message);
087                            error = true;
088                    }
089    
090                    // See if the retry count was changed by the mailets.
091                    if (message.getRetryCount() > retries) {
092                            // This means temporary exception was caught while processing the
093                            // message. Store this message back in spool and it will get picked
094                            // up and processed later.
095                            // We only tell the watcher to do not delete the message. 
096                            // The original message was "stored" by the mailet.
097                            StringBuilder logBuffer = new StringBuilder(128)
098                                            .append("Storing message ")
099                                            .append(message.getName())
100                                            .append(" into spool after ")
101                                            .append(retries)
102                                            .append(" retries");
103                            logger.info(logBuffer.toString());
104                            return Consumer.CONSUME_ERROR_KEEP;
105                    }
106    
107                    // OK, we made it through... remove message from the spool.
108                    message.dispose();
109    
110                    return (error) ? Consumer.CONSUME_ERROR_FAIL
111                                    : Consumer.CONSUME_SUCCEEDED;
112            }
113            
114            private boolean accept(SmtpMessage message) {
115                    int retries = message.getRetryCount();
116                    if (retries > 0) {
117                            // Quadruples the delay with every attempt
118                            long timeToProcess = message.getLastUpdate().getTime()
119                                            + (long) Math.pow(4, retries) * retryDelayTime;
120                            if (System.currentTimeMillis() < timeToProcess) {
121                                    return false;
122                            }
123                    }
124                    return true;
125            }
126    
127            private void processMessage(SmtpMessage msg) {
128                    for (Mailet aMailet : mailets) {
129                            try {
130                                    if (aMailet.accept(msg.getRecipients(), msg)) {
131                                            if (logger.isDebugEnabled()) {
132                                                    logger.debug("Processing " + msg.getName()
133                                                                    + " through " + aMailet.getClass().getName());
134                                            }
135                                            aMailet.service(msg.getRecipients(), msg);
136                                    }
137                            } catch (Exception e) {
138                                    logger.error(e.getMessage(), e);
139                            }
140                    }
141            }
142    
143            private void dsnNotify(SmtpMessage message) {
144                    if (!message.isNotificationMessage()) {
145                            try {
146                                    // Bounce message to the reverse-path
147                                    DeliveryStatusNotifier.dsnNotify(null, message.getFrom(),
148                                                    message.getMimeMessage(), message.getErrorMessage());
149                            } catch (MessagingException e) {
150                                    logger.error(e.getMessage(), e);
151                            }
152                    }
153            }
154    
155    }