Last active
December 22, 2017 06:03
-
-
Save QuadTriangle/6bb6da4293975fbf5a8b7c4b8a288231 to your computer and use it in GitHub Desktop.
Buy airtel (BD) data pack automatically
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| A small python script to buy Airtel data pack with ease and convenience. | |
| It uses Robi ecare API and SL4A (only tested on QPython v2.0.11). | |
| The SIM card must be in the same device that runs this script. To run | |
| the script on a different device, please use version v4 instead. | |
| Use this script at your own _risk_. | |
| author: Quad Triangle <t.me/QuadTriangle, fb.com/QuadTriangle> | |
| year: 2017 | |
| license: Unlicense <http://unlicense.org/> | |
| version: v6 | |
| ''' | |
| import re | |
| import sys | |
| import time | |
| from hashlib import sha1 | |
| import requests | |
| from requests.models import RequestEncodingMixin as REM | |
| try: | |
| import androidhelper as android | |
| except: | |
| import android | |
# Robi ecare API: base host plus the endpoint paths used by this script.
# Every path ends with `_dc=`; callers append the value of time_dc() to it.
HOST = 'https://ecare-app.robi.com.bd'
LOGIN = '/airtel_sc/index.php?r=/accounts/login&_dc='
PACKAGES = '/airtel_sc/index.php?r=/data-packages/get-data-packages&_dc='
BUY_PACKAGE = '/airtel_sc/index.php?r=/data-packages/activate-data-package&_dc='

# SL4A facade used for every dialog and SMS call in this script.
droid = android.Android()
# One HTTP session shared by the login, package-list and buy requests.
session = requests.Session()
# IDs of 'Robi Sheba' SMS messages that have already been seen; filled by
# msg_ids_robi_update_esisting() and consulted by msg_get_secret().
msg_ids_robi_existing = set()
# Disabled switches for turning off TLS verification and its warnings:
# session.verify = False
# requests.packages.urllib3.disable_warnings()

# Headers sent with every request. The assignment replaces the session's
# default header mapping rather than merging into it.
# NOTE(review): the values look like they imitate the Android client app
# (see 'X-Requested-With') -- confirm before changing any of them.
session.headers = {
    'Connection': 'keep-alive',
    'Origin': 'file://',
    'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
    'Accept': '*/*',
    'x-wap-profile': 'http://218.249.47.94/Xianghe/MTK_Phone_JB_UAprofile.xml',
    'X-Requested-With': 'net.omobio.airtelsc',
    'User-Agent': 'Mozilla/5.0 (Linux; U; Android 4.2.2; en-us; Symphony) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30',
    'Accept-Encoding': 'gzip,deflate',
    'Accept-Language': 'en-US',
    'Accept-Charset': 'utf-8, utf-16, *;q=0.7'
}

# Form fields posted with every API call. This dict is shared, mutable state:
# login_robi_ecare(), get_packages(), update_data_plan() and buy_pack_start()
# add and remove keys in place as the flow progresses.
data = {
    'app_type': 'mobile_app',
    'force_loging': '0',  # key spelling is part of the request; do not "fix"
    'device_model': 'Symphony',
    'device_platform': 'Android',
    'device_version': '4.2.2',
    'network_type': 'mobile',
    'app_version': '4.0.2'
}
def handle_exception(*args):
    """Dismiss any open SL4A dialog, then report the uncaught exception.

    Installed below as ``sys.excepthook`` so that a spinner or alert is not
    left on screen when the script dies with a traceback.

    Args:
        *args: the ``(type, value, traceback)`` triple supplied by Python.
    """
    try:
        droid.dialogDismiss()
    finally:
        # Always print the original traceback, even when the dismiss call
        # itself fails; otherwise the real error would be lost.
        sys.__excepthook__(*args)


# dismiss sl4a dialog on exception
sys.excepthook = handle_exception
def get_number(text='please enter the phone number'):
    """Ask for a phone number until a valid one is entered, then return it.

    A number is accepted when it is exactly 11 digits long and starts with
    ``016``. After a rejected entry the prompt changes to an error message
    and the dialog is shown again.

    Args:
        text: message shown in the first input dialog.

    Returns:
        The accepted number as a string.
    """
    prompt = text
    while True:
        droid.dialogCreateInput('Number', prompt, '016', 'number')
        droid.dialogSetPositiveButtonText('ok')
        droid.dialogShow()
        entered = droid.dialogGetResponse().result['value']
        is_valid = (entered.startswith('016')
                    and entered.isdigit()
                    and len(entered) == 11)
        if is_valid:
            droid.dialogDismiss()
            return entered
        prompt = 'invalid number. input again'
def get_password():
    """Show the password dialog repeatedly until a non-empty value is given.

    Returns:
        The entered password.
    """
    while True:
        droid.dialogCreatePassword()
        droid.dialogSetPositiveButtonText('ok')
        droid.dialogShow()
        entered = droid.dialogGetResponse().result['value']
        if not entered:
            # empty input: ask again without dismissing first
            continue
        droid.dialogDismiss()
        return entered
def login_validator(resp_data):
    """Classify the decoded JSON body of a login response.

    Args:
        resp_data: dict decoded from the login response.

    Returns:
        ``'invalid'`` for wrong credentials, ``'down'`` for an unknown
        server error, ``'success'`` when the ``success`` field is truthy,
        or ``'maintenance'`` when the error text mentions planned
        maintenance.

    Raises:
        SystemExit: for any other response, after dismissing the dialog and
            printing the raw response for diagnosis.
    """
    values = resp_data.values()
    if 'general.invalid_user_credentials' in values:
        return 'invalid'
    if 'general.err_unknown' in values:
        return 'down'
    if resp_data.get('success'):
        return 'success'

    # A response without an 'error' field used to raise TypeError here
    # ("argument of type 'NoneType' is not iterable"), which made the
    # print-and-exit fallback below unreachable.
    error = resp_data.get('error') or ''
    try:
        in_maintenance = 'planned maintenance' in error
    except TypeError:
        # 'error' is something that does not support `in` (e.g. a number)
        in_maintenance = False
    if in_maintenance:
        return 'maintenance'

    droid.dialogDismiss()
    print(resp_data)
    sys.exit(1)
def login_robi_ecare(num, _pass):
    """Log in to the ecare API and prime the shared ``data`` dict.

    The credentials and a device id derived from the phone number are added
    to ``data`` and posted to the login endpoint. On success the session
    key, connection details and paging fields needed by the later requests
    are stored in ``data``.

    Args:
        num: phone number used as the login name.
        _pass: account password.

    Returns:
        The status string produced by ``login_validator``.
    """
    # Pseudo device identifiers: first 16 hex chars of sha1(number).
    # Encoding first keeps the digest identical on Python 2 and also works
    # where sha1() insists on bytes.
    device_id = sha1(num.encode('utf-8')).hexdigest()[:16]
    data.update({
        'conn': num,
        'device_uuid': device_id,
        'device_imsi': device_id,
        'password': _pass
    })
    try:
        resp = session.post(HOST + LOGIN + time_dc(),
                            data=REM._encode_params(data))
    finally:
        # Never leave the password in the shared form data, not even when
        # the request raises; pop() also keeps a repeated call from failing
        # on keys that are already gone.
        data.pop('password', None)
        data.pop('force_loging', None)

    payload = resp.json()  # decode once instead of once per field
    status = login_validator(payload)
    if status == 'success':
        connection = payload['connections'][0]
        data.update({
            'session_key': payload['sessionKey'],
            'conn_type': connection['conn_type'],
            'operator': connection['operator'],
            'conn': connection['conn'],
            'ref_number': payload['user']['mobile'],
            'lang': 'en',
            'page': '1',
            'start': '0',
            'limit': '15'
        })
    return status
def login_progress(num, _pass):
    """Run the login request behind a spinner dialog.

    Args:
        num: phone number used as the login name.
        _pass: account password.

    Returns:
        The status string returned by ``login_robi_ecare``.
    """
    droid.dialogCreateSpinnerProgress('Login', 'Trying to login.')
    droid.dialogShow()
    try:
        return login_robi_ecare(num, _pass)
    finally:
        # Close the spinner on failure too, instead of relying on the
        # global excepthook to clean up.
        droid.dialogDismiss()
def login_status_dialog(status):
    """Report a failed login to the user and terminate the script.

    Returns immediately (``None``) when ``status`` is ``'success'``. For
    every other status an alert is shown (with a message for the known
    failure kinds), the function waits for the user to respond, and the
    script then exits with code 1.

    Args:
        status: status string as produced by ``login_validator``.

    Raises:
        SystemExit: always, unless ``status`` is ``'success'``.
    """
    if status == 'success':
        return

    failure_messages = {
        'invalid': ('wrong number or password'),
        'down': ('airtel data pack server down'
                 '\nplease try again later'),
        'maintenance': ('server is currently unavailable'
                        '\nfor a planned maintenance'
                        '\nplease try again later'),
    }
    if status in failure_messages:
        droid.dialogCreateAlert('Login Failed', failure_messages[status])

    droid.dialogSetNegativeButtonText("ok")
    droid.dialogShow()
    droid.dialogGetResponse()
    droid.dialogDismiss()
    sys.exit(1)
def get_buy_times(text='how many times do you want to buy this pack'):
    """Ask how many times the selected pack should be bought.

    The answer must be a whole number from 1 to 100, matching the limit
    stated in the error prompt. Previously any value of up to three digits
    was accepted, which allowed 0 and everything up to 999.

    Args:
        text: message shown in the first input dialog.

    Returns:
        The accepted count as a decimal string (callers concatenate it into
        the confirmation text and convert it with ``int``).
    """
    prompt = text
    while True:
        droid.dialogCreateInput('Buy Times', prompt, inputType='number')
        droid.dialogSetPositiveButtonText('ok')
        droid.dialogShow()
        total_buy = droid.dialogGetResponse().result['value']
        try:
            count = int(total_buy) if total_buy.isdigit() else 0
        except ValueError:
            # digit-like characters that int() cannot parse
            count = 0
        if 1 <= count <= 100:
            droid.dialogDismiss()
            return str(count)
        prompt = 'invalid number. buy up to 100 times'
def confirm_buy(plan_id, buy_times, total_cost):
    """Show a Yes/No summary of the purchase and return the user's choice.

    Args:
        plan_id: identifier of the selected pack (string).
        buy_times: number of purchases, as a string.
        total_cost: total price, as a string.

    Returns:
        ``True`` only when the positive ("Yes") button was pressed.
    """
    summary = ''.join([
        'Pack: ', plan_id,
        '\nBuy Times: ', buy_times,
        '\nTotal Cost: ', total_cost, 'tk',
    ])
    droid.dialogCreateAlert('Confirm Data Package', summary)
    droid.dialogSetPositiveButtonText("Yes")
    droid.dialogSetNegativeButtonText("No")
    droid.dialogShow()
    pressed = droid.dialogGetResponse().result.get('which')
    droid.dialogDismiss()
    return pressed == 'positive'
def msg_get_robi_sheba_ids():
    """Return the ids of the SMS messages whose sender is 'Robi Sheba'.

    Messages are fetched with ``droid.smsGetMessages(True)``.
    NOTE(review): the ``True`` argument is presumably SL4A's unread-only
    flag -- confirm against the SL4A API reference.

    Returns:
        List of ``_id`` values, in the order the messages were returned.

    Raises:
        SystemExit: when no message list is returned; a hint about the SMS
            permission is printed first.
    """
    inbox = droid.smsGetMessages(True).result
    if inbox is None:
        print('enable SMS permission for Qpython')
        print('[settings>apps>qpython>permissions]')
        sys.exit(1)
    return [sms['_id'] for sms in inbox if sms['address'] == 'Robi Sheba']
def msg_get_secret_by_id(msg_id):
    """Extract the first run of six digits from the body of one SMS.

    Args:
        msg_id: id of the message to read.

    Returns:
        The six-digit string, or ``None`` when the body contains none.
    """
    body = droid.smsGetMessageById(msg_id).result['body']
    found = re.search(r'\d{6}', body)
    return found.group() if found else None
def msg_ids_robi_update_esisting():
    """Record every current 'Robi Sheba' message id as already seen."""
    for seen_id in msg_get_robi_sheba_ids():
        msg_ids_robi_existing.add(seen_id)
def msg_get_secret():
    """Look for a secret code in 'Robi Sheba' messages not seen before.

    When a code is found, all current message ids are recorded as seen so
    the same SMS is not used twice.

    Returns:
        The secret code string, or ``None`` when no new message has one.
    """
    unseen_ids = (candidate for candidate in msg_get_robi_sheba_ids()
                  if candidate not in msg_ids_robi_existing)
    for msg_id in unseen_ids:
        code = msg_get_secret_by_id(msg_id)
        if code:
            msg_ids_robi_update_esisting()
            return code
    return None
def msg_get_secret_wait_for(timeout):
    """Poll for a new secret-code SMS roughly once per second.

    The previous loop slept ``t`` seconds on the ``t``-th attempt, so a
    ``timeout`` of 20 waited up to 210 seconds in total instead of about 20.

    Args:
        timeout: maximum number of one-second polling attempts.

    Returns:
        The secret code string, or ``None`` when none arrived in time.
    """
    for _ in range(timeout):
        time.sleep(1)
        secret = msg_get_secret()
        if secret:
            return secret
    return None
def get_data_pack(items):
    """Let the user pick one entry from ``items`` in a single-choice dialog.

    The list is shown with a leading 'select one pack' placeholder; choosing
    the placeholder simply shows the dialog again.

    Args:
        items: list of display strings to choose from.

    Returns:
        The chosen element of ``items``.
    """
    while True:
        droid.dialogCreateAlert('Data Package')
        droid.dialogSetSingleChoiceItems(['select one pack'] + items)
        droid.dialogSetPositiveButtonText('ok')
        droid.dialogShow()
        # Wait for response
        droid.dialogGetResponse()
        picked = droid.dialogGetSelectedItems().result[0]
        droid.dialogDismiss()
        if picked != 0:
            # offset by one because of the placeholder entry
            return items[picked - 1]
def get_packages():
    """Fetch the available data packages from the API.

    After the request the paging fields (``page``, ``start``, ``limit``) are
    removed from the shared ``data`` dict.

    Returns:
        Dict mapping each ``plan_id`` to ``[name, tariff_with_vat]``.
    """
    url = HOST + PACKAGES + time_dc()
    resp = session.post(url, data=REM._encode_params(data))
    for paging_key in ('page', 'start', 'limit'):
        del data[paging_key]
    return {
        entry.get('plan_id'): [entry.get('name'),
                               entry.get('tariff_with_vat')]
        for entry in resp.json().get('data')
    }
def get_packages_progress():
    """Fetch the package list behind a spinner dialog.

    Returns:
        The dict returned by ``get_packages``.
    """
    droid.dialogCreateSpinnerProgress('Package', 'Retrieving packages.')
    droid.dialogShow()
    try:
        return get_packages()
    finally:
        # Close the spinner on failure too, instead of relying on the
        # global excepthook to clean up.
        droid.dialogDismiss()
def update_data_plan():
    """Let the user choose a package and store it in the shared ``data``.

    Packages are offered cheapest first as ``'<plan_id> - <tariff>'``. The
    chosen entry is looked up by its position in that list rather than by
    splitting the label on ``' - '``, so a plan id containing the separator
    can no longer be mis-parsed.

    Returns:
        The price of the chosen package as a string (the tariff text with
        its first four characters removed).
    """
    packages = get_packages_progress()
    # The tariff text carries a 4-character prefix before the amount.
    # NOTE(review): presumably a currency marker -- confirm against the API.
    ranked = sorted(packages.items(),
                    key=lambda entry: float(entry[1][1][4:]))
    labels = [plan_id + ' - ' + details[1] for plan_id, details in ranked]
    chosen_label = get_data_pack(labels)
    plan_id, details = ranked[labels.index(chosen_label)]
    data.update({'plan_id': plan_id,
                 'name': details[0]})
    return details[1][4:]
def parse_buy_response(resp_data):
    """Print a human-readable status for one buy-request response.

    A wrong-secret response additionally pauses for 15 seconds and then
    records all current 'Robi Sheba' messages as seen. Unrecognised
    responses are printed as-is.

    Args:
        resp_data: dict decoded from the buy response.
    """
    if resp_data.get('success'):
        print('status: success')
        return
    if resp_data.get('error') == 'pin_sent':
        print('status: secret sent')
        return

    trace = resp_data.get('stack-trace', ['', ''])
    if 'Invalid Secret' not in trace[1]:
        print(resp_data)
        return

    print('got wrong secret: waiting 15 seconds to avoid confusion')
    print('[if you see this message several times, please, try again later]')
    time.sleep(15)
    msg_ids_robi_update_esisting()
def time_dc():
    """Return the current Unix time in milliseconds as a digit string.

    The value is appended to the ``_dc=`` query parameter of every API URL.
    The previous version sliced ``repr(time.time())``, which produced fewer
    digits whenever the float's repr had under three decimals (for example
    ``'1513922580.1'`` became ``'15139225801'``).
    """
    return str(int(time.time() * 1000))
def buy_pack_start(buy_times):
    """Buy the selected pack ``buy_times`` times.

    Each round posts a buy request, waits for the SMS carrying the secret
    code, and posts the request again with that code. A round whose SMS
    does not arrive in time is skipped.

    Args:
        buy_times: number of purchase rounds (int).
    """
    def post_buy_request():
        # send the current form data and report what the server answered
        reply = session.post(HOST + BUY_PACKAGE + time_dc(),
                             data=REM._encode_params(data))
        parse_buy_response(reply.json())

    # ignore existing msg
    msg_ids_robi_update_esisting()
    for done in range(buy_times):
        print('\n\ntrying to buy pack no. ' + str(done + 1))
        post_buy_request()
        print('waiting for sms: timeout in 20 seconds')
        secret = msg_get_secret_wait_for(20)
        if secret:
            print('secret found: requesting to buy the pack')
            data['secret_code'] = secret
            post_buy_request()
            del data['secret_code']
        else:
            print('skipping: timeout: secret not found')
def main():
    """Run the whole flow: log in, pick a pack, confirm, then buy.

    Exits with code 1 when the login fails or the user declines the
    confirmation dialog.
    """
    status = login_progress(get_number(), get_password())
    login_status_dialog(status)

    unit_cost = update_data_plan()
    buy_times = get_buy_times()
    total_cost = str(float(unit_cost) * int(buy_times))

    if confirm_buy(data['plan_id'], buy_times, total_cost):
        buy_pack_start(int(buy_times))
        return
    sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment