Skip to content

Instantly share code, notes, and snippets.

@a-r-g-v
Last active April 2, 2025 15:18
Show Gist options
  • Select an option

  • Save a-r-g-v/a7899ec9d5a25739b34835c273949748 to your computer and use it in GitHub Desktop.

Select an option

Save a-r-g-v/a7899ec9d5a25739b34835c273949748 to your computer and use it in GitHub Desktop.

Revisions

  1. a-r-g-v renamed this gist Apr 2, 2025. 1 changed file with 1 addition and 3 deletions.
    4 changes: 1 addition & 3 deletions gistfile1.txt → download.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@
    ```
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-

    @@ -297,5 +296,4 @@ def main():


    if __name__ == "__main__":
    main()
    ```
    main()
  2. a-r-g-v created this gist Apr 2, 2025.
    301 changes: 301 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,301 @@
    ```
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-

    import os
    import sys
    import getpass
    import json
    import time
    import logging
    import requests
    from shutil import copyfileobj

    from pyicloud import PyiCloudService
    from pyicloud.exceptions import PyiCloudAPIResponseException


    # デバッグログを有効化
    def setup_logger():
    logger = logging.getLogger('pyicloud')
    logger.setLevel(logging.DEBUG)

    # コンソールハンドラー
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)

    # フォーマッタ
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)

    logger.addHandler(console_handler)

    # requestsライブラリも詳細ログを出力
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

    return logger


    def login_icloud():
    """Apple ID とパスワードをプロンプト入力し、pyicloud でログインして認証済みの PyiCloudService を返す。"""
    logger = setup_logger()

    apple_id = input("Apple ID(メールアドレス)を入力してください: ")
    password = getpass.getpass("Apple IDのパスワードを入力してください(非表示): ")

    # デバッグ用ログ出力
    logger.debug("PyiCloudServiceを初期化します...")

    # 標準のPyiCloudServiceを使用
    api = PyiCloudService(apple_id, password)

    # 2FA が有効な場合の処理
    if api.requires_2fa:
    print("二要素認証(2FA)コードの入力が必要です。")

    # 信頼済みデバイスのリストを表示して選択させる
    if api.trusted_devices:
    print("\n信頼済みデバイスの一覧:")
    for i, device in enumerate(api.trusted_devices):
    # デバイス情報を表示(電話番号の一部が含まれる場合はSMSが可能なデバイス)
    phone_number = device.get('phoneNumber', '')
    device_name = device.get('deviceName', 'Unknown Device')

    # デバイスの詳細情報をデバッグ表示
    logger.debug(f"デバイス {i} の詳細: {json.dumps(device, indent=2)}")

    if phone_number:
    print(f" [{i}]: {device_name} ({phone_number})")
    else:
    print(f" [{i}]: {device_name}")

    device_index = int(input("\n認証コードを受け取るデバイスの番号を選択してください: "))
    device = api.trusted_devices[device_index]

    # 選択したデバイスに検証コードを送信
    print(f"\n選択したデバイス {device.get('deviceName', 'Unknown Device')} に認証コードを送信します...")

    # デバイス情報をログに出力
    logger.debug(f"選択したデバイス情報: {json.dumps(device, indent=2)}")

    # 検証コードを送信
    success = api.send_verification_code(device)
    logger.debug(f"検証コード送信結果: {success}")

    if success:
    # SMS受信の場合は電話番号で確認
    if device.get('phoneNumber'):
    print(f"SMS経由で {device.get('phoneNumber')} に認証コードを送信しました。")
    else:
    print("デバイスに認証コードを送信しました。")
    else:
    print("検証コードの送信に失敗しました。")
    return None
    else:
    print("信頼済みデバイスが見つかりません。")
    return None

    # コードの入力とリトライループ
    max_attempts = 3
    attempt = 0
    verification_success = False

    while attempt < max_attempts and not verification_success:
    attempt += 1
    if attempt > 1:
    print(f"\n認証コードの検証に失敗しました。残り試行回数: {max_attempts - attempt + 1}")
    retry = input("新しいコードを要求しますか? (y/n): ")
    if retry.lower() == 'y':
    print("新しい認証コードを要求します...")
    success = api.send_verification_code(device)
    if success:
    print("新しい認証コードを送信しました。")
    else:
    print("新しい認証コードの送信に失敗しました。")

    # コードの入力
    code = input("受け取った6桁の認証コードを入力してください: ")

    # 入力されたコードをログに記録
    logger.debug(f"入力されたコード: {code}")

    # コードの前後の空白と改行を削除
    code = code.strip()
    logger.debug(f"整形後のコード: {code}")

    try:
    # validate_verification_code メソッドを使用(2FA用のvalidate_2fa_codeではなく)
    logger.debug("validate_verification_code メソッドを呼び出します...")

    # validate_verification_code は device オブジェクトと code を必要とする
    result = api.validate_verification_code(device, code)

    logger.debug(f"検証結果: {result}")

    if result:
    verification_success = True
    print("認証コードの検証に成功しました。")
    else:
    # APIからの応答が成功の場合は、resultがFalseでも認証は成功と判断
    if isinstance(result, dict) and result.get('success') is True:
    verification_success = True
    print("認証コードの検証に成功しました。")
    else:
    print("認証コードの検証に失敗しました。")
    except PyiCloudAPIResponseException as ex:
    status_code = getattr(ex, 'status', None)
    reason = getattr(ex, 'reason', None)
    response_text = getattr(ex, 'response_text', None)

    print(f"APIエラー: {ex}")
    print(f"ステータスコード: {status_code}, 理由: {reason}")

    if response_text:
    print(f"レスポンステキスト: {response_text}")

    # エラーの詳細をログに出力
    logger.error(f"API応答エラー: {ex}")
    logger.error(f"ステータスコード: {status_code}, 理由: {reason}")

    if response_text:
    try:
    json_response = json.loads(response_text)
    logger.error(f"JSONレスポンス: {json.dumps(json_response, indent=2)}")
    except:
    logger.error(f"テキストレスポンス: {response_text}")

    print("コードが無効か有効期限が切れている可能性があります。")
    except Exception as e:
    logger.exception(f"予期しないエラー: {e}")
    print(f"予期しないエラー: {e}")

    if not verification_success:
    raise Exception("認証コードの検証に失敗しました。最大試行回数を超えました。")

    if not api.is_trusted_session:
    # 今回のセッションを信頼済みに登録
    try:
    api.trust_session()
    print("このセッションを信頼済みに登録しました。(約2か月有効)")
    except Exception as ex:
    print(f"セッションの信頼登録に失敗しました: {ex}")
    print("このエラーは無視して続行します。")
    logger.warning(f"セッション信頼失敗: {ex}")
    # 認証自体は成功しているので続行する
    pass
    elif api.requires_2sa:
    # 古い 2ステップ認証(2SA)の場合(必要なら実装)
    print("2SA 認証が必要です。処理を実装してください。")
    sys.exit(1)

    # 認証状態の最終確認
    if not api.user:
    logger.error("iCloudユーザー情報を取得できません。認証に失敗しました。")
    raise Exception("iCloud ログインに失敗しました。")
    else:
    logger.debug(f"認証成功: ユーザー {api.user.get('dsInfo', {}).get('fullName', 'Unknown')} としてログイン")

    print("iCloud にログインしました。")
    return api


    def ensure_dir(path):
    """指定ディレクトリが存在しなければ作成する。"""
    if not os.path.isdir(path):
    os.makedirs(path)


    def find_large_files_in_drive(folder, threshold):
    """iCloud Drive のフォルダを再帰的に探索し、閾値(バイト)以上のファイルをリストアップ。"""
    items = []
    for name in folder.dir():
    item = folder[name]
    if item.type == 'file':
    # sizeがNoneの場合は比較をスキップ
    if item.size is not None and item.size >= threshold:
    items.append(item)
    elif item.type == 'folder':
    # 再帰的に探索
    items.extend(find_large_files_in_drive(item, threshold))
    return items


    def download_icloud_drive(api, size_threshold, download_dir):
    """iCloud Drive 内のファイルから size_threshold(バイト)以上のものをダウンロード。"""
    print(f"\n=== iCloud Drive から {size_threshold}バイト以上のファイルを探します ===")
    large_files = find_large_files_in_drive(api.drive, size_threshold)
    print(f"該当ファイル数: {len(large_files)} 件")

    ensure_dir(download_dir)

    for file_item in large_files:
    file_name = file_item.name
    file_path = os.path.join(download_dir, file_name)
    size = file_item.size
    print(f" ダウンロード: {file_name} (サイズ: {size} バイト) -> {file_path}")
    with file_item.open(stream=True) as resp:
    with open(file_path, 'wb') as out_f:
    copyfileobj(resp.raw, out_f)
    print("iCloud Drive のダウンロード完了。\n")


    def download_icloud_photos(api, size_threshold, download_dir):
    """iCloud 写真ライブラリ(すべての写真)から size_threshold(バイト)以上の写真・動画をダウンロード。"""
    print(f"=== iCloud 写真ライブラリから {size_threshold}バイト以上のファイルを探します ===")
    all_photos = api.photos.all
    large_assets = []

    # 写真ライブラリは多数ある場合があるので、時間がかかるかもしれません
    for asset in all_photos:
    if asset.size and asset.size >= size_threshold:
    large_assets.append(asset)

    print(f"該当写真/動画: {len(large_assets)} 件")

    ensure_dir(download_dir)

    for photo in large_assets:
    filename = photo.filename or f"{photo.id}.jpg" # ファイル名がなければID使う
    file_path = os.path.join(download_dir, filename)
    print(f" ダウンロード: {filename} (サイズ: {photo.size} バイト) -> {file_path}")
    resp = photo.download() # レスポンスを取得
    with open(file_path, 'wb') as out_f:
    copyfileobj(resp.raw, out_f)
    print("iCloud 写真ライブラリのダウンロード完了。\n")


    def main():
    # 引数チェック: 第1引数に「閾値(MB)」を取る。指定がなければデフォルト10MB。
    if len(sys.argv) > 1:
    try:
    mb_value = float(sys.argv[1])
    except ValueError:
    print("サイズ指定(MB)の引数が不正です。例: 10")
    sys.exit(1)
    else:
    mb_value = 10.0 # デフォルト 10MB

    size_threshold = int(mb_value * 1024 * 1024)
    print(f"コマンドライン引数から閾値を取得しました: {mb_value} MB ({size_threshold} バイト以上を対象)")

    # iCloud ログイン
    api = login_icloud()

    # ダウンロード先ディレクトリ (カレント下に downloads/{drive,photos} を作成)
    drive_download_dir = os.path.join("downloads", "drive")
    photos_download_dir = os.path.join("downloads", "photos")

    # iCloud Drive の大きいファイルをダウンロード
    download_icloud_drive(api, size_threshold, drive_download_dir)

    # iCloud 写真ライブラリの大きいファイルをダウンロード
    download_icloud_photos(api, size_threshold, photos_download_dir)

    print("すべての処理が完了しました。")


    if __name__ == "__main__":
    main()
    ```