-
-
Save mattisz/d112ebfe1869c56ce111ecbd2cbbd04d to your computer and use it in GitHub Desktop.
Revisions
-
mattisz revised this gist
May 5, 2023 . 1 changed file with 185 additions and 108 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,13 +24,12 @@ import re import requests import logging import json from base64 import b64encode from datetime import datetime import xml.etree.ElementTree as etree from urllib.parse import urlparse REQUEST_TIMEOUT = 5.0 class IPMIUpdater: @@ -85,53 +84,95 @@ def get_xhr_headers(self, url_name): headers["X-Requested-With"] = "XMLHttpRequest" return headers def login(self, username, password, model): """ Log into IPMI interface :param username: username to use for logging in :param password: password to use for logging in :return: bool """ if model != "X12": if self.use_b64encoded_login: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), 'check': '00' } else: login_data = { 'name': username, 'pwd': password } try: result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return result.status_code if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False # Set mandatory cookies: url_parts = urlparse(self.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English' } for cookie_name, cookie_value in mandatory_cookies.items(): self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) return True else: login_data = { 'UserName': username, 'Password': password } request_headers = {'Content-Type': 'application/json'} try: result = self.session.post(self.login_url, data=json.dumps(login_data), headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return result.status_code return result def get_ipmi_cert_info(self, model, token): """ Verify existing certificate information :return: dict """ if model == "X12": request_headers = { 'Content-Type': 'application/json', 'X-Auth-Token': token } try: r = self.session.get(self.cert_info_url, headers=request_headers, verify=False) except ConnectionError: return False if not r.ok: return False data = r.json() valid_from = datetime.strptime(data['VaildFrom'].rstrip(re.split('\d{4}', data['VaildFrom'])[1]), r"%b %d %H:%M:%S %Y") valid_until = datetime.strptime(data['GoodTHRU'].rstrip(re.split('\d{4}', data['GoodTHRU'])[1]), r"%b %d %H:%M:%S %Y") return { 'has_cert': True, 'valid_from': valid_from, 'valid_until': valid_until } headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') @@ -189,7 +230,7 @@ def get_ipmi_cert_valid(self): status = status[0] return bool(int(status.get('VALIDATE'))) def upload_cert(self, key_file, cert_file, model, token): """ Send X.509 certificate and private key to server :param session: Current session object @@ -206,50 +247,83 @@ def upload_cert(self, key_file, cert_file): # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' if model == 'X12': substr = b'-----END CERTIFICATE-----\n' cert_data = cert_data.split(substr)[0] + substr files_to_upload = self._get_upload_data(cert_data, key_data) request_headers = {'X-Auth-Token': token} try: result = self.session.post(self.upload_cert_url, files=files_to_upload, headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not 'SSL certificate and private key were successfully uploaded' in result.text: return False return True else: files_to_upload = self._get_upload_data(cert_data, key_data) headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = {} if csrf_token is not None: csrf_data["CSRF_TOKEN"] = csrf_token try: result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def _check_reboot_result(self, result): return True def reboot_ipmi(self, model, token): if model != 'X12': # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") reboot_data = self._get_op_data('main_bmcreset', None) try: result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if not self._check_reboot_result(result): return False return True else: request_headers = {'X-Auth-Token': token} try: result = self.session.post(self.reboot_url, headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False return True class IPMIX9Updater(IPMIUpdater): @@ -385,6 +459,21 @@ def _get_upload_data(self, cert_data, key_data): ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] class IPMIX12Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.login_url = f'{ipmi_url}/redfish/v1/SessionService/Sessions' self.cert_info_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert' self.upload_cert_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert/Actions/SmcSSLCert.Upload' self.reboot_url = f'{ipmi_url}/redfish/v1/Managers/1/Actions/Manager.Reset' self.use_b64encoded_login = False def _get_upload_data(self, cert_data, key_data): return { 'cert_file' : cert_data, 'key_file' : key_data } def parse_valid_until(pem): from datetime import datetime @@ -396,59 +485,34 @@ def parse_valid_until(pem): def create_updater(args): session = requests.session() if not args.quiet: print("Board model is " + args.model) if args.model == "X10": return IPMIX10Updater(session, args.ipmi_url) elif args.model == "X11": return IPMIX11Updater(session, args.ipmi_url) elif args.model == "X9": return IPMIX9Updater(session, args.ipmi_url) elif args.model == "X12": return IPMIX12Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {args.model}") def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') parser.add_argument('--ipmi-url', required=True, help='Supermicro IPMI 2.0 URL') parser.add_argument('--model', required=True, help='Board model: X9, X10, X11, X12, X13') parser.add_argument('--key-file', required=True, help='X.509 Private key filename') parser.add_argument('--cert-file', required=True, help='X.509 Certificate filename') parser.add_argument('--username', required=True, help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--force-update', action='store_true', @@ -480,6 +544,9 @@ def main(): requests_log.setLevel(logging.DEBUG) requests_log.propagate = True if args.model == "X13": args.model = "X12" # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) @@ -489,11 +556,20 @@ def main(): debug_log.setLevel(logging.DEBUG) updater.setLogger(debug_log) login_response = updater.login(args.username, args.password, args.model) if not login_response: print("Login failed. Cannot continue!") exit(2) elif args.model == 'X12': try: token = login_response.headers['X-Auth-Token'] except: print(f'ERROR: Login failed with error {login_response}') exit(2) else: token = None cert_info = updater.get_ipmi_cert_info(args.model, token) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) @@ -510,19 +586,20 @@ def main(): print("New cert validity period matches existing cert, will update regardless") # Go upload! if not updater.upload_cert(args.key_file, args.cert_file, args.model, token): print("Failed to upload X.509 files to IPMI!") exit(2) if args.model != 'X12': cert_valid = updater.get_ipmi_cert_valid() if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = updater.get_ipmi_cert_info(args.model, token) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) @@ -532,7 +609,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not updater.reboot_ipmi(args.model, token): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
dekimsey revised this gist
Aug 31, 2021 . 1 changed file with 79 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -251,6 +251,83 @@ def reboot_ipmi(self): return False return True class IPMIX9Updater(IPMIUpdater): class TLSv1HttpAdapter(requests.adapters.HTTPAdapter): """"Transport adapter" that allows us to use SSLv3.""" def init_poolmanager(self, connections, maxsize, block=False): import ssl from urllib3.poolmanager import PoolManager ctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1) ctx.load_default_certs() ctx.set_ciphers('DEFAULT@SECLEVEL=1') self.poolmanager = PoolManager( num_pools=connections, maxsize=maxsize, block=block, ssl_context=ctx) def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' self.use_b64encoded_login = False self.session.mount('https://', IPMIX9Updater.TLSv1HttpAdapter()) def _get_op_data(self, op, r): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if r is not None: data[op] = r return data def _get_upload_data(self, cert_data, key_data): return [ ('sslcrt_file', ('cert.pem', cert_data, 'application/octet-stream')), ('privkey_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.findall('.//BMC_RESET/STATE') if not status: return False if status[0].get('CODE') == 'OK': return True return False #if '<STATE CODE="OK"/>' not in result.text: # return False def get_ipmi_cert_valid(self): """ Verify existing certificate information :return: bool """ headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <VALIDATE> status = root.findall('.//SSL_INFO/VALIDATE') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] return bool(int(status.get('CERT'))) and bool(int(status.get('KEY'))) class IPMIX10Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) @@ -331,6 +408,8 @@ def create_updater(args): return IPMIX10Updater(session, args.ipmi_url) elif model == "X11": return IPMIX11Updater(session, args.ipmi_url) elif model == "X9": return IPMIX9Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {model}") -
dekimsey revised this gist
Aug 31, 2021 . 1 changed file with 19 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -153,8 +153,8 @@ def get_ipmi_cert_info(self): status = status[0] has_cert = bool(int(status.get('CERT_EXIST'))) if has_cert: valid_from = datetime.strptime(status.get('VALID_FROM'), r"%b %d %H:%M:%S %Y") valid_until = datetime.strptime(status.get('VALID_UNTIL'), r"%b %d %H:%M:%S %Y") return { 'has_cert': has_cert, @@ -309,6 +309,12 @@ def _get_upload_data(self, cert_data, key_data): ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def parse_valid_until(pem): from datetime import datetime from OpenSSL import crypto as c with open(pem, 'rb') as fh: cert = c.load_certificate(c.FILETYPE_PEM, fh.read()) return datetime.strptime(cert.get_notAfter().decode('utf8'), "%Y%m%d%H%M%SZ") def create_updater(args): session = requests.session() @@ -366,6 +372,8 @@ def main(): help='IPMI user password [%(default)s]', default=os.environ.get('IPMI_UPDATER_PASSWORD', None)) parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--force-update', action='store_true', help='Ignore the cert end date check, always replace the cert.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') parser.add_argument('--debug', action='store_true', @@ -410,8 +418,17 @@ def main(): if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) current_valid_until = cert_info.get('valid_until', None) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) new_valid_until = parse_valid_until(args.cert_file) if current_valid_until == new_valid_until: if not args.force_update: print("New cert validity period matches existing cert, nothing to do") exit(0) else: print("New cert validity period matches existing cert, will update regardless") # Go upload! if not updater.upload_cert(args.key_file, args.cert_file): -
dekimsey revised this gist
Aug 31, 2021 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -362,8 +362,8 @@ def main(): help='X.509 Certificate filename') parser.add_argument('--username', required=True, help='IPMI username with admin access') parser.add_argument('--password', required=False, help='IPMI user password [%(default)s]', default=os.environ.get('IPMI_UPDATER_PASSWORD', None)) parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', -
dekimsey revised this gist
Aug 31, 2021 . 1 changed file with 5 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,8 +26,9 @@ import logging from base64 import b64encode from datetime import datetime import xml.etree.ElementTree as etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 @@ -145,7 +146,7 @@ def get_ipmi_cert_info(self): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.findall('.//SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. @@ -181,7 +182,7 @@ def get_ipmi_cert_valid(self): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.findall('.//SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. @@ -276,7 +277,7 @@ def _check_reboot_result(self, result): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.findall('.//BMC_RESET/STATE') if not status: return False if status[0].get('CODE') == 'OK': -
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 9 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -312,10 +312,11 @@ def _get_upload_data(self, cert_data, key_data): def create_updater(args): session = requests.session() if args.model is None: model = determine_model(session, args.ipmi_url, args.debug) else: model = args.model if not args.quiet: print("Board model is " + model) @@ -326,14 +327,18 @@ def create_updater(args): else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url, debug): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except (ConnectionError, requests.exceptions.SSLError) as err: print("Failed to determine model: connection error") if debug: print(err) exit(2) if not r.ok: print(f"Failed to determine model (try --model): {r.status_code} {r.reason}") exit(2) data = r.json() -
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 40 additions and 13 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -45,7 +45,14 @@ def __init__(self, session, ipmi_url): self.use_b64encoded_login = True self._csrf_token = None error_log = logging.getLogger("IPMIUpdater") error_log.setLevel(logging.ERROR) self.setLogger(error_log) def setLogger(self, logger): self.logger = logger def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token @@ -69,7 +76,7 @@ def get_csrf_headers(self, url_name): if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token self.logger.debug("HEADERS:%s" % headers) return headers def get_xhr_headers(self, url_name): @@ -135,15 +142,15 @@ def get_ipmi_cert_info(self): if not result.ok: return False self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = bool(int(status.get('CERT_EXIST'))) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') @@ -171,15 +178,15 @@ def get_ipmi_cert_valid(self): if not result.ok: return False self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] return bool(int(status.get('VALIDATE'))) def upload_cert(self, key_file, cert_file): """ @@ -261,13 +268,22 @@ def _get_op_data(self, op, r): def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('cert.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/BMC_RESET/STATE') if not status: return False if status[0].get('CODE') == 'OK': return True return False #if '<STATE CODE="OK"/>' not in result.text: # return False class IPMIX11Updater(IPMIUpdater): @@ -296,8 +312,10 @@ def _get_upload_data(self, cert_data, key_data): def create_updater(args): session = requests.session() if not args.model: model = determine_model(session, args.ipmi_url) else: model = args.model if not args.quiet: print("Board model is " + model) @@ -330,6 +348,8 @@ def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') parser.add_argument('--ipmi-url', required=True, help='Supermicro IPMI 2.0 URL') parser.add_argument('--model', required=False, help='Board model, eg. X10 or X11') parser.add_argument('--key-file', required=True, help='X.509 Private key filename') parser.add_argument('--cert-file', required=True, @@ -342,6 +362,8 @@ def main(): help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') parser.add_argument('--debug', action='store_true', help='Output additional debugging') args = parser.parse_args() # Confirm args @@ -354,11 +376,11 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if args.debug: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable request logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") @@ -369,6 +391,10 @@ def main(): requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if args.debug: debug_log = logging.getLogger("IPMIUpdater") debug_log.setLevel(logging.DEBUG) updater.setLogger(debug_log) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") @@ -413,3 +439,4 @@ def main(): if __name__ == "__main__": main() -
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 300 additions and 184 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,186 +21,310 @@ import os import argparse import re import requests import logging from base64 import b64encode from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url): self.session = session self.ipmi_url = ipmi_url self.login_url = f'{ipmi_url}/cgi/login.cgi' self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi' self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi' self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.use_b64encoded_login = True self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.url_redirect_template % url_name result = self.session.get(page_url) result.raise_for_status() match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text) if match: return match.group(1) def get_csrf_headers(self, url_name): page_url = self.url_redirect_template % url_name headers = { "Origin": self.ipmi_url, "Referer": page_url, } csrf_token = self.get_csrf_token(url_name) if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token print("HEADERS:", headers) return headers def get_xhr_headers(self, url_name): headers = self.get_csrf_headers(url_name) headers["X-Requested-With"] = "XMLHttpRequest" return headers def login(self, username, password): """ Log into IPMI interface :param username: username to use for logging in :param password: password to use for logging in :return: bool """ if self.use_b64encoded_login: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), 'check': '00' } else: login_data = { 'name': username, 'pwd': password } try: result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False # Set mandatory cookies: url_parts = urlparse(self.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English' } for cookie_name, cookie_value in mandatory_cookies.items(): self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) return True def get_ipmi_cert_info(self): """ Verify existing certificate information :return: dict """ headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def get_ipmi_cert_valid(self): """ Verify existing certificate information :return: bool """ headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(self, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' files_to_upload = self._get_upload_data(cert_data, key_data) headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = {} if csrf_token is not None: csrf_data["CSRF_TOKEN"] = csrf_token try: result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def _check_reboot_result(self, result): return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") reboot_data = self._get_op_data('main_bmcreset', None) try: result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if not self._check_reboot_result(result): return False return True class IPMIX10Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' self.use_b64encoded_login = False def _get_op_data(self, op, r): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if r is not None: data[op] = r return data def _get_upload_data(self, cert_data, key_data): return [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): if '<STATE CODE="OK"/>' not in result.text: return False class IPMIX11Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/op.cgi' self.use_b64encoded_login = True def _get_op_data(self, op, r): data = { 'op': op } if r is not None: data['r'] = r data['_'] = '' return data def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def create_updater(args): session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) if model == "X10": return IPMIX10Updater(session, args.ipmi_url) elif model == "X11": return IPMIX11Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: exit(2) if not r.ok: exit(2) data = r.json() # The UpdateService methods are only available on newer X11 based boards if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -231,6 +355,9 @@ def main(): args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) @@ -240,45 +367,34 @@ def main(): # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not updater.upload_cert(args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = updater.get_ipmi_cert_valid() if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) @@ -288,7 +404,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not updater.reboot_ipmi(): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 83 additions and 101 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -33,26 +33,24 @@ REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url): self.session = session self.ipmi_url = ipmi_url self.login_url = f'{ipmi_url}/cgi/login.cgi' self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi' self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi' self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.use_b64encoded_login = True self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.url_redirect_template % url_name result = self.session.get(page_url) result.raise_for_status() @@ -61,7 +59,7 @@ def get_csrf_token(self, url_name): return match.group(1) def get_csrf_headers(self, url_name): page_url = self.url_redirect_template % url_name headers = { "Origin": self.ipmi_url, @@ -86,20 +84,7 @@ def login(self, username, password): :param password: password to use for logging in :return: bool """ if self.use_b64encoded_login: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), @@ -112,7 +97,7 @@ def login(self, username, password): } try: result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -141,26 +126,10 @@ def get_ipmi_cert_info(self): headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -193,26 +162,10 @@ def get_ipmi_cert_valid(self): headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -245,24 +198,16 @@ def upload_cert(self, key_file, cert_file): # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' files_to_upload = self._get_upload_data(cert_data, key_data) headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = {} if csrf_token is not None: csrf_data["CSRF_TOKEN"] = csrf_token try: result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -278,39 +223,76 @@ def upload_cert(self, key_file, cert_file): return True def _check_reboot_result(self, result): return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") reboot_data = self._get_op_data('main_bmcreset', None) try: result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if not self._check_reboot_result(result): return False return True class IPMIX10Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' self.use_b64encoded_login = False def _get_op_data(self, op, r): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if r is not None: data[op] = r return data def _get_upload_data(self, cert_data, key_data): return [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): if '<STATE CODE="OK"/>' not in result.text: return False class IPMIX11Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/op.cgi' self.use_b64encoded_login = True def _get_op_data(self, op, r): data = { 'op': op } if r is not None: data['r'] = r data['_'] = '' return data def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def create_updater(args): session = requests.session() @@ -319,8 +301,12 @@ def create_updater(args): if not args.quiet: print("Board model is " + model) if model == "X10": return IPMIX10Updater(session, args.ipmi_url) elif model == "X11": return IPMIX11Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' @@ -391,8 +377,6 @@ def main(): cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) @@ -413,8 +397,6 @@ def main(): cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 287 additions and 218 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,224 +24,306 @@ import re import requests import logging from base64 import b64encode from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url, model): self.session = session self.ipmi_url = ipmi_url self.model = model self.LOGIN_URL = f'{ipmi_url}/cgi/login.cgi' self.IPMI_CERT_INFO_URL = f'{ipmi_url}/cgi/ipmi.cgi' self.UPLOAD_CERT_URL = f'{ipmi_url}/cgi/upload_ssl.cgi' self.REBOOT_IPMI_X10_URL = f'{ipmi_url}/cgi/BMCReset.cgi' self.REBOOT_IPMI_X11_URL = f'{ipmi_url}/cgi/op.cgi' self.URL_REDIRECT_URL = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.REDFISH_ROOT = f'{ipmi_url}/redfish/v1/' self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.URL_REDIRECT_URL % url_name result = self.session.get(page_url) result.raise_for_status() match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text) if match: return match.group(1) def get_csrf_headers(self, url_name): page_url = self.URL_REDIRECT_URL % url_name headers = { "Origin": self.ipmi_url, "Referer": page_url, } csrf_token = self.get_csrf_token(url_name) if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token print("HEADERS:", headers) return headers def get_xhr_headers(self, url_name): headers = self.get_csrf_headers(url_name) headers["X-Requested-With"] = "XMLHttpRequest" return headers def login(self, username, password): """ Log into IPMI interface :param username: username to use for logging in :param password: password to use for logging in :return: bool """ if False: try: session_url = self.REDFISH_ROOT + "/SessionService/Sessions" session_data = { 'UserName': username, 'Password': password } result = self.session.post(session_url, json=session_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if True: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), 'check': '00' } else: login_data = { 'name': username, 'pwd': password } try: result = self.session.post(self.LOGIN_URL, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False # Set mandatory cookies: url_parts = urlparse(self.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English' } for cookie_name, cookie_value in mandatory_cookies.items(): self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) return True def get_ipmi_cert_info(self): """ Verify existing certificate information :return: dict """ headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif self.model == "X11": cert_info_data = { 'op': 'SSL_STATUS.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) try: ipmi_info_url = self.IPMI_CERT_INFO_URL result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) print("Result.text:", result.text) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def get_ipmi_cert_valid(self): """ Verify existing certificate information :return: bool """ headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif self.model == "X11": cert_info_data = { 'op': 'SSL_VALIDATE.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) try: ipmi_info_url = self.IPMI_CERT_INFO_URL result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(self, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' if self.model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] elif self.model == "X11": files_to_upload = [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = { "CSRF_TOKEN": csrf_token } try: upload_cert_url = self.UPLOAD_CERT_URL result = self.session.post(upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } reboot_url = self.REBOOT_IPMI_X10_URL elif self.model == "X11": reboot_data = { 'op': 'main_bmcreset', '_': '' } reboot_url = self.REBOOT_IPMI_X11_URL try: result = self.session.post(reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if self.model == "X10": if '<STATE CODE="OK"/>' not in result.text: return False return True def create_updater(args): session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) return IPMIUpdater(session, args.ipmi_url, model) def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) @@ -287,6 +369,9 @@ def main(): args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) @@ -296,55 +381,39 @@ def main(): # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") if updater.model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not updater.upload_cert(args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = updater.get_ipmi_cert_valid() if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") if updater.model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: @@ -353,7 +422,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not updater.reboot_ipmi(): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 4 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import os import argparse import re import requests import logging from datetime import datetime @@ -176,6 +177,9 @@ def upload_cert(session, url, key_file, cert_file, model): key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' if model == "X10": -
oxc revised this gist
Oct 3, 2019 . 1 changed file with 83 additions and 84 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -33,12 +33,10 @@ LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_X10_URL = '%s/cgi/BMCReset.cgi' REBOOT_IPMI_X11_URL = '%s/cgi/op.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' REDFISH_ROOT = '%s/redfish/v1/' def login(session, url, username, password): """ @@ -68,35 +66,28 @@ def login(session, url, username, password): return True def get_ipmi_cert_info(session, url, model): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: dict """ if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif model == "X11": cert_info_data = { 'op': 'SSL_STATUS.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) @@ -127,20 +118,29 @@ def get_ipmi_cert_info(session, url, model, user, password): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url, model): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif model == "X11": cert_info_data = { 'op': 'SSL_VALIDATE.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) @@ -162,7 +162,7 @@ def get_ipmi_cert_valid(session, url): valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file, model): """ Send X.509 certificate and private key to server :param session: Current session object @@ -177,52 +177,54 @@ def upload_cert(session, url, key_file, cert_file, model, user, password): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() if model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] elif model == "X11": files_to_upload = [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url, model): if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } reboot_url = REBOOT_IPMI_X10_URL % url elif model == "X11": reboot_data = { 'op': 'main_bmcreset', '_': '' } reboot_url = REBOOT_IPMI_X11_URL % url try: result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -250,7 +252,7 @@ def determine_model(session, url): if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -297,24 +299,23 @@ def main(): if not args.quiet: print("Board model is " + model) if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url, model) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": @@ -324,21 +325,19 @@ def main(): print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = get_ipmi_cert_valid(session, args.ipmi_url, model) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url, model) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": @@ -350,7 +349,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url, model): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
dmerner revised this gist
Sep 25, 2019 . 1 changed file with 164 additions and 50 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,9 +22,11 @@ import os import argparse import requests import logging from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 @@ -33,7 +35,10 @@ UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' REDFISH_ROOT = '%s/redfish/v1/' REDFISH_SSL = '%sUpdateService/SSLCert/' REDFISH_SSL_UPLOAD = '%sUpdateService/SSLCert/Actions/SSLCert.Upload' REDFISH_BMC_REBOOT = '%sManagers/1/Actions/Manager.Reset' def login(session, url, username, password): """ @@ -63,7 +68,7 @@ def login(session, url, username, password): return True def get_ipmi_cert_info(session, url, model, user, password): """ Verify existing certificate information :param session: Current session object @@ -73,13 +78,28 @@ def get_ipmi_cert_info(session, url): """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') if model == "X11": try: r = session.get(REDFISH_SSL % (REDFISH_ROOT % url), auth=HTTPBasicAuth(user, password), verify=False) except ConnectionError: return False if not r.ok: return False data = r.json() return { 'has_cert': True, 'valid_from': data['VaildFrom'], # Yes, Supermicro made a typo in their BMC API. 'valid_until': data['GoodTHRU'] } cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -107,8 +127,42 @@ def get_ipmi_cert_info(session, url): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file, model, user, password): """ Send X.509 certificate and private key to server :param session: Current session object @@ -122,51 +176,81 @@ def upload_cert(session, url, key_file, cert_file): key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() if model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url elif model == "X11": upload_cert_url = REDFISH_SSL_UPLOAD % (REDFISH_ROOT % url) files_to_upload = { 'cert_file' : ('fullchain.pem', open(cert_file, 'rb')), 'key_file' : ('privkey.pem', open(key_file, 'rb')) } try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False) except ConnectionError: return False if not result.ok: return False if model == "X10": if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url, model, user, password): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if model == "X10": reboot_url = REBOOT_IPMI_URL % url elif model == "X11": reboot_url = REDFISH_BMC_REBOOT % (REDFISH_ROOT % url) try: result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False) except ConnectionError: return False if not result.ok: return False if model == "X10": if '<STATE CODE="OK"/>' not in result.text: return False return True def determine_model(session, url): redfish_url = REDFISH_ROOT % url try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: exit(2) if not r.ok: exit(2) data = r.json() # The UpdateService methods are only available on newer X11 based boards if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -180,8 +264,10 @@ def main(): help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') args = parser.parse_args() # Confirm args @@ -194,53 +280,81 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) if model == "X10": if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model, args.username, args.password): print("Failed to upload X.509 files to IPMI!") exit(2) # Redfish currently doesn't have a way to download the certificate on X11 boards if model == "X10": cert_valid = get_ipmi_cert_valid(session, args.ipmi_url) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url, model, args.username, args.password): print("Rebooting failed! Go reboot it manually?") if not args.quiet: print("All done!") if __name__ == "__main__": -
mcdamo revised this gist
Feb 7, 2019 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -287,7 +287,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") -
mcdamo revised this gist
Feb 7, 2019 . 1 changed file with 66 additions and 14 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,6 +22,7 @@ import os import argparse import requests import logging from datetime import datetime from lxml import etree from urllib.parse import urlparse @@ -78,8 +79,8 @@ def get_ipmi_cert_info(session, url): 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -107,6 +108,40 @@ def get_ipmi_cert_info(session, url): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file): """ @@ -123,8 +158,8 @@ def upload_cert(session, url, key_file, cert_file): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url @@ -140,7 +175,6 @@ def upload_cert(session, url, key_file, cert_file): return False if 'CONFPAGE_RESET' not in result.text: return False return True @@ -159,9 +193,9 @@ def reboot_ipmi(session, url): if not result.ok: return False #print("Url: %s" % upload_cert_url) #print(result.headers) #print(result.text) if '<STATE CODE="OK"/>' not in result.text: return False @@ -180,8 +214,10 @@ def main(): help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') args = parser.parse_args() # Confirm args @@ -194,6 +230,14 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() @@ -218,29 +262,37 @@ def main(): if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = get_ipmi_cert_valid(session, args.ipmi_url) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") if not args.quiet: print("All done!") if __name__ == "__main__": -
HQJaTu revised this gist
Oct 21, 2018 . 1 changed file with 40 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,13 +24,15 @@ import requests from datetime import datetime from lxml import etree from urllib.parse import urlparse REQUEST_TIMEOUT = 5.0 LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' def login(session, url, username, password): @@ -76,6 +78,8 @@ def get_ipmi_cert_info(session, url): 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } for cookie in session.cookies: print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -119,8 +123,8 @@ def upload_cert(session, url, key_file, cert_file): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/cert.key', ('cert.key', key_data, 'application/octet-stream')), ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/x-x509-ca-cert')) ] upload_cert_url = UPLOAD_CERT_URL % url @@ -141,15 +145,24 @@ def upload_cert(session, url, key_file, cert_file): def reboot_ipmi(session, url): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } upload_cert_url = REBOOT_IPMI_URL % url try: result = session.post(upload_cert_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False print("Url: %s" % upload_cert_url) print(result.headers) print(result.text) if '<STATE CODE="OK"/>' not in result.text: return False return True @@ -187,6 +200,20 @@ def main(): if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") @@ -200,6 +227,14 @@ def main(): exit(2) print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): -
HQJaTu created this gist
Jul 12, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,212 @@ #!/usr/bin/env python3 # vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python # This file is part of Supermicro IPMI certificate updater. # Supermicro IPMI certificate updater is free software: you can # redistribute it and/or modify it under the terms of the GNU General Public # License as published by the Free Software Foundation, version 2. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # Copyright (c) Jari Turkia import os import argparse import requests from datetime import datetime from lxml import etree REQUEST_TIMEOUT = 5.0 LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl_fw_reset' def login(session, url, username, password): """ Log into IPMI interface :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param username: username to use for logging in :param password: password to use for logging in :return: bool """ login_data = { 'name': username, 'pwd': password } login_url = LOGIN_URL % url try: result = session.post(login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False return True def get_ipmi_cert_info(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: dict """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def upload_cert(session, url, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/key.pem', ('/tmp/key.pem', key_data, 'application/octet-stream')), ('/tmp/cert.pem', ('/tmp/cert.pem', cert_data, 'application/x-x509-ca-cert')) ] upload_cert_url = UPLOAD_CERT_URL % url try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url): upload_cert_url = REBOOT_IPMI_URL % url try: result = session.get(upload_cert_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'LANG_FW_RESET_DESC1' not in result.text: return False return True def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') parser.add_argument('--ipmi-url', required=True, help='Supermicro IPMI 2.0 URL') parser.add_argument('--key-file', required=True, help='X.509 Private key filename') parser.add_argument('--cert-file', required=True, help='X.509 Certificate filename') parser.add_argument('--username', required=True, help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', help='The default is to reboot the IPMI after upload for the change to take effect.') args = parser.parse_args() # Confirm args if not os.path.isfile(args.key_file): print("--key-file '%s' doesn't exist!" % args.key_file) exit(2) if not os.path.isfile(args.cert_file): print("--cert-file '%s' doesn't exist!" % args.cert_file) exit(2) if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) print("Uploaded files ok.") if not args.no_reboot: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") print("All done!") if __name__ == "__main__": main()