Skip to content

Instantly share code, notes, and snippets.

@phucngta
Last active May 17, 2022 04:38
Show Gist options
  • Select an option

  • Save phucngta/652994e46a381cc5adf9acbd15aaa7fc to your computer and use it in GitHub Desktop.

Select an option

Save phucngta/652994e46a381cc5adf9acbd15aaa7fc to your computer and use it in GitHub Desktop.
post deploy jobs
#!/usr/bin/env python3
import gitlab
import os
import subprocess
import json
import requests
import re
import odoorpc
from datetime import date, datetime
# https://github.com/matthewwithanm/python-markdownify
from markdownify import markdownify as md
# Stage codes keyed by a descriptive name. They are compared against
# ``task.stage_id.code``; only 'to_deploy_integration' is read in this file.
STAGE_CODES = {
    "to_deploy_integration": "TDI",
    "ready_for_integration": "RFT",
    "ready_for_staging": "TDS",
    "ready_for_production": "TDP",
}
# Branch whose successful deploy moves tasks out of the 'to_deploy_integration'
# stage; defaults to 'develop' when the variable is not set.
PROJECT_DEVELOP_BRANCH = os.getenv('PROJECT_DEVELOP_BRANCH', 'develop')
# Slack webhook URLs. SLACK_WEBHOOK is mandatory (KeyError at import time when
# missing); the FAILED and DEBUG variants are optional and are None when unset.
SLACK_WEBHOOK = os.environ['SLACK_WEBHOOK']
SLACK_WEBHOOK_FAILED = os.getenv('SLACK_WEBHOOK_FAILED')
SLACK_WEBHOOK_DEBUG = os.getenv('SLACK_WEBHOOK_DEBUG')
def _env_flag(raw):
    """Interpret an environment-variable string as a boolean.

    Any value that is valid JSON once lower-cased keeps the truthiness it had
    when it was parsed with ``json.loads`` directly ("true", "false", "1",
    "0", "null", ...). Values that are not valid JSON no longer raise: an
    empty/blank value is False, "yes"/"y"/"on" are True, anything else is
    False.

    Args:
        raw: The raw string (or None) read from the environment.

    Returns:
        bool: The parsed flag.
    """
    text = (raw or "").strip().lower()
    if not text:
        return False
    try:
        return bool(json.loads(text))
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return text in {"yes", "y", "on"}


# When truthy, Slack notifications are redirected to the debug webhook and the
# "skip successful integration deploy" shortcut is disabled.
DEBUG_SLACK = _env_flag(os.getenv('DEBUG_SLACK', "False"))
# Optional display name for the deploy target; when non-empty it replaces the
# branch name in the Slack message header.
ALTER_NAME_DEPLOY = os.getenv('ALTER_NAME_DEPLOY', "")
# Token used to authenticate against the GitLab API (required).
GITLAB_PERSONAL_TOKEN = os.environ['GITLAB_PERSONAL_TOKEN']
# Required variables describing the current project, ref, pipeline and job.
# A missing one raises KeyError at import time. All values are strings, so
# CI_JOB_ID is a str even though it holds a numeric id.
# NOTE(review): the CI_* names look like GitLab CI/CD predefined variables;
# GitLab documents CI_JOB_STATUS as available only in after_script - confirm
# this script is run from there.
PROJECT_ID = os.environ['CI_PROJECT_ID']
CI_PROJECT_NAME = os.environ['CI_PROJECT_NAME']
CI_PROJECT_URL = os.environ['CI_PROJECT_URL']
CI_COMMIT_REF_NAME = os.environ['CI_COMMIT_REF_NAME']
CI_PIPELINE_ID = os.environ['CI_PIPELINE_ID']
CI_PIPELINE_URL = os.environ['CI_PIPELINE_URL']
CI_JOB_STATUS = os.environ['CI_JOB_STATUS']
CI_JOB_URL = os.environ['CI_JOB_URL']
CI_JOB_ID = os.environ['CI_JOB_ID']
CI_JOB_NAME = os.environ['CI_JOB_NAME']
CI_SERVER_HOST = os.environ['CI_SERVER_HOST']
# The GitLab base URL is always built with the https scheme.
SERVER_URL = f'https://{CI_SERVER_HOST}'
# print(SERVER_URL, PROJECT_ID)
# API client and the project this job belongs to, both created at import time.
gl = gitlab.Gitlab(SERVER_URL, private_token=GITLAB_PERSONAL_TOKEN)
project = gl.projects.get(PROJECT_ID)
# Connection settings for the task-management server reached through odoorpc;
# each one can be overridden from the environment.
TASK_MS_URL = os.getenv('TASK_MS_URL', 'rms.reach.com.vn')
TASK_MS_DB = os.getenv('TASK_MS_DB', 'rms-template')
# NOTE(review): a value read from the environment is a str while the default
# is an int - confirm odoorpc accepts both.
TASK_MS_PORT = os.getenv('TASK_MS_PORT', 7069)
# Credentials default to False when unset; the login below is attempted anyway.
TASK_MS_LOGIN_USER = os.getenv('TASK_MS_LOGIN_USER', False)
TASK_MS_LOGIN_PWD = os.getenv('TASK_MS_LOGIN_PWD', False)
# print(TASK_MS_URL, TASK_MS_PORT, TASK_MS_DB, TASK_MS_LOGIN_USER, TASK_MS_LOGIN_PWD)
# Connect and log in at import time; login() must follow the constructor.
odoo = odoorpc.ODOO(TASK_MS_URL, port=TASK_MS_PORT)
odoo.login(TASK_MS_DB, TASK_MS_LOGIN_USER, TASK_MS_LOGIN_PWD)
# Handle on the 'project.task' model used for every task lookup below.
TASK_OBJECT = odoo.env['project.task']
# Cache filled by fetch_task_bms_info(): task reference -> dict of task details.
TASKS_INFO = {}
# print("No Odoo RPC error")
def _output_log(task, message):
    """Print a timestamped log line and append it to the task's deploy note.

    Args:
        task: Record exposing a readable and writable ``merge_deploy_note``
            attribute.
        message: Text to log.
    """
    line = f">>> {datetime.now().strftime('%m/%d/%Y %H:%M:%S')} {message}"
    # An unset note may be a falsy non-string (presumably False from the RPC
    # layer - verify); treat it as empty so the concatenation cannot raise
    # TypeError.
    task.merge_deploy_note = (task.merge_deploy_note or "") + "\n" + line
    print(line)
def fetch_task_bms_info(task_numbers):
    """Load the tasks whose reference is in *task_numbers* into ``TASKS_INFO``.

    Every matching task is stored under its ``reference`` as a dict holding
    the live record (``'_record'``), a snapshot of its merge/deploy dates,
    its stage code, its description converted to Markdown (images stripped)
    and a web URL to the task form. An existing entry for the same reference
    is overwritten.

    Args:
        task_numbers: List of task reference strings to look up.
    """
    if not task_numbers:
        # Nothing to look up; skip the RPC round trip.
        return
    matches = TASK_OBJECT.search_read(
        [('reference', 'in', task_numbers)], ['id'])
    for row in matches:
        task_bms_id = row.get('id')
        if not task_bms_id:
            continue
        task = TASK_OBJECT.browse(task_bms_id)
        TASKS_INFO[task.reference] = {
            '_record': task,
            'id': task.id,
            'name': task.name,
            # An empty description may not be a string (presumably False from
            # the RPC layer - verify); convert '' instead of passing a
            # non-string to markdownify.
            'description': md(task.description or '', strip=['img']),
            'merge_staging_date': task.merge_staging_date,
            'deploy_staging_date': task.deploy_staging_date,
            'merge_production_date': task.merge_production_date,
            'deploy_production_date': task.deploy_production_date,
            'stage_code': task.stage_id.code,
            # NOTE(review): 'assigned_user' is deliberately not populated here,
            # although get_need_deployed_merge_requests_str() reads it (and
            # therefore always gets '').
            # 'assigned_user': task.user_id.name,
            'rms_url': f"https://{TASK_MS_URL}/web#id={task_bms_id}&action=1003&active_id=3&model=project.task&view_type=form&cids=1&menu_id=785",
        }
def update_deployment_stage_task_bms():
    """Update the cached tasks after a successful deploy of the current ref.

    Nothing is changed unless ``CI_JOB_STATUS`` is "success". Depending on
    ``CI_COMMIT_REF_NAME``:

    * ``PROJECT_DEVELOP_BRANCH``: tasks in the 'to_deploy_integration' stage
      are moved to their next stage.
    * 'staging' / 'production': tasks that have a merge date but no deploy
      date for that environment get the deploy date set to now.

    Only tasks already present in ``TASKS_INFO`` are considered.
    """
    print(f'CI_JOB_STATUS: {CI_JOB_STATUS}')
    print(f'CI_COMMIT_REF_NAME: {CI_COMMIT_REF_NAME}')
    if CI_JOB_STATUS != "success":
        return
    # Independent checks (not elif): the develop branch name is configurable
    # and could coincide with one of the fixed environment names.
    if CI_COMMIT_REF_NAME == PROJECT_DEVELOP_BRANCH:
        _advance_integration_tasks()
    if CI_COMMIT_REF_NAME in ('staging', 'production'):
        _stamp_deploy_date(CI_COMMIT_REF_NAME)


def _advance_integration_tasks():
    """Move every cached task waiting for integration deploy to its next stage."""
    for reference, info in TASKS_INFO.items():
        print(f'reference {reference} - stage_code {info.get("stage_code")}')
        if info.get('stage_code') != STAGE_CODES['to_deploy_integration']:
            continue
        task = info.get("_record")
        try:
            task.action_next_stage()
        except Exception as exc:
            # Best effort: one failing task must not stop the others, but the
            # cause is reported instead of being silently dropped.
            failure = f'Change task {reference} to Ready For Testing failed'
            print(f'{failure}: {exc}')
            _output_log(task, failure)
        else:
            print(f'Change task {reference} to Ready For Testing')


def _stamp_deploy_date(environment):
    """Set the missing deploy date of cached tasks merged to *environment*.

    *environment* is 'staging' or 'production'; it selects the
    ``merge_<environment>_date`` / ``deploy_<environment>_date`` fields.
    """
    merge_field = f'merge_{environment}_date'
    deploy_field = f'deploy_{environment}_date'
    for reference, info in TASKS_INFO.items():
        print(f'reference {reference} - stage_code {info.get("stage_code")}')
        print(f'\t {merge_field} {info.get(merge_field)} - {deploy_field} {info.get(deploy_field)} ')
        # The dates in `info` are the snapshot taken when the task was fetched.
        if info.get(merge_field) and not info.get(deploy_field):
            task = info.get("_record")
            # NOTE(review): datetime.now() is naive local time - confirm the
            # server expects this rather than UTC.
            setattr(task, deploy_field, datetime.now())
            _output_log(task, f'Update Deploy {environment.capitalize()} Date for task {reference}')
        # TODO: also set the deploy date on all sub-tasks when their parent
        # task is deployed (not implemented).
def get_latest_deploy_date():
    """Return the ``created_at`` value of the previous successful deploy job.

    Scans the project's successful jobs and takes the first one that ran on
    the same ref, in the 'deploy' stage, under the same job name, and that is
    not the current job. When there is none, the current job's own
    ``created_at`` is returned instead.

    Returns:
        The ``created_at`` attribute of the selected job, as provided by the
        GitLab API.
    """
    # NOTE(review): "first match" is the latest job only if the API lists jobs
    # newest first - confirm. Newer python-gitlab releases spell
    # `as_list=False` as `iterator=True`; check the pinned version.
    successful_jobs = project.jobs.list(
        query_parameters={'scope': 'success'}, as_list=False)
    current_job_id = str(CI_JOB_ID)
    last_job = None
    for job in successful_jobs:
        # CI_JOB_ID comes from the environment as a string; compare as strings
        # so a numeric job id can actually match and be excluded.
        if (job.ref == CI_COMMIT_REF_NAME
                and job.stage == 'deploy'
                and str(job.id) != current_job_id
                and job.name == CI_JOB_NAME):
            last_job = job
            break
    if last_job is None:
        # No earlier deploy found: fall back to the job that is running now.
        last_job = project.jobs.get(CI_JOB_ID)
    latest_deploy_date = last_job.created_at
    print("Latest Successful Deploy Job", last_job.id)
    print("Latest Deploy Date", latest_deploy_date)
    return latest_deploy_date
def get_need_deployed_merge_requests_str():
    """Build the Slack text listing merge requests covered by this deploy.

    Lists merge requests merged into the current ref and updated after the
    previous deploy date, keeps those whose source branch starts with a task
    reference (``T`` followed by five digits), loads the matching task into
    ``TASKS_INFO`` and formats one line per merge request.

    Returns:
        str: The lines joined with newlines; "" when nothing qualifies.
    """
    latest_deploy_date = get_latest_deploy_date()
    merged_requests = project.mergerequests.list(
        query_parameters={
            'target_branch': CI_COMMIT_REF_NAME,
            'state': 'merged',
            'updated_after': latest_deploy_date,
        }, as_list=False)
    # Same icon for every line: it only depends on the job outcome.
    status_icon = ':bee:' if CI_JOB_STATUS == 'success' else ':bug:'
    lines = []
    for mr in merged_requests:
        source_branch = mr.source_branch
        # re.match anchors at the start only, so a longer branch name that
        # begins with a task reference also passes.
        if not re.match(r'T\d{5}', source_branch):
            continue
        # Side effect relied on later: this fills TASKS_INFO for the branch.
        fetch_task_bms_info([source_branch])
        task_info = TASKS_INFO.get(source_branch, {})
        rms_url = task_info.get('rms_url', '')
        # Without a task URL, emit the bare branch name rather than a Slack
        # link with an empty target ("<|branch>").
        task_bms = f"<{rms_url}|{source_branch}>" if rms_url else source_branch
        merge_commit = f"<{mr.web_url}|{status_icon}>"
        lines.append(
            f"{merge_commit} *`{task_bms}`* => {mr.title} ({task_info.get('assigned_user', '')})")
    message_str = "\n".join(lines)
    print("Message to send:", message_str)
    return message_str
def send_payload_slack():
    """Post the deploy result and the deployed merge requests to Slack.

    Does nothing when ``CI_JOB_STATUS`` is neither "failed" nor "success",
    or when no merge request qualifies. A successful deploy of the develop
    branch is not announced unless ``DEBUG_SLACK`` is set. Failed develop
    deploys go to ``SLACK_WEBHOOK_FAILED`` and debug mode goes to
    ``SLACK_WEBHOOK_DEBUG``; when one of those optional webhooks is not
    configured the previously selected webhook is used instead.
    """
    if CI_JOB_STATUS == "failed":
        icon, outcome = ":x:", "failed"
    elif CI_JOB_STATUS == "success":
        icon, outcome = ":white_check_mark:", "succeeded"
    else:
        return
    target = ALTER_NAME_DEPLOY.upper() or CI_COMMIT_REF_NAME.upper()
    slack_msg_header = f"{icon} *[{CI_PROJECT_NAME.upper()}]* Deploy to *{target}* {outcome}"
    print("Slack Msg Header", slack_msg_header)
    # Must run before any "skip" decision below: collecting the merge requests
    # also fills TASKS_INFO, which update_deployment_stage_task_bms() reads.
    merge_requests_str = get_need_deployed_merge_requests_str()
    if merge_requests_str == "":
        print("==> Skip notify: No merge requests were deployed")
        return
    json_payload = prepare_payload_slack_json(
        slack_msg_header, merge_requests_str)
    headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
    print("DEBUG_MODE", DEBUG_SLACK)
    # Use the configurable branch name, consistent with
    # update_deployment_stage_task_bms(); its default is still 'develop'.
    on_develop = CI_COMMIT_REF_NAME == PROJECT_DEVELOP_BRANCH
    if CI_JOB_STATUS == "success" and on_develop and not DEBUG_SLACK:
        print("==> Skip notify successful deployment in integration")
        return
    webhook = SLACK_WEBHOOK
    if on_develop and CI_JOB_STATUS == "failed":
        webhook = SLACK_WEBHOOK_FAILED or webhook
    # NOTE(review): the original indentation was lost; the debug override is
    # assumed to apply to every branch, not only develop - confirm.
    if DEBUG_SLACK:
        webhook = SLACK_WEBHOOK_DEBUG or webhook
    # A timeout keeps an unresponsive endpoint from hanging the CI job.
    r = requests.post(webhook, data=json_payload, headers=headers, timeout=30)
    print(r.content)
def prepare_payload_slack_json(slack_msg_header, merge_requests_str):
    """Serialize the Slack message for a deploy notification.

    The payload has four blocks: the header section, a divider, a section
    with links to the job and the pipeline, and a section listing the merge
    requests.

    Args:
        slack_msg_header: mrkdwn text shown at the top of the message.
        merge_requests_str: mrkdwn list of merge requests; when empty the
            last section says that no merge requests were deployed.

    Returns:
        str: The payload encoded as JSON.
    """
    if merge_requests_str:
        verb = 'were' if CI_JOB_STATUS == 'success' else 'were _not_'
        merge_requests_text = (
            f"*List merge requests {verb} deployed:* \n" + merge_requests_str)
    else:
        merge_requests_text = "*No merge requests were deployed*"

    header_block = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": slack_msg_header},
    }
    links_block = {
        "type": "section",
        "fields": [
            {"type": "mrkdwn", "text": f"*Job:* <{CI_JOB_URL}|{CI_JOB_ID}>"},
            {"type": "mrkdwn", "text": f"*Pipeline:* <{CI_PIPELINE_URL}|{CI_PIPELINE_ID}>"},
        ],
    }
    merge_requests_block = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": merge_requests_text},
    }
    payload = {
        "blocks": [
            header_block,
            {"type": "divider"},
            links_block,
            merge_requests_block,
        ]
    }
    return json.dumps(payload)
def main():
    """Run the post-deploy steps: notify Slack, then update the tasks."""
    # Order matters: building the Slack message loads the task cache
    # (TASKS_INFO) that the stage/date update iterates afterwards.
    send_payload_slack()
    update_deployment_stage_task_bms()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment