001    /*
002     * Copyright 2010 the original author or authors.
003     * 
004     *  Licensed under the Apache License, Version 2.0 (the "License");
005     *  you may not use this file except in compliance with the License.
006     *  You may obtain a copy of the License at
007     *
008     *      http://www.apache.org/licenses/LICENSE-2.0
009     *
010     *  Unless required by applicable law or agreed to in writing, software
011     *  distributed under the License is distributed on an "AS IS" BASIS,
012     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     *  See the License for the specific language governing permissions and
014     *  limitations under the License.
015     */
016    package com.hs.mail.mailet;
017    
018    import java.io.IOException;
019    import java.net.ConnectException;
020    import java.net.SocketException;
021    import java.net.UnknownHostException;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collection;
025    import java.util.Date;
026    import java.util.Hashtable;
027    import java.util.Iterator;
028    import java.util.List;
029    import java.util.Locale;
030    import java.util.Properties;
031    import java.util.Set;
032    
033    import javax.mail.Address;
034    import javax.mail.MessagingException;
035    import javax.mail.SendFailedException;
036    import javax.mail.Session;
037    import javax.mail.Transport;
038    import javax.mail.internet.InternetAddress;
039    import javax.mail.internet.MimeMessage;
040    
041    import org.apache.commons.lang.ArrayUtils;
042    import org.apache.log4j.Logger;
043    
044    import com.hs.mail.container.config.ComponentManager;
045    import com.hs.mail.container.config.Config;
046    import com.hs.mail.dns.DnsServer;
047    import com.hs.mail.smtp.message.HostAddress;
048    import com.hs.mail.smtp.message.Recipient;
049    import com.hs.mail.smtp.message.SmtpMessage;
050    import com.sun.mail.smtp.SMTPAddressFailedException;
051    import com.sun.mail.smtp.SMTPSendFailedException;
052    
053    /**
054     * Receives a Mail from SmtpMessageConsumer and takes care of delivery of the
055     * message to remote hosts. If for some reason mail can't be delivered store it
056     * with undelivered recipients and current retry count in the "spool"
057     * repository. After the next "retryDelayTime" the SpoolRepository will try to
058     * send it again. After "maxRetries" the mail will be considered undeliverable
059     * and will be returned to sender.
060     * 
061     * @author Won Chul Doh
062     * @since Jun 7, 2010
063     * 
064     */
065    public class RemoteDelivery extends AbstractMailet {
066            
067            static Logger logger = Logger.getLogger(RemoteDelivery.class);
068            
069            private Session session = null;
070            private boolean debug = false;
071        private int maxRetries = 3; // default number of retries
072        private long smtpTimeout = 5000;  //default number of ms to timeout on smtp delivery
073        private boolean sendPartial = true; // If false then ANY address errors will cause the transmission to fail
074        private long connectionTimeout = 60000;  // The amount of time JavaMail will wait before giving up on a socket connect()
075    
076            public void setDebug(boolean debug) {
077                    this.debug = debug;
078            }
079    
080            /**
081             * Initialize the mailet
082             */
083            public void init(MailetContext context) {
084                    super.init(context);
085                    Properties props = new Properties();
086                    Properties sysprops = System.getProperties();
087                    for (Object key : sysprops.keySet()) {
088                            if (((String) key).startsWith("mail.")) {
089                                    props.put(key, sysprops.get(key));
090                            }
091                    }
092                    if (this.debug)
093                            props.setProperty("mail.debug", "true");
094                    props.setProperty("mail.smtp.timeout", smtpTimeout + "");
095            props.setProperty("mail.smtp.connectiontimeout", connectionTimeout + "");
096            props.setProperty("mail.smtp.sendpartial", String.valueOf(sendPartial));
097            props.setProperty("mail.smtp.localhost", Config.getHelloName());
098            
099            Logger.getLogger("console").info("Config: " + Config.getHelloName());
100            this.maxRetries = (int) Config.getNumberProperty("max_retry_count", 3); 
101            
102                    this.session = Session.getInstance(props, null);
103                    this.session.setDebug(this.debug);
104            }
105    
106            public boolean accept(Set<Recipient> recipients, SmtpMessage message) {
107                    return message.getNode() == SmtpMessage.REMOTE
108                                    || message.getNode() == SmtpMessage.ALL;
109            }
110    
111            /**
112             * For this message, we take the list of recipients, organize these into
113             * distinct servers, and deliver the message for each of these servers.
114             */
115            public void service(Set<Recipient> recipients, SmtpMessage message)
116                            throws MessagingException {
117                    // Organize the recipients into distinct servers (name made case insensitive)
118                    Hashtable<String, Collection<Recipient>> targets = new Hashtable<String, Collection<Recipient>>();
119                    for (Recipient recipient : recipients) {
120                            String targetServer = recipient.getHost().toLowerCase(Locale.US);
121                            if (!Config.isLocal(targetServer)) {
122                                    Collection<Recipient> temp = targets.get(targetServer);
123                                    if (temp == null) {
124                                            temp = new ArrayList<Recipient>();
125                                            targets.put(targetServer, temp);
126                                    }
127                                    temp.add(recipient);
128                            }
129                    }
130                    
131                    if (targets.isEmpty()) {
132                            return;
133                    }
134                    
135                    // We have the recipients organized into distinct servers
136                    List<Recipient> undelivered = new ArrayList<Recipient>();
137                    MimeMessage mimemsg = message.getMimeMessage();
138                    boolean deleteMessage = true;
139                    for (String targetServer : targets.keySet()) {
140                            Collection<Recipient> temp = targets.get(targetServer);
141                            if (!deliver(targetServer, temp, message, mimemsg)) {
142                                    // Retry this message later
143                                    deleteMessage = false;
144                                    if (!temp.isEmpty()) {
145                                            undelivered.addAll(temp);
146                                    }
147                            }
148                    }
149                    if (!deleteMessage) {
150                            try {
151                                    message.setRetryCount(message.getRetryCount() + 1);
152                                    message.setLastUpdate(new Date());
153                                    recipients.clear();
154                                    recipients.addAll(undelivered);
155                                    message.store();
156                            } catch (IOException e) {
157                            }
158                    }
159            }
160    
161            /**
162             * We arranged that the recipients are all going to the same mail server. We
163             * will now rely on the DNS server to do DNS MX record lookup and try to
164             * deliver to the multiple mail servers. If it fails, we should decide that
165             * the failure is permanent or temporary.
166             * 
167             * @param host
168             *            the same host of recipients
169             * @param recipients
170             *            recipients who are all going to the same mail server
171             * @param message
172             *            Mail object to be delivered
173             * @param mimemsg
174             *            MIME message representation of the message
175             * @return true if the delivery was successful or permanent failure so the
176             *         message should be deleted, otherwise false and the message will
177             *         be tried to send again later
178             */
179            private boolean deliver(String host, Collection<Recipient> recipients,
180                            SmtpMessage message, MimeMessage mimemsg) {
181                    // Prepare javamail recipients
182                    InternetAddress[] addresses = new InternetAddress[recipients.size()];
183                    Iterator<Recipient> it = recipients.iterator();
184                    for (int i = 0; it.hasNext(); i++) {
185                            Recipient rcpt = it.next();
186                            addresses[i] = rcpt.toInternetAddress();
187                    }
188                    
189                    try {
190                            // Lookup the possible targets
191                            Iterator<HostAddress> targetServers = getSmtpHostAddress(host);
192                            if (!targetServers.hasNext()) {
193                                    logger.info("No mail server found for: " + host);
194                                    StringBuilder exceptionBuffer = new StringBuilder(128).append(
195                                                    "There are no DNS entries for the hostname ").append(
196                                                    host).append(
197                                                    ".  I cannot determine where to send this message.");
198                                    return failMessage(message, addresses, new MessagingException(
199                                                    exceptionBuffer.toString()), false);
200                            }
201                            
202                            Properties props = session.getProperties();
203                            if (message.isNotificationMessage()) {
204                                    props.put("mail.smtp.from", "<>");
205                            } else {
206                                    props.put("mail.smtp.from", message.getFrom().getMailbox());
207                            }
208                    
209                            MessagingException lastError = null;
210                            StringBuilder logBuffer = null;
211                            HostAddress outgoingMailServer = null;
212                            while (targetServers.hasNext()) {
213                                    try {
214                                            outgoingMailServer = targetServers.next();
215                                            logBuffer = new StringBuilder(256)
216                                                            .append("Attempting to deliver message to host ")
217                                                            .append(outgoingMailServer.getHostname())
218                                                            .append(" at ")
219                                                            .append(outgoingMailServer.getHost())
220                                                            .append(" for addresses ")
221                                                            .append(Arrays.asList(addresses));
222                                            logger.info(logBuffer.toString());
223                                            Transport transport = null;
224                                            try {
225                                                    transport = session.getTransport(outgoingMailServer);
226                                                    try {
227                                                            transport.connect();
228                                                    } catch (MessagingException e) {
229                                                            // Any error on connect should cause the mailet to
230                                                            // attempt to connect to the next SMTP server
231                                                            // associated with this MX record.
232                                                            logger.error(e.getMessage());
233                                                            continue;
234                                                    }
235                                                    transport.sendMessage(mimemsg, addresses);
236                                            } finally {
237                                                    if (transport != null) {
238                                                            try {
239                                                                    transport.close();
240                                                            } catch (MessagingException e) {
241                                                            }
242                                                            transport = null;
243                                                    }
244                                            }
245                                            logBuffer = new StringBuilder(256)
246                                                            .append("Successfully sent message to host ")
247                                                            .append(outgoingMailServer.getHostname())
248                                                            .append(" at ")
249                                                            .append(outgoingMailServer.getHost())
250                                                            .append(" for addresses ")
251                                                            .append(Arrays.asList(addresses));
252                                            logger.info(logBuffer.toString());
253                                            recipients.clear();
254                                            return true;
255                                    } catch (SendFailedException sfe) {
256                                            if (sfe.getValidSentAddresses() != null) {
257                                                    Address[] validSent = sfe.getValidSentAddresses();
258                                                    if (validSent.length > 0) {
259                                                            logBuffer = new StringBuilder(256)
260                                                                            .append("Successfully sent message to host ")
261                                                                            .append(outgoingMailServer.getHostname())
262                                                                            .append(" at ")
263                                                                            .append(outgoingMailServer.getHost())
264                                                                            .append(" for addresses ")
265                                                                            .append(Arrays.asList(validSent));
266                                                            logger.info(logBuffer.toString());
267                                                            // Remove the addresses to which this message was
268                                                            // sent successfully
269                                                            List<InternetAddress> temp = new ArrayList<InternetAddress>();
270                                                            for (int i = 0; i < addresses.length; i++) {
271                                                                    if (!ArrayUtils.contains(validSent, addresses[i])) {
272                                                                            if (addresses[i] != null) {
273                                                                                    temp.add(addresses[i]);
274                                                                            }
275                                                                    }
276                                                            }
277                                                            addresses = temp.toArray(new InternetAddress[temp.size()]);
278                                                            removeAll(recipients, validSent);
279                                                    }
280                                            }
281                                            
282                                            if (sfe instanceof SMTPSendFailedException) {
283                                                    SMTPSendFailedException ssfe = (SMTPSendFailedException) sfe;
284                                                    // If permanent error 5xx, terminate this delivery
285                                                    // attempt by re-throwing the exception
286                                                    if (ssfe.getReturnCode() >= 500 && ssfe.getReturnCode() <= 599)
287                                                            throw sfe;
288                                            }
289                                            
290                                            if (!ArrayUtils.isEmpty(sfe.getValidUnsentAddresses())) {
291                                                    // Valid addresses remained, so continue with any other server.
292                                                    if (logger.isDebugEnabled())
293                                                            logger
294                                                                            .debug("Send failed, "
295                                                                                            + sfe.getValidUnsentAddresses().length
296                                                                                            + " valid recipients("
297                                                                                            + Arrays.asList(sfe
298                                                                                                            .getValidUnsentAddresses())
299                                                                                            + ") remain, continuing with any other servers");
300                                        lastError = sfe;
301                                                    continue;
302                                            } else {
303                                                    // There are no valid addresses left to send, so re-throw
304                                                    throw sfe;
305                                            }
306                                    } catch (MessagingException me) {
307                                            Exception ne;
308                                            if ((ne = me.getNextException()) != null
309                                                            && ne instanceof IOException) {
310                                                    // It can be some socket or weird I/O related problem.
311                                                    lastError = me;
312                                                    continue;
313                                            }
314                                            throw me;
315                                    }
316                            } // end while
317                            if (lastError != null) {
318                                    throw lastError;
319                            }
320                    } catch (SendFailedException sfe) {
321                boolean deleteMessage = false;
322                
323                            if (sfe instanceof SMTPSendFailedException) {
324                                    SMTPSendFailedException ssfe = (SMTPSendFailedException) sfe;
325                                    deleteMessage = (ssfe.getReturnCode() >= 500 && ssfe.getReturnCode() <= 599);
326                            } else {
327                                    // Sometimes we'll get a normal SendFailedException with nested
328                                    // SMTPAddressFailedException, so use the latter RetCode
329                    MessagingException me = sfe;
330                    Exception ne;
331                    while ((ne = me.getNextException()) != null && ne instanceof MessagingException) {
332                        me = (MessagingException)ne;
333                        if (me instanceof SMTPAddressFailedException) {
334                            SMTPAddressFailedException ssfe = (SMTPAddressFailedException)me;
335                            deleteMessage = (ssfe.getReturnCode() >= 500 && ssfe.getReturnCode() <= 599);
336                        }
337                    }
338                            }
339                            if (!ArrayUtils.isEmpty(sfe.getInvalidAddresses())) {
340                                    // Invalid addresses should be considered permanent
341                                    Address[] invalid = sfe.getInvalidAddresses();
342                                    removeAll(recipients, invalid);
343                                    deleteMessage = failMessage(message, invalid, sfe, true);
344                            }
345                            if (!ArrayUtils.isEmpty(sfe.getValidUnsentAddresses())) {
346                                    // Vaild-unsent addresses should be considered temporary
347                                    deleteMessage = failMessage(message, sfe.getValidUnsentAddresses(), sfe, false);
348                            }
349                            return deleteMessage;
350                    } catch (MessagingException mex) {
351                            // Check whether this is a permanent error (like account doesn't
352                            // exist or mailbox is full or domain is setup wrong)
353                            // We fail permanently if this was 5xx error.
354                            return failMessage(message, addresses, mex, ('5' == mex
355                                            .getMessage().charAt(0)));
356                    }
357                    // If we get here, we've exhausted the loop of servers without sending
358                    // the message or throwing an exception.
359                    // One case where this might happen is if there is no server we can
360                    // connect. So this should be considered temporary
361                    return failMessage(message, addresses, new MessagingException(
362                                    "No mail server(s) available at this time."), false);
363            }
364            
365            /**
366             * Build error messages and make sure the error is permanent.
367             * 
368             * @param message
369             *            com.hs.mail.smtp.message.SmtpMessage
370             * @param addresses
371             *            addresses not delivered
372             * @param ex
373             *            javax.mail.MessagingException
374             * @param permanent
375             *            true if the exception is permanent error, false otherwise
376             * @return Whether the message failed fully and can be deleted
377             */
378            private boolean failMessage(SmtpMessage message, Address[] addresses,
379                            MessagingException ex, boolean permanent) {
380            StringBuffer logBuffer =
381                new StringBuffer(64)
382                            .append((permanent) ? "Permanent" : "Temporary")
383                    .append(" exception delivering mail (")
384                    .append(message.getName())
385                    .append("): ")
386                    .append(ex.getMessage().trim());
387            logger.error(logBuffer.toString());
388                    if (!permanent) {
389                            int retries = message.getRetryCount();
390                            if (retries < maxRetries) {
391                                    return false;
392                            }
393                    }
394                    if (message.isNotificationMessage()) {
395                            if (logger.isDebugEnabled())
396                                    logger.debug(
397                                            "Null reverse-path: no bounce will be generated for "
398                                                            + message.getName());
399                            return true;
400                    }
401                    String errorMessage = buildErrorMessage(addresses, ex);
402                    message.appendErrorMessage(errorMessage);
403                    return true;
404            }
405    
406            /**
407             * Build error message from the exception.
408             */
409            private String buildErrorMessage(Address[] addresses, MessagingException ex) {
410                    StringBuilder errorBuffer = new StringBuilder();
411                    if (!ArrayUtils.isEmpty(addresses)) {
412                            for (int i = 0; i < addresses.length; i++) {
413                                    errorBuffer.append(addresses[i].toString()).append("\r\n");
414                            }
415                    }
416                    Exception ne = ex.getNextException();
417                    if (ne == null) {
418                            errorBuffer.append(ex.getMessage().trim());
419                    } else if (ne instanceof SendFailedException) {
420                            errorBuffer.append("Remote server told me: "
421                                            + ne.getMessage().trim());
422                    } else if (ne instanceof UnknownHostException) {
423                            errorBuffer.append("Unknown host: " + ne.getMessage().trim());
424                    } else if (ne instanceof ConnectException) {
425                            // Already formatted as "Connection timed out: connect"
426                            errorBuffer.append(ne.getMessage().trim());
427                    } else if (ne instanceof SocketException) {
428                            errorBuffer.append("Socket exception: " + ne.getMessage().trim());
429                    } else {
430                            errorBuffer.append(ne.getMessage().trim());
431                    }
432                    errorBuffer.append("\r\n");
433                    return errorBuffer.toString();
434            }
435    
436            public static Iterator<HostAddress> getSmtpHostAddress(String domainName) {
437                    DnsServer dns = (DnsServer) ComponentManager.getBean("dns.server");
438                    return dns.getSmtpHostAddress(domainName);
439            }
440            
441            public static void removeAll(Collection<Recipient> recipients,
442                            Address[] addresses) {
443                    for (Iterator<Recipient> it = recipients.iterator(); it.hasNext();) {
444                            Recipient rcpt = it.next();
445                            for (Address address : addresses) {
446                                    if (rcpt.getMailbox().equals(
447                                                    ((InternetAddress) address).getAddress())) {
448                                            it.remove();
449                                            break;
450                                    }
451                            }
452                    }
453            }
454    
455    }