001 /* 002 * Copyright 2010 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package com.hs.mail.smtp.spool; 017 018 import java.io.File; 019 import java.util.List; 020 021 import javax.mail.MessagingException; 022 023 import org.apache.commons.lang.StringUtils; 024 import org.apache.log4j.Logger; 025 import org.springframework.beans.factory.InitializingBean; 026 027 import com.hs.mail.mailet.Mailet; 028 import com.hs.mail.mailet.MailetContext; 029 import com.hs.mail.smtp.message.DeliveryStatusNotifier; 030 import com.hs.mail.smtp.message.SmtpMessage; 031 032 /** 033 * 034 * @author Won Chul Doh 035 * @since Jun 5, 2010 036 * 037 */ 038 public class SmtpMessageConsumer implements Consumer, InitializingBean { 039 040 static Logger logger = Logger.getLogger(SmtpMessageConsumer.class); 041 042 private static final long DEFAULT_DELAY_TIME = 120000; // 2*60*1000 millis (2 minutes) 043 044 private MailetContext context; 045 private List<Mailet> mailets; 046 private long retryDelayTime = DEFAULT_DELAY_TIME; 047 048 public void setRetryDelayTime(long delayTime) { 049 this.retryDelayTime = delayTime; 050 } 051 052 public void setMailetContext(MailetContext context) { 053 this.context = context; 054 } 055 056 public void setMailets(List<Mailet> mailets) { 057 this.mailets = mailets; 058 } 059 060 public void afterPropertiesSet() throws Exception { 061 for (Mailet aMailet : mailets) { 062 aMailet.init(context); 063 } 064 } 065 066 public int consume(Watcher watcher, Object stuffs) { 067 File trigger = (File) stuffs; 068 SmtpMessage message = SmtpMessage.readMessage(trigger.getName()); 069 070 // Check if the message is ready for processing based on the delay time. 071 if (!accept(message)) { 072 // We are not ready to process this. 073 return Consumer.CONSUME_ERROR_KEEP; 074 } 075 076 // Save the original retry count for this message. 077 int retries = message.getRetryCount(); 078 079 processMessage(message); 080 081 // This means that the message was processed successfully or permanent 082 // exception was caught while processing the message. 083 boolean error = false; 084 if (!StringUtils.isEmpty(message.getErrorMessage())) { 085 // There exist errors, bounce this mail to original sender. 086 dsnNotify(message); 087 error = true; 088 } 089 090 // See if the retry count was changed by the mailets. 091 if (message.getRetryCount() > retries) { 092 // This means temporary exception was caught while processing the 093 // message. Store this message back in spool and it will get picked 094 // up and processed later. 095 // We only tell the watcher to do not delete the message. 096 // The original message was "stored" by the mailet. 097 StringBuilder logBuffer = new StringBuilder(128) 098 .append("Storing message ") 099 .append(message.getName()) 100 .append(" into spool after ") 101 .append(retries) 102 .append(" retries"); 103 logger.info(logBuffer.toString()); 104 return Consumer.CONSUME_ERROR_KEEP; 105 } 106 107 // OK, we made it through... remove message from the spool. 108 message.dispose(); 109 110 return (error) ? Consumer.CONSUME_ERROR_FAIL 111 : Consumer.CONSUME_SUCCEEDED; 112 } 113 114 private boolean accept(SmtpMessage message) { 115 int retries = message.getRetryCount(); 116 if (retries > 0) { 117 // Quadruples the delay with every attempt 118 long timeToProcess = message.getLastUpdate().getTime() 119 + (long) Math.pow(4, retries) * retryDelayTime; 120 if (System.currentTimeMillis() < timeToProcess) { 121 return false; 122 } 123 } 124 return true; 125 } 126 127 private void processMessage(SmtpMessage msg) { 128 for (Mailet aMailet : mailets) { 129 try { 130 if (aMailet.accept(msg.getRecipients(), msg)) { 131 if (logger.isDebugEnabled()) { 132 logger.debug("Processing " + msg.getName() 133 + " through " + aMailet.getClass().getName()); 134 } 135 aMailet.service(msg.getRecipients(), msg); 136 } 137 } catch (Exception e) { 138 logger.error(e.getMessage(), e); 139 } 140 } 141 } 142 143 private void dsnNotify(SmtpMessage message) { 144 if (!message.isNotificationMessage()) { 145 try { 146 // Bounce message to the reverse-path 147 DeliveryStatusNotifier.dsnNotify(null, message.getFrom(), 148 message.getMimeMessage(), message.getErrorMessage()); 149 } catch (MessagingException e) { 150 logger.error(e.getMessage(), e); 151 } 152 } 153 } 154 155 }