Last active
March 22, 2019 21:11
-
-
Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.
Rspamd learn with automatic sender-whitelist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| import sys | |
| import subprocess | |
| import json | |
| import logging | |
| import argparse | |
# Level names accepted by --log-level, ordered from most to least severe.
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a log level name into the matching ``logging`` constant.

    Intended for use as an argparse ``type=`` callable. Names are matched
    case-insensitively, so ``debug`` and ``DEBUG`` are equivalent.

    Args:
        log_level_string: One of the names in ``_LOG_LEVEL_STRINGS``.

    Returns:
        The integer level defined by the ``logging`` module for that name.

    Raises:
        argparse.ArgumentTypeError: If the name is not a supported level, or
            ``logging`` does not expose an integer constant for it.
    """
    level_name = str(log_level_string).upper()
    if level_name not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)

    log_level_int = getattr(logging, level_name, None)
    # Explicit check instead of `assert`: asserts are stripped under -O, and a
    # silent fallback would hide a mismatch between our list and `logging`.
    if not isinstance(log_level_int, int):
        raise argparse.ArgumentTypeError(
            'logging module has no integer level named {0}'.format(level_name))
    return log_level_int
# Default locations; each one can be overridden on the command line.
WL_PATH_DEF = "/etc/rspamd/whitelist.txt"
MP_PATH_DEF = "/usr/local/bin/mailparser"
RC_PATH_DEF = "/usr/bin/rspamc"

# Command line interface of the script.
parser = argparse.ArgumentParser(description="Learn messages via rspamc and manage a sender whitelist. Depends on https://github.com/SpamScope/mail-parser")
parser.add_argument("message_class", help="What to classify the message as: 'ham' or 'spam'")
# nargs='?' allows the flag to appear without a value; `const` makes that case
# fall back to stdin instead of leaving args.input set to None.
parser.add_argument("--input", "-i", nargs='?', type=argparse.FileType('r'), default=sys.stdin, const=sys.stdin,
                    help="Mail message input; read from stdin by default")
parser.add_argument("-w", "--whitelist-path", help="Path to whitelist file; Default: {}".format(WL_PATH_DEF), default=WL_PATH_DEF)
parser.add_argument("-m", "--mailparser-path", help="Path to mailparser binary; Default: {}".format(MP_PATH_DEF), default=MP_PATH_DEF)
parser.add_argument("-r", "--rspamc-path", help="Path to rspamc binary; Default: {}".format(RC_PATH_DEF), default=RC_PATH_DEF)
parser.add_argument("--log-file", help="Path to log file; Default: stdout", default=None)
# The string default is run through `type` by argparse. `const` covers a bare
# `--log-level` (no value), which would otherwise produce None and make
# Logger.setLevel() raise TypeError.
parser.add_argument('--log-level',
                    default='INFO',
                    const=logging.INFO,
                    dest='log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the logging output level. {0}; Default: INFO'.format(_LOG_LEVEL_STRINGS))
log = logging.getLogger()


def update_whitelist(whitelist, senders, message_class):
    """Return a copy of *whitelist* adjusted for one learned message.

    Args:
        whitelist: Current whitelist entries (sequence of address strings).
        senders: Sender addresses taken from the message.
        message_class: ``"spam"`` removes the senders, ``"ham"`` adds them.

    Returns:
        A new list; the *whitelist* argument itself is never modified.
    """
    updated = list(whitelist)
    for addr in senders:
        if message_class == "spam" and addr in updated:
            # Drop every occurrence: list.remove() would leave duplicates
            # behind and the sender would stay whitelisted.
            updated = [entry for entry in updated if entry != addr]
        elif message_class == "ham" and addr not in updated:
            updated.append(addr)
        else:
            log.debug("whitelist: message already marked as {}".format(message_class))
            continue
        log.info("{}: {}".format(message_class, addr))
    return updated


def sync_whitelist_file(path, senders, message_class):
    """Apply :func:`update_whitelist` to the whitelist file at *path*.

    The file is split on any whitespace when read and is rewritten, one entry
    per line, only when the set of entries actually changed.

    Raises:
        OSError: If the file cannot be opened for reading and writing; it is
            opened in ``r+`` mode, so it must already exist.
    """
    with open(path, "r+", encoding="utf-8") as f:
        # this might be a tad naive
        current = f.read().split()
        updated = update_whitelist(current, senders, message_class)
        if updated != current:
            f.truncate(0)
            f.seek(0)
            # One entry per line; an empty whitelist yields an empty file.
            f.writelines(entry + "\n" for entry in updated)


def main():
    """Parse the command line, update the whitelist and train rspamd.

    Any failure while processing the message is logged with its traceback and
    not re-raised, so the process still exits with status 0.
    """
    # parse argv
    args = parser.parse_args()

    # set up logging
    formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
    if args.log_file:
        handler = logging.FileHandler("{}".format(args.log_file))
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)
    # A bare `--log-level` may leave the value as None; fall back to INFO
    # rather than letting setLevel() raise before the try block below.
    log.setLevel(args.log_level if args.log_level is not None else logging.INFO)

    log.debug("Called rspamd_learn.py")

    # main
    try:
        cls = args.message_class
        # A bare `-i` may leave args.input as None; treat it as stdin.
        source = args.input if args.input is not None else sys.stdin
        what = source.read()
        if not what:
            raise ValueError("Either pass the to-be-processed message as stdin or via -i/--input")

        # Validate before spawning any subprocess so a typo costs nothing.
        if cls not in ("spam", "ham"):
            raise ValueError("First argument must be 'ham' or 'spam'")

        # use mailparser to get From
        # NOTE(review): the output is presumably a JSON list of
        # [display name, address] pairs -- confirm against mail-parser docs.
        from_lines = json.loads(subprocess.check_output([args.mailparser_path, "-k", "-m"], input=what,
                                                        universal_newlines=True))

        # parse current whitelist and update it according to the current message
        sync_whitelist_file(args.whitelist_path, [addr for _name, addr in from_lines], cls)

        # rspamc learn message
        ret = subprocess.check_output([args.rspamc_path, "learn_{}".format(cls)], input=what,
                                      universal_newlines=True)
        log.debug("Rspamc result: {}".format(ret))
    except Exception as e:
        # Top-level boundary: record the failure instead of crashing.
        log.exception(e)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment