Skip to content

Instantly share code, notes, and snippets.

@pannal
Last active March 22, 2019 21:11
Show Gist options
  • Select an option

  • Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.

Select an option

Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.
Rspamd learn with automatic sender-whitelist
#!/usr/bin/python3
import sys
import subprocess
import json
import logging
import argparse
# Level names accepted on the command line, from most to least severe.
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a logging level name into the numeric level used by ``logging``.

    Written to be used as an argparse ``type=`` callable.

    Args:
        log_level_string: Level name; matched case-insensitively against
            ``_LOG_LEVEL_STRINGS`` (so ``"debug"`` and ``"DEBUG"`` are equal).

    Returns:
        The integer constant of the same name from the ``logging`` module,
        e.g. ``logging.DEBUG`` for ``"DEBUG"``.

    Raises:
        argparse.ArgumentTypeError: If the name is not one of
            ``_LOG_LEVEL_STRINGS``; argparse reports this as a usage error.
    """
    # str() keeps non-string input (e.g. None) on the "invalid choice" path
    # instead of failing with AttributeError on .upper().
    level_name = str(log_level_string).upper()
    if level_name not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)
    # Every whitelisted name is an integer attribute of the logging module, so
    # no fallback value (and no assert, which "python -O" would strip) is needed.
    return getattr(logging, level_name)
# Default file locations; each one can be overridden by the matching option below.
WL_PATH_DEF = "/etc/rspamd/whitelist.txt"
MP_PATH_DEF = "/usr/local/bin/mailparser"
RC_PATH_DEF = "/usr/bin/rspamc"

# Command-line interface of the script.
parser = argparse.ArgumentParser(description="Learn messages via rspamc and manage a sender whitelist. Depends on https://github.com/SpamScope/mail-parser")
parser.add_argument("message_class", help="What to classify the message as: 'ham' or 'spam'")
# nargs='?' allows the option to be given without a value; const makes that
# case fall back to stdin instead of storing None.
parser.add_argument("--input", "-i", nargs='?', type=argparse.FileType('r'), default=sys.stdin, const=sys.stdin,
                    help="Mail message input; read from stdin by default")
parser.add_argument("-w", "--whitelist-path", help="Path to whitelist file; Default: {}".format(WL_PATH_DEF), default=WL_PATH_DEF)
parser.add_argument("-m", "--mailparser-path", help="Path to mailparser binary; Default: {}".format(MP_PATH_DEF), default=MP_PATH_DEF)
parser.add_argument("-r", "--rspamc-path", help="Path to rspamc binary; Default: {}".format(RC_PATH_DEF), default=RC_PATH_DEF)
# Without --log-file the script logs through logging.StreamHandler(), whose
# default stream is stderr (not stdout).
parser.add_argument("--log-file", help="Path to log file; Default: stderr", default=None)
# default and const are strings, so argparse runs both through the type
# callable; a bare "--log-level" therefore yields the INFO level rather than
# None (which logging.Logger.setLevel rejects with TypeError).
parser.add_argument('--log-level',
                    default='INFO',
                    const='INFO',
                    dest='log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the logging output level. {0}; Default: INFO'.format(_LOG_LEVEL_STRINGS))
# Root logger; handlers and level are attached when run as a script.
log = logging.getLogger()


def _configure_logging(log_file, log_level):
    """Attach a single formatted handler to ``log`` and set its threshold.

    Args:
        log_file: Path of the log file, or a falsy value to log to the
            ``logging.StreamHandler`` default stream (stderr).
        log_level: Numeric logging level; ``None`` is treated as INFO.
    """
    formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
    handler = logging.FileHandler("{}".format(log_file)) if log_file else logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)
    # A value-less "--log-level" can arrive here as None; setLevel(None) would
    # raise TypeError before any error handling is in place.
    log.setLevel(logging.INFO if log_level is None else log_level)


def _update_whitelist(whitelist, cls, addresses):
    """Return a copy of *whitelist* adjusted for *addresses* learned as *cls*.

    Args:
        whitelist: Current whitelist entries; not modified.
        cls: ``"spam"`` removes the addresses, ``"ham"`` appends missing ones.
        addresses: Sender addresses taken from the message.

    Returns:
        A new list with the resulting whitelist entries, order preserved.
    """
    updated = list(whitelist)
    for addr in addresses:
        if cls == "spam" and addr in updated:
            # Drop every occurrence: list.remove() only deletes the first one,
            # which would leave a duplicated spam sender whitelisted.
            updated = [entry for entry in updated if entry != addr]
        elif cls == "ham" and addr not in updated:
            updated.append(addr)
        else:
            continue
        log.info("%s: %s", cls, addr)
    return updated


def _learn(args):
    """Update the sender whitelist for one message, then feed it to rspamc.

    Args:
        args: Parsed command-line namespace providing ``message_class``,
            ``input``, ``mailparser_path``, ``whitelist_path`` and
            ``rspamc_path``.

    Raises:
        ValueError: If the message class is not 'ham'/'spam' or no message
            text was supplied.
        subprocess.CalledProcessError: If mailparser or rspamc exits non-zero.
        OSError: If the whitelist file cannot be opened for reading/writing.
    """
    cls = args.message_class
    # Validate before reading input or spawning mailparser so a typo fails fast.
    if cls not in ("spam", "ham"):
        raise ValueError("First argument must be 'ham' or 'spam'")

    # A value-less "-i" can leave args.input as None; fall back to stdin.
    source = sys.stdin if args.input is None else args.input
    what = source.read()
    if not what:
        raise ValueError("Either pass the to-be-processed message as stdin or via -i/--input")

    # The result is unpacked as two-item entries with the address second.
    # NOTE(review): presumably [display name, address] pairs from the message's
    # From header -- confirm against the mail-parser CLI.
    from_lines = json.loads(subprocess.check_output([args.mailparser_path, "-k", "-m"], input=what, universal_newlines=True))
    addresses = [addr for _name, addr in from_lines]

    with open(args.whitelist_path, "r+", encoding="utf-8") as f:
        whitelist_orig = f.read().split()
        whitelist = _update_whitelist(whitelist_orig, cls, addresses)
        if whitelist_orig != whitelist:
            # Rewrite in place: rewind, cut the old content, write the new list.
            f.seek(0)
            f.truncate()
            f.write("\n".join(whitelist) + "\n")

    # rspamc learn message
    ret = subprocess.check_output([args.rspamc_path, "learn_{}".format(cls)], input=what, universal_newlines=True)
    log.debug("Rspamc result: {}".format(ret))


if __name__ == "__main__":
    args = parser.parse_args()
    _configure_logging(args.log_file, args.log_level)
    log.debug("Called rspamd_learn.py")
    try:
        _learn(args)
    except Exception as e:
        # Top-level boundary: record the traceback. The process still exits
        # with status 0 afterwards, exactly as before.
        log.exception(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment