#!/usr/bin/python3
"""Learn a mail message as ham or spam via rspamc and maintain a sender whitelist.

The message is read from stdin (or ``-i/--input``), its sender addresses are
extracted with ``mailparser``, the whitelist file is updated (ham senders are
added, spam senders are removed) and finally the message is handed to
``rspamc learn_ham`` / ``rspamc learn_spam``.
"""
import sys
import subprocess
import json
import logging
import argparse
import email
import email.utils  # explicit: ``import email`` alone does not load the submodule

import mailparser

_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a log level name into the matching ``logging`` integer constant.

    Used as an argparse ``type=`` callable.

    Raises:
        argparse.ArgumentTypeError: if the name is not in ``_LOG_LEVEL_STRINGS``.
    """
    if log_level_string not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(
            log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)
    # Every accepted name is an integer attribute of the logging module.
    return getattr(logging, log_level_string)


WL_PATH_DEF = "/etc/rspamd/local.d/whitelist.txt"
RC_PATH_DEF = "/usr/bin/rspamc"

# Mail attributes (besides From:) that are inspected for sender addresses.
_SENDER_HEADERS = ("return_path", "envelope_from", "sender", "x_mail_from")

# Shown verbatim by RawDescriptionHelpFormatter; doubled braces are literal
# braces, the single {} receives the default whitelist path.
_DESCRIPTION = """Learn messages via rspamc and manage a sender whitelist. Depends on https://github.com/SpamScope/mail-parser.

Use with local.d/multimap.conf:
SENDER_FROM_WHITELIST {{
  type = "from";
  map = "file://{}"; # default; set this via -w/--whitelist-path
  prefilter = true;
  action = "accept";
  filter = "email";
}}""".format(WL_PATH_DEF)

parser = argparse.ArgumentParser(
    description=_DESCRIPTION,
    formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
    "message_class",
    help="What to classify the message as: 'ham' or 'spam'")
# const= makes a bare "-i" (no value) fall back to stdin instead of None.
parser.add_argument(
    "--input", "-i", nargs='?', type=argparse.FileType('r'),
    default=sys.stdin, const=sys.stdin,
    help="Mail message input; read from stdin by default")
parser.add_argument(
    "-w", "--whitelist-path",
    help="Path to whitelist file *NEEDS TO BE WRITABLE BY THE CURRENT USER*; "
         "Default: {}".format(WL_PATH_DEF),
    default=WL_PATH_DEF)
parser.add_argument(
    "-r", "--rspamc-path",
    help="Path to rspamc binary; Default: {}".format(RC_PATH_DEF),
    default=RC_PATH_DEF)
# Without a log file a plain StreamHandler is used, which writes to stderr.
parser.add_argument(
    "--log-file",
    help="Path to log file; Default: log to stderr",
    default=None)
# const= keeps a bare "--log-level" from producing None (setLevel(None) raises).
parser.add_argument(
    '--log-level', default='INFO', const=logging.INFO, dest='log_level',
    type=_log_level_string_to_int, nargs='?',
    help='Set the logging output level. {0}; Default: INFO'.format(
        _LOG_LEVEL_STRINGS))
parser.add_argument(
    '--mailparse-log-level', default='WARNING', const=logging.WARNING,
    dest='mp_log_level', type=_log_level_string_to_int, nargs='?',
    help='Set the mailparse logging output level. {0}; Default: WARNING'.format(
        _LOG_LEVEL_STRINGS))

log = logging.getLogger()


def _configure_logging(args):
    """Attach a file or stream handler to the root logger and set log levels."""
    formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
    if args.log_file:
        handler = logging.FileHandler(args.log_file)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(args.log_level)
    logging.getLogger("mailparser").setLevel(args.mp_log_level)


def _sender_addresses(mail):
    """Return the unique, non-empty sender addresses of a parsed mail.

    Collects the (name, address) pairs from ``mail.from_`` and from the
    attributes listed in ``_SENDER_HEADERS``; the result keeps first-seen
    order so whitelist updates are deterministic.
    """
    pairs = list(mail.from_)
    for header in _SENDER_HEADERS:
        value = getattr(mail, header)
        if not value:
            continue
        if not isinstance(value, list):
            value = [value]
        pairs += email.utils.getaddresses(value)

    addresses = []
    seen = set()
    for _name, addr in pairs:
        # getaddresses() yields an empty address for unparsable input; such an
        # entry must never reach the whitelist.
        if not addr or addr in seen:
            continue
        seen.add(addr)
        addresses.append(addr)
    return addresses


def _update_whitelist(path, addresses, cls):
    """Add (ham) or remove (spam) the given addresses in the whitelist file.

    The file is only rewritten when its whitespace-separated entries changed.
    It must already exist and be writable (it is opened with mode ``r+``).
    """
    with open(path, "r+", encoding="utf-8") as f:
        # this might be a tad naive
        whitelist_orig = f.read().split()
        whitelist = whitelist_orig[:]
        for addr in addresses:
            if cls == "spam" and addr in whitelist:
                # Drop every occurrence, not just the first one, so duplicate
                # lines cannot keep a spam sender whitelisted.
                whitelist = [entry for entry in whitelist if entry != addr]
            elif cls == "ham" and addr not in whitelist:
                whitelist.append(addr)
            else:
                log.debug("whitelist: %s already marked as %s", addr, cls)
                continue
            log.info("%s: %s", cls, addr)
        if whitelist_orig != whitelist:
            f.seek(0)
            f.truncate()
            f.write("".join(entry + "\n" for entry in whitelist))


def main(argv=None):
    """Run the command line tool.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]``. An empty list prints the help text.

    Exits with status 1 (after logging the exception) if any step fails.
    """
    if argv is None:
        argv = sys.argv[1:]
    args = parser.parse_args(argv or ['--help'])

    _configure_logging(args)
    log.debug("Called rspamd_learn.py")

    try:
        cls = args.message_class
        if cls not in ("spam", "ham"):
            raise ValueError("First argument must be 'ham' or 'spam'")
        what = args.input.read()
        if not what:
            raise ValueError("Either pass the to-be-processed message as stdin or via -i/--input")

        # use mailparser to get sender addresses
        mail = mailparser.parse_from_string(what)
        addresses = _sender_addresses(mail)

        # parse current whitelist and update it according to the current message
        _update_whitelist(args.whitelist_path, addresses, cls)

        # rspamc learn message
        ret = subprocess.check_output(
            [args.rspamc_path, "learn_{}".format(cls)],
            input=what, universal_newlines=True)
        log.debug("Rspamc result: %s", ret)
    except Exception as e:
        # Top-level boundary: log the traceback and signal failure to the caller.
        log.exception(e)
        sys.exit(1)


if __name__ == "__main__":
    main()