# Simplified magic script
# python3.6+, dependencies: pip3 install bs4 lxml loguru requests

import ast
import os
from time import sleep, time

from bs4 import BeautifulSoup
from requests import get, post
from loguru import logger

uid = 00000  # uid
cookie = 'nexusphp_u2='  # cookie
hours = 24  # magic duration (hours)
ur = 1  # upload ratio
dr = 0  # download ratio
user = 'SELF'  # magic scope: 'ALL' for everyone, 'SELF' for yourself, 'OTHER' for another user
uc_max = -1  # max uc cost of a single magic; -1 disables the check, otherwise no magic is sent above it
uc_24_max = -1  # max uc spent within 24h; -1 disables the check, otherwise no magic is sent above it
tid_max = -1  # max torrent id; -1 disables the check, otherwise no magic is sent above it
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' \
             'Chrome/99.0.4814.0 Safari/537.36 Edg/99.0.1135.6'  # user agent
proxies = {  # 'http': 'http://127.0.0.1:10809', 'https': 'http://127.0.0.1:10809'
}  # proxies
log_path = f'{os.path.splitext(__file__)[0]}.log'  # log file path
data_path = f'{os.path.splitext(__file__)[0]}.data.txt'  # data file path
logger.add(level='DEBUG', sink=log_path, encoding='utf-8')  # logging setup
interval = 180  # seconds between checks
# The header must be named 'user-agent'; a header called 'user_agent' is sent
# as an unrelated custom header and the default requests User-Agent is kept.
request_args = {'headers': {'cookie': cookie, 'user-agent': user_agent}, 'proxies': proxies, 'timeout': 20}

_DATA_PREFIX = 'magic_info = '  # prefix written in front of the saved record list


class Magic:
    """Send promotion "magic" to the torrents the configured user is leeching.

    ``magic_info`` is a list of dicts, one per magic that was sent (or was
    rejected for exceeding ``uc_max``). Each record holds the posted fields
    ('hours', 'torrent', 'user', 'ur', 'dr'), a 'ts' timestamp and, for magic
    that was actually sent, the 'uc' cost. The list is persisted to
    ``data_path`` as ``magic_info = [...]``.
    """

    def __init__(self):
        """Load previously saved records from ``data_path``, creating the file if missing.

        Raises:
            ValueError, SyntaxError: if the saved record list is not a valid
                Python literal.
        """
        # Opening in append mode creates the file without truncating it.
        with open(data_path, 'a', encoding='utf-8'):
            pass
        self.magic_info = []
        with open(data_path, 'r', encoding='utf-8') as f:
            content = f.read()
        if content.startswith(_DATA_PREFIX):
            # Slice the prefix off (str.lstrip strips a character set, not a
            # prefix) and parse as a literal instead of executing it with eval.
            self.magic_info = ast.literal_eval(content[len(_DATA_PREFIX):].strip())

    @staticmethod
    def get_pro(tr):
        """Return ``[upload_ratio, download_ratio]`` currently shown for a torrent row.

        Args:
            tr: BeautifulSoup tag of one row of the torrent list table.

        Both ratios start at 1.0 and are overridden by promotion markers found
        in the row's class, and in the ``img`` and ``span`` tags of the cell
        that is inspected.

        Raises:
            KeyError: if an ``img`` has no class, or has a class that is
                neither a known promotion nor 'arrowup'/'arrowdown'.
        """
        pro = {'ur': 1.0, 'dr': 1.0}
        pro_dict = {'free': {'dr': 0.0}, 'twoup': {'ur': 2.0}, 'halfdown': {'dr': 0.5}, 'thirtypercent': {'dr': 0.3}}
        pro_dict_1 = {'free': {'dr': 0.0}, '2up': {'ur': 2.0}, '50pct': {'dr': 0.5}, '30pct': {'dr': 0.3}, 'custom': {}}

        if tr.get('class'):
            row_class = tr['class'][0]
            for key, data in pro_dict.items():
                if key in row_class:
                    pro.update(data)

        # Prefer the first cell of the second nested row when the row contains
        # a nested table; otherwise fall back to the row's second cell.
        td = tr.select('tr')[1].td if tr.tr else None
        td = td or tr.select('td')[1]

        for img in td.select('img'):
            img_class = img['class'][0]
            matched = False
            for key, data in pro_dict_1.items():
                if key in img_class:
                    pro.update(data)
                    matched = True
            if not matched:
                # Custom promotion: an arrow image followed by text such as
                # "2,33X"; drop the last character and use '.' as the
                # decimal separator.
                target = {'arrowup': 'ur', 'arrowdown': 'dr'}[img_class]
                pro[target] = float(img.next_element.text[:-1].replace(',', '.'))

        for span in td.select('span'):
            span_class = span['class'][0] if span.get('class') else ''
            for key, data in pro_dict.items():
                if key in span_class:
                    pro.update(data)

        # Explicit order instead of relying on dict ordering.
        return [pro['ur'], pro['dr']]

    @property
    def to_info(self):
        """Yield ``(torrent_id, info)`` for every torrent the user is leeching.

        ``info`` is ``{'pro': [ur, dr], 'seeder_num': int}``. Nothing is
        yielded when the response contains no table.
        """
        url = f'https://u2.dmhy.org/getusertorrentlistajax.php?userid={uid}&type=leeching'
        table = BeautifulSoup(get(url, **request_args).text.replace('\n', ''), 'lxml').table
        if not table:
            return
        # The first table row is skipped (presumably the header row).
        for tr in table.contents[1:]:
            # The id is the href minus its first 15 and last 6 characters
            # (presumably 'details.php?id=' and '&hit=1' -- confirm against
            # the site markup).
            tid = int(tr.contents[1].a['href'][15:-6])
            data = {'pro': self.get_pro(tr), 'seeder_num': int(tr.contents[3].string)}
            yield tid, data

    def _is_covered(self, _data):
        """Return True if an unexpired record already covers the requested magic.

        A record covers the request when it is for the same torrent, its
        upload ratio is at least the requested one and its download ratio is
        at most the requested one. Expired records are ignored so that a stale
        entry cannot block a torrent forever.
        """
        now = time()
        return any(
            info['torrent'] == _data['torrent']
            and now - info['ts'] < info['hours'] * 3600
            and _data['ur'] <= info['ur']
            and _data['dr'] >= info['dr']
            for info in self.magic_info
        )

    def run(self):
        """Check all leeching torrents once and send magic where it improves the ratios.

        Nothing is done when the 24h uc budget (``uc_24_max``) is already
        exceeded. Errors while sending magic for one torrent are logged and do
        not stop the remaining torrents from being processed.
        """
        if uc_24_max != -1 and self.total_cost > uc_24_max:
            return
        for tid, data in self.to_info:
            if tid_max != -1 and tid > tid_max:
                continue
            current_ur, current_dr = data['pro']
            # A value of 1 means "leave this ratio alone": the torrent already
            # has a promotion at least as good as the configured one.
            _ur = 1 if ur <= current_ur else ur
            _dr = 1 if dr >= current_dr else dr
            if _ur == 1 and _dr == 1:
                continue
            _data = {'hours': hours, 'torrent': tid, 'user': user, 'ur': _ur, 'dr': _dr}
            # Skip torrents that already have an equal or better magic on
            # record (including ones rejected for exceeding uc_max).
            if self._is_covered(_data):
                continue
            if data['seeder_num'] > 0:
                try:
                    self.magic(_data)
                except Exception as _e:
                    logger.exception(_e)

    @property
    def total_cost(self):
        """Return the uc spent by unexpired records and drop expired records.

        A record is unexpired while less than its 'hours' have passed since
        its 'ts'. Records without a 'uc' value count as 0.
        """
        now = time()
        active = [info for info in self.magic_info if now - info['ts'] < info['hours'] * 3600]
        self.magic_info = active
        return sum(info.get('uc') or 0 for info in active)

    def _save(self):
        """Write the current record list to ``data_path``."""
        with open(data_path, 'w', encoding='utf-8') as f:
            f.write(f'{_DATA_PREFIX}{self.magic_info}\n')

    def magic(self, _data):
        """Price-check and, if affordable, send one magic described by ``_data``.

        Args:
            _data: dict with 'hours', 'torrent', 'user', 'ur' and 'dr'.

        The promotion form's hidden inputs are fetched first and merged with
        the request fields. A test request returns the uc price; when
        ``uc_max`` is set and the price exceeds it, the attempt is only
        recorded (without 'uc'), otherwise the magic is posted and recorded
        with its cost. The record list is saved afterwards.
        """
        tid = _data['torrent']
        url = f'https://u2.dmhy.org/promotion.php?action=magic&torrent={tid}'
        soup = BeautifulSoup(get(url, **request_args).text.replace('\n', ''), 'lxml')
        data = {h['name']: h['value'] for h in soup.find_all('input', {'type': 'hidden'})}
        data.update({'user_other': uid, 'start': 0, 'promotion': 8, 'comment': ''})
        data.update(_data)

        response = post('https://u2.dmhy.org/promotion.php?test=1', **request_args, data=data).json()
        if response['status'] != 'operational':
            # Previously ignored silently; surface it so failures are visible.
            logger.warning(f"Price test for torrent {tid} returned status {response['status']!r}")
            return

        uc = int(float(BeautifulSoup(response['price'], 'lxml').span['title'].replace(',', '')))
        if uc_max != -1 and uc > uc_max:
            # Record the rejected attempt so the torrent is not re-tested
            # until the record expires.
            self.magic_info.append({**_data, 'ts': int(time())})
            logger.warning(f"Magic for torrent {data['torrent']} cost exceeded")
        else:
            _post = post(f'https://u2.dmhy.org/promotion.php?action=magic&torrent={tid}', **request_args, data=data)
            if _post.status_code == 200:
                self.magic_info.append({**_data, 'ts': int(time()), 'uc': uc})
                _user = data['user_other'] if data['user'] == 'OTHER' else data['user'].lower()
                logger.info(f"Sent a {data['ur']}x upload and {data['dr']}x download magic to torrent {tid}, "
                            f"user {_user}, duration {data['hours']}h, ucoin cost {uc}")
                logger.debug(f'24h uc usage: {self.total_cost}')
            else:
                logger.error(f'Failed to send magic to torrent {tid} | '
                             f'status code: {_post.status_code} | data: {data}')
        self._save()


if __name__ == '__main__':
    m = Magic()
    while True:
        try:
            m.run()
        except Exception as e:
            logger.exception(e)
        # Sleep outside the try so Ctrl-C during run() exits immediately
        # instead of first waiting a full interval in a finally clause.
        sleep(interval)