Supplemental IOC & Infrastructure Intelligence — Toll Booth Phishing Deep Dive (follow-up to Ben Polonsky writeup)

Supplemental IOC & Infrastructure Intelligence

Toll Booth Phishing Campaign — Deep Dive Follow-Up

Context: This supplements Ben Polonsky's writeup with net-new findings from active infrastructure reconnaissance conducted 2026-03-28. All probing was passive/non-invasive against already-identified phishing infrastructure.


1. Backend Framework Identification

The article identified OpenResty 1.29.2.1 as the web server. Behind it sits a second layer:

SERVLET: io.javalin.jetty.JavalinJettyServlet-5bf7f15f

Full stack: Cloudflare (free tier) → OpenResty 1.29.2.1 → Javalin (Java/Kotlin) on Eclipse Jetty

Discovered via WebSocket upgrade request to ilsos.orekx.life, which triggered Jetty's default error page:

<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 404 WebSocket handler not found</title>
<tr><th>SERVLET:</th><td>io.javalin.jetty.JavalinJettyServlet-5bf7f15f</td></tr>

Corroborated by 8K path overflow returning Jetty's signature 431 Bad Message format.
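The fingerprint above can be pulled out of a captured error page mechanically. The following is a minimal sketch, assuming the page contains a SERVLET row in the form shown; `extract_servlet` is a hypothetical helper name, and the reading of the trailing hex string as a per-instance identifier is an assumption based on the `ClassName-hex` shape, not something confirmed here.

```python
import re

# Matches the SERVLET row of the HTML error page quoted above.
SERVLET_ROW = re.compile(r"<th>SERVLET:</th>\s*<td>([^<]+)</td>", re.IGNORECASE)


def extract_servlet(error_html):
    """Return (class_name, suffix) from an error page, or None if absent."""
    match = SERVLET_ROW.search(error_html)
    if not match:
        return None
    value = match.group(1).strip()
    # Assumption: the value has the shape "<class name>-<hex suffix>".
    class_name, _, suffix = value.rpartition("-")
    if not class_name:
        return value, ""
    return class_name, suffix


sample = "<tr><th>SERVLET:</th><td>io.javalin.jetty.JavalinJettyServlet-5bf7f15f</td></tr>"
print(extract_servlet(sample))
```

Splitting the class name from the suffix keeps the stable part (the Javalin servlet class) separate from the part that may change between deployments.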


2. Operator C2 Panel: /console/

Not in the original article. The phishing backend exposes an auth-gated operator panel:

| Endpoint | Status | Meaning |
|---|---|---|
| GET /console/ | 401 | Operator panel root, requires authentication |
| GET /console (no slash) | 400 | Route exists, trailing slash required |
| /console/* (all sub-paths) | 400 | Routes registered but auth-gated |
| /console/ws (WebSocket) | 400 | Real-time C2 WebSocket endpoint |

Confirmed sub-routes (all return 400, indicating registered handlers behind auth):

/console/login          /console/auth           /console/api
/console/admin          /console/dashboard      /console/operator
/console/operators      /console/config         /console/settings
/console/victims        /console/sessions       /console/campaigns
/console/templates      /console/logs           /console/data
/console/export         /console/import         /console/status
/console/health         /console/ws             /console/websocket
/console/static/        /console/assets/        /console/js/
/console/css/           /console/favicon.ico

The auth mechanism is not Basic, Bearer, API-key, or a simple cookie. It appears to be enforced at the Javalin middleware level, before route dispatch. Classic Jetty semicolon and path-traversal bypasses all fail.
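The status-code reading used in this section can be expressed as a small triage step over probe results. This is only a sketch: `triage` and the bucket names are hypothetical, and treating 400 as "route probably registered" is this writeup's interpretation rather than a documented Javalin behavior.

```python
def triage(results):
    """Group probed paths by the interpretation used in this writeup."""
    buckets = {
        "auth_required": [],        # 401: handler exists and demands credentials
        "possibly_registered": [],  # 400: read here as a registered, gated route
        "not_found": [],            # 404: no handler (for example deregistered)
        "other": [],
    }
    for path, status in sorted(results.items()):
        if status == 401:
            buckets["auth_required"].append(path)
        elif status == 400:
            buckets["possibly_registered"].append(path)
        elif status == 404:
            buckets["not_found"].append(path)
        else:
            buckets["other"].append(path)
    return buckets


# Status codes as reported in this writeup.
observed = {"/console/": 401, "/console/victims": 400, "/pay/b_info.html": 404}
print(triage(observed))
```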


3. Current Infrastructure State (2026-03-28)

| Domain | DNS | HTTP | Notes |
|---|---|---|---|
| xykri.com | clientHold (suspended) | Dead | Registrar Cloud Yuqu LLC finally acted |
| oe.xykri.com | No resolution | Dead | |
| orekx.life | NS only (no A) | | Cloudflare NS: kevin/keira |
| ilsos.orekx.life | 172.67.200.47 / 104.21.76.199 | LIVE | API framework responding; /console/ returns 401 |
| hxlkm.life | 172.67.218.70 / 104.21.45.190 | Parked | Rerouted to Skenzo/ParkingCrew |
| ilsos.hxlkm.life | Same as parent | Down | Resolves but no HTTP response |

ilsos.orekx.life is the critical finding — the Javalin backend is still running. Phishing victim routes (/pay/b_info.html, /pay/c_info.html, /api/verify/{code}) return 404 (deregistered), but the operator panel at /console/ is live and auth-gated. This infrastructure can be re-weaponized with fresh routes at any time.


4. Cloudflare Intelligence

Separate Cloudflare accounts per domain (compartmentalized OPSEC):

| Domain | CF Nameservers | CF Flight ID |
|---|---|---|
| xykri.com | ivan / kinsley | |
| orekx.life | kevin / keira | 447f109 |
| hxlkm.life | pola / kellen | 12f952 |

Cloudflare Trace (/cdn-cgi/trace):

  • Datacenter: LAX (Los Angeles)
  • TLS: TLSv1.3 with X25519Kyber768Draft00 (post-quantum key exchange)
  • WARP/Gateway/RBI: all off
  • Free tier only — no WAF rules, no Workers, no Pages, no Tunnel

Origin IP: Not leaked. Unflare origin discovery, subdomain brute (35 subs), DNS history, mail records, and direct-IP-with-Host-header all failed to reveal the origin. Likely Tencent or Alibaba Cloud based on ecosystem patterns.
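The trace endpoint returns plain `key=value` lines, so the fields summarized above are easy to extract. A minimal sketch follows; `parse_trace` is a hypothetical helper, and the sample text is an illustrative reconstruction from the values reported in this section, not a verbatim capture.

```python
def parse_trace(text):
    """Parse key=value lines (as served at /cdn-cgi/trace) into a dict."""
    fields = {}
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        if sep:  # skip blank or malformed lines
            fields[key.strip()] = value.strip()
    return fields


# Illustrative sample assembled from the values reported above.
sample = (
    "fl=447f109\n"
    "colo=LAX\n"
    "tls=TLSv1.3\n"
    "kex=X25519Kyber768Draft00\n"
    "warp=off\n"
    "gateway=off\n"
    "rbi=off\n"
)
trace = parse_trace(sample)
print(trace["colo"], trace["tls"], trace["kex"])
```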


5. Domain Parking Layer (hxlkm.life)

hxlkm.life root domain leaked critical headers:

Via: 0.0 Caddy
X-Redirect: skenzo
X-Pcrew-Blocked-Reason: (empty)
X-Pcrew-Ip-Organization: Spectrum
X-Domain: hxlkm.life
Accept-Ch: viewport-width, dpr, device-memory, rtt, downlink, ect,
           ua, ua-full-version, ua-platform, ua-platform-version,
           ua-arch, ua-model, ua-mobile

The root is served via Skenzo (domain monetization, BVI) / ParkingCrew (Team Internet AG, Munich) through a Caddy reverse proxy. However, wildcard DNS is still active: all subdomains (*.hxlkm.life) route to Cloudflare → nginx origin, which returns 403.
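The leaked headers lend themselves to a simple parked-domain check when re-probing later. This is a sketch under the assumption that the header names listed above are the relevant signals; `parking_signals` and its label strings are hypothetical.

```python
def parking_signals(headers):
    """Return the parking-related signals present in a response header dict."""
    lowered = {name.lower(): value for name, value in headers.items()}
    signals = []
    if lowered.get("x-redirect", "").lower() == "skenzo":
        signals.append("skenzo-redirect")
    if any(name.startswith("x-pcrew-") for name in lowered):
        signals.append("parkingcrew-headers")
    if "caddy" in lowered.get("via", "").lower():
        signals.append("caddy-proxy")
    return signals


# Headers as reported above for the hxlkm.life root.
observed_headers = {
    "Via": "0.0 Caddy",
    "X-Redirect": "skenzo",
    "X-Pcrew-Blocked-Reason": "",
    "X-Pcrew-Ip-Organization": "Spectrum",
    "X-Domain": "hxlkm.life",
}
print(parking_signals(observed_headers))
```

An empty result on a later probe would suggest the root has left the parking layer and is worth re-examining.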


6. Registration Intelligence

xykri.com — Cloud Yuqu LLC (China)

Registrar:      Cloud Yuqu LLC (IANA 3824)
Registrar URL:  diymysite.com
WHOIS Server:   whois.diymysite.com
Registrant:     Jiangxi Province (江西省), China
Created:        2025-04-15
Status:         clientHold (suspended as of 2026-03-21)
Abuse Email:    abuse@diymysite.com / demi@diymysite.com
Abuse Phone:    +86.17723349228 / +86.19981778832

orekx.life — Dynadot (RDAP)

Registrar:    Dynadot Inc (IANA 472)
Registered:   2026-03-19T10:51:57Z
Last Changed: 2026-03-24T10:52:29Z
Abuse:        abuse@dynadot.com / +1.6502620100
Status:       client transfer prohibited

hxlkm.life — Dynadot (RDAP)

Registrar:    Dynadot Inc (IANA 472)
Registered:   2026-03-20T10:56:51Z  ← ONE DAY after orekx.life
Last Changed: 2026-03-25T10:57:27Z
Abuse:        abuse@dynadot.com / +1.6502620100
Status:       client transfer prohibited

Pre-staging confirmed: hxlkm.life was registered roughly 24 hours after orekx.life (one day and about five minutes later, per the RDAP timestamps above) as a ready-to-go fallback domain.
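The registration gap can be checked directly from the two RDAP timestamps listed above. A minimal sketch; `parse_rdap_time` is a hypothetical helper that normalizes the trailing `Z` so the parse also works on Python versions older than 3.11.

```python
from datetime import datetime


def parse_rdap_time(value):
    """Parse an RDAP-style UTC timestamp such as 2026-03-19T10:51:57Z."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


orekx_registered = parse_rdap_time("2026-03-19T10:51:57Z")
hxlkm_registered = parse_rdap_time("2026-03-20T10:56:51Z")

gap = hxlkm_registered - orekx_registered
print(gap)  # 1 day, 0:04:54
```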


7. Certificate Transparency

| Subject | Issuer | Issued | SAN | Notes |
|---|---|---|---|---|
| orekx.life | Let's Encrypt E7 | 2026-03-19 | *.orekx.life, orekx.life | Wildcard, same day as registration |
| hxlkm.life | Let's Encrypt E7 | 2026-03-20 | *.hxlkm.life, hxlkm.life | Wildcard, same day as registration |
| oe.xykri.com | Let's Encrypt R13 | 2026-03-19 | Single | Specific subdomain cert |
| xykri.com | Sectigo DV E36 | 2026-03-15 | *.xykri.com, xykri.com | Wildcard, paid Sectigo via CF |

Wildcard certs on all domains enable rapid subdomain rotation without new issuance.
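To see why a wildcard SAN allows subdomain rotation without new issuance, it helps to spell out the matching rule. The sketch below assumes the usual convention that `*` stands for exactly one left-most label; `san_covers` is a hypothetical helper, not part of any certificate library.

```python
def san_covers(san, hostname):
    """Return True if a certificate SAN entry covers the given hostname."""
    san = san.lower().rstrip(".")
    hostname = hostname.lower().rstrip(".")
    if san.startswith("*."):
        # The wildcard stands for exactly one left-most label.
        head, _, rest = hostname.partition(".")
        return bool(head) and rest == san[2:]
    return san == hostname


for host in ("ilsos.orekx.life", "anything.orekx.life", "orekx.life", "a.b.orekx.life"):
    print(host, san_covers("*.orekx.life", host))
```

Any single-label subdomain is already covered, which is consistent with the SAN lists above including the bare domain as a separate entry.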


8. Ecosystem Attribution

The article references "Crimson Vector" and "Lighthouse Smishing Syndicate 2026". These names do not appear in public threat intelligence. The operation maps to the Smishing Triad ecosystem, tracked under these names:

| Vendor | Tracking Name |
|---|---|
| Google TAG | Lighthouse PhaaS (lawsuit filed Nov 2025, SDNY) |
| Prodaft | LARVA-241 (Wang Duo Yu / Lighthouse developer) |
| Silent Push | Smishing Triad |
| Unit 42 | Global Smishing Campaign |
| Resecurity | Smishing Triad |
| Netcraft | Lighthouse / Lucid PhaaS |

Key details:

  • 194,000+ malicious domains, 8,800+ IPs, 200+ ASNs
  • $1B+ estimated revenue over three years
  • 1M+ confirmed victims across 120+ countries
  • Key developer Wang Duo Yu (老王) named in Google's civil action
  • .life is a confirmed TLD in the Smishing Triad rotation
  • Cloud Yuqu LLC is a known registrar in this ecosystem
  • Post-Lighthouse disruption (Nov 2025), operators migrated between Darcula and Lucid PhaaS platforms

9. Consolidated IOCs

Domains

xykri.com                   # QR redirect (SUSPENDED - clientHold)
oe.xykri.com               # Landing page (DOWN)
orekx.life                  # Phishing host
ilsos.orekx.life            # Fake IL SoS portal (LIVE - C2 panel active)
hxlkm.life                  # Replacement host (PARKED via Skenzo)
ilsos.hxlkm.life            # Replacement portal (DOWN)

IPs (Cloudflare Edge — origin masked)

172.67.200.47               # ilsos.orekx.life
104.21.76.199               # ilsos.orekx.life
172.67.218.70               # hxlkm.life
104.21.45.190               # hxlkm.life
2606:4700:3030::ac43:c82f   # ilsos.orekx.life (IPv6)
2606:4700:3033::6815:4cc7   # ilsos.orekx.life (IPv6)
2606:4700:3030::6815:2dbe   # hxlkm.life (IPv6)
2606:4700:3034::ac43:da46   # hxlkm.life (IPv6)
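One detail worth noting about the list above: the low 32 bits of each listed IPv6 address spell out one of the listed IPv4 edge addresses. The sketch below shows the arithmetic with the standard `ipaddress` module; `embedded_ipv4` is a hypothetical helper, and the correspondence is an observation about these specific IOCs, not a documented Cloudflare guarantee.

```python
import ipaddress


def embedded_ipv4(v6_text):
    """Interpret the low 32 bits of an IPv6 address as an IPv4 address."""
    low32 = int(ipaddress.IPv6Address(v6_text)) & 0xFFFFFFFF
    return str(ipaddress.IPv4Address(low32))


# IPv6 IOCs from the list above.
for v6 in (
    "2606:4700:3030::ac43:c82f",
    "2606:4700:3033::6815:4cc7",
    "2606:4700:3030::6815:2dbe",
    "2606:4700:3034::ac43:da46",
):
    print(v6, "->", embedded_ipv4(v6))
```

This can help pair IPv4 and IPv6 observations of the same edge when deduplicating IOC feeds.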

Cloudflare Nameservers (per-account)

ivan.ns.cloudflare.com / kinsley.ns.cloudflare.com    # xykri.com
kevin.ns.cloudflare.com / keira.ns.cloudflare.com     # orekx.life
pola.ns.cloudflare.com / kellen.ns.cloudflare.com     # hxlkm.life

Registrar

Cloud Yuqu LLC (IANA 3824)
  URL:    diymysite.com
  WHOIS:  whois.diymysite.com
  Abuse:  abuse@diymysite.com / demi@diymysite.com
  Phone:  +86.17723349228 / +86.19981778832
  Location: Hainan, China

Technology Stack

CDN:        Cloudflare (free tier, separate accounts per domain)
Frontend:   OpenResty 1.29.2.1 (Nginx + Lua)
Backend:    Javalin on Eclipse Jetty (io.javalin.jetty.JavalinJettyServlet-5bf7f15f)
Encryption: AES-256-CTR (key in cleartext with payload)
Template:   US-IL-ticket-ilsos
C2 Panel:   /console/ (auth-gated, 25+ operator routes)

API Endpoints (from article — currently deregistered)

POST /api/verify/{code}              # Victim verification + redirect
POST /pay/api/open/getSyncSettings   # Encrypted data exfiltration
POST /pay/api/open/pollInstruction   # Real-time operator C2 polling
GET  /pay/b_info.html                # Billing info collection form
GET  /pay/c_info.html                # Credit card collection form

Operator Panel Routes (NEW — discovered via recon)

/console/                            # Panel root (401 Unauthorized)
/console/login                       # Login endpoint
/console/auth                        # Auth endpoint
/console/api                         # Panel API
/console/victims                     # Victim data management
/console/sessions                    # Active sessions
/console/campaigns                   # Campaign management
/console/templates                   # Phishing template library
/console/operators                   # Operator accounts
/console/export                      # Data exfiltration/export
/console/ws                          # WebSocket C2 channel

10. GitHub OSINT — Kit Internals & Operator Intelligence

Source: gsmith257-cyber/Smishing-Triad (public repo)

A researcher published SQLi dumps, cracked credentials, and a password cracker from the earlier USPS variant of this same PhaaS operation. Key findings:

Password Hashing Algorithm (confirmed):

// From cracker.go — the kit uses triple-MD5 with a hardcoded salt
buf.WriteString(password + "wangduoyu666!.+-")  // salt = developer's name + lucky numbers
hash := md5.Sum(buf.Bytes())                     // MD5 round 1
hash = md5.Sum([]byte(hex(hash)))                // MD5 round 2
hash = md5.Sum([]byte(hex(hash)))                // MD5 round 3

The salt wangduoyu666!.+- literally contains the Lighthouse developer's name — Wang Duo Yu (王多鱼). This is the same individual named in Google's November 2025 lawsuit. 50+ operator passwords were cracked from this scheme (e.g., 123456, 88888888, A123123, Aa123456).

Admin Table Schema (from SQLi dump):

id, token, desc, name, type, avatar, login_ip, password, username, login_time, permission
  • type=1 = admin, type=2 = operator, type=4 = unknown role
  • Admin account links to Telegram: https://t.me/wangduoyu0
  • Operator login IPs: 222.77.240.43, 2409:8a34:3c28:e01:... (Chinese IP space)
  • Usernames are stored in an encoded form (base64-like), not in plaintext

Config Table Schema (phishing kit configuration):

pid, tg_uid, otp, key, url, mount, state, title, captcha, ht_type,
tg_open, timeout, allow_pc, tg_token, pay_status, succ_count,
redirect_url, refresh_rate, refuse_cards, country_whitelist, ...

Key fields reveal:

  • Telegram integration for real-time operator alerts (tg_token, tg_uid, tg_open)
  • BIN filtering: refuse_cards contains 1000+ card BIN prefixes to reject (likely prepaid/virtual cards)
  • Country whitelisting for victim filtering
  • OTP/CAPTCHA toggle for verification pages
  • allow_pc flag — confirms mobile-only rendering is configurable
  • succ_count — tracks successful card captures per campaign

Additional GitHub Signals

| Query | Finding |
|---|---|
| "此页面为真人身份验证" | Zero results. This Chinese verification string (roughly "this page is for human identity verification") is unique to this kit variant and not yet leaked publicly. High-confidence detection fingerprint. |
| "US-IL-ticket-ilsos" | Zero results, template ID not leaked |
| diymysite.com | Appears in SmokeDetector (spam detection), deCloudflare (149,603 NS entries), DECEPT-URL phishing research, and ICANN registrar databases |
| "Cloud Yuqu" | Listed in ICANN registrar IDs and DNS abuse contact databases |

Relevance to /console/ Panel

If the toll phishing kit at ilsos.orekx.life uses the same password hashing scheme (wangduoyu666!.+- triple-MD5), then the 50+ cracked passwords from the USPS variant represent potential authentication material for the /console/ operator panel we discovered. The admin table schema also maps cleanly to the console routes we identified (/console/operators, /console/sessions, /console/campaigns).


11. Recommendations

  1. Cloudflare abuse report for ilsos.orekx.life — the backend is live with an active operator panel. This is the highest-leverage takedown action available right now.
  2. Dynadot abuse report for both orekx.life and hxlkm.life — request full suspension, not just root parking.
  3. IC3/FBI submission with the full IOC set and Smishing Triad attribution.
  4. Monitor Javalin/Jetty fingerprint — the servlet hash 5bf7f15f could be used for Shodan/Censys pivoting to find other deployments of this same PhaaS kit if the origin IP is ever exposed.

12. URLScan.io Pivot — 5,947 Live Kit Deployments

Using the phishing kit's favicon hash and "Human Verification" page title as pivot fingerprints, URLScan.io reveals the Lighthouse kit is massively active across thousands of domains.

Pivot query: page.title:"Human Verification" AND page.server:cloudflare → 5,947 scans

Favicon hash pivot (e5da3e0862438e2046d7b2ba0b575f0639e03b6936f69d2c459616b8686d9439) → 147 unique deployments

Active Deployments (last 7 days)

| Domain | IP | URL Pattern | Last Seen |
|---|---|---|---|
| tio.usamotortoys.com | 104.21.70.132 | /public/SNrbRt | 2026-03-29 |
| edna.agdstudios.com | 2a06:98c1:3121::3 | /public/PTDP2W | 2026-03-28 |
| evq.aprenderaleer.com | 188.114.97.3 | /public/zNORpK | 2026-03-27 |
| ik.djjonathan.com | 172.67.152.128 | /public/bL8K4N | 2026-03-27 |
| gt.aopinion.com | 172.67.206.218 | /public/taSr3a | 2026-03-26 |
| ru.wbxvgzc.com | 188.114.96.3 | /public/nh9ven | 2026-03-26 |
| ek.anoreksi.com | 188.114.97.3 | /public/ryTerg | 2026-03-26 |
| qe.tuoitreyduoc.com | 172.67.140.33 | /public/ny | 2026-03-25 |
| sn.iluminacionpreferente.com | 172.67.223.145 | /public/9T2omD | 2026-03-25 |
| mh.sumsuc.com | 172.67.192.235 | /public/D8ICVX | 2026-03-25 |
| cj.whdmql.com | 172.67.182.90 | /public/LQTmrJ | 2026-03-24 |

All share: Cloudflare edge, subdomain.domain pattern, /public/{code} URL path (usually a 6-character code, though shorter codes such as /public/ny also appear), "Human Verification" title.
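The shared URL shape can be turned into a quick triage filter for URL feeds. This is a sketch only: `lure_code` and `CODE_PATH` are hypothetical names, and the three-label host requirement is a rough stand-in for the subdomain.domain pattern that would misfire on multi-part public suffixes such as .co.uk.

```python
import re
from urllib.parse import urlsplit

# A single path segment directly under /public/.
CODE_PATH = re.compile(r"^/public/([^/?#]+)/?$")


def lure_code(url):
    """Return (host, code) if the URL matches the lure shape, else None."""
    parts = urlsplit(url)
    host = parts.hostname or ""
    match = CODE_PATH.match(parts.path)
    # Require at least sub.domain.tld (three labels).
    if match and host.count(".") >= 2:
        return host, match.group(1)
    return None


print(lure_code("https://tio.usamotortoys.com/public/SNrbRt"))
print(lure_code("https://qe.tuoitreyduoc.com/public/ny"))
```

The pattern alone is far too broad for blocking; it is better suited to narrowing candidates before checking the page title or favicon hash.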

Multi-State Targeting (from xykri.com URLScan history)

| URL Path | Chinese | Target State |
|---|---|---|
| /public/马萨2 | 马萨诸塞 | Massachusetts |
| /public/田纳西2 | 田纳西 | Tennessee |
| /public/z0gMz1 | encoded | Unknown |
| /public/IHqaQS | encoded | Unknown |
| /public/GNGW7d | encoded | Unknown |
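Paths like these usually arrive percent-encoded in scan data, so decoding comes before any state lookup. A minimal sketch, assuming UTF-8 percent-encoding; `STATE_HINTS` contains only the two prefixes documented in the table above, and `target_state` is a hypothetical helper.

```python
from urllib.parse import quote, unquote

# Only the two prefixes documented above; anything else stays "Unknown".
STATE_HINTS = {"马萨": "Massachusetts", "田纳西": "Tennessee"}


def target_state(path):
    """Decode a (possibly percent-encoded) path and map its code to a state."""
    code = unquote(path).rsplit("/", 1)[-1]
    for hint, state in STATE_HINTS.items():
        if code.startswith(hint):
            return state
    return "Unknown"


encoded = quote("/public/田纳西2")  # the form typically seen in raw scan data
print(encoded, "->", target_state(encoded))
print("/public/z0gMz1", "->", target_state("/public/z0gMz1"))
```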

Additional Cloudflare Edge IPs (from URLScan)

172.67.210.175    # oe.xykri.com (new)
104.21.69.173     # oe.xykri.com (new)
188.114.96.3      # Cloudflare (new)
188.114.97.3      # Cloudflare (new)

Kit Detection Signatures

| Signature | Type | Coverage |
|---|---|---|
| URLScan favicon hash e5da3e...9439 | File hash | 147+ domains |
| Page title "Human Verification" + CF + /public/ | HTTP fingerprint | 5,947+ scans |
| changleField typo in Socket.IO events | Code fingerprint | ALL Lighthouse deployments |
| io.javalin.jetty.JavalinJettyServlet in WS 404 | Server fingerprint | Backend framework |

13. GitHub OSINT — Developer & Ecosystem Intelligence

Potential Developer Dump: Atlks/sumdoc2023

Contains admin.sql with the same superuser ID 9527 and TG:wangduoyu0 found in SQLi dumps. Also contains Telegram bot code, PHP/Node.js backend work, Chinese technical documentation, and cryptocurrency trading spreadsheets. May be from someone inside the Smishing Triad development ecosystem.

Wang Duo Yu's Personal Domains

wangduoyu.me      # Active IOC (Securonix, Infoblox feeds)
wangduoyu.shop    # Active IOC
wangduoyu.site    # Active IOC
wangduoyu.com     # Expired
wangduoyu.net     # Expired
wangduoyu.com.cn  # Expired

Threat Intel Repos Tracking Smishing Triad

| Repository | Content |
|---|---|
| Securonix/AutonomousThreatSweeper | SIEM hunting queries + wangduoyu IOC domains |
| infobloxopen/threat-intelligence | Official Infoblox IOC feeds for USPS smishing |
| iocradar0-netizen/IOCs | Dedicated Smishing Triad.txt IOC file |
| nabeelxy/web-security-agents | Unit 42 global smishing campaign report |
| polarityio/google-threat-intelligence | Smishing Triad in Google TI constants |
| MISP/misp-website | Smishing Triad added to MISP galaxy taxonomy |
| mthcht/ThreatIntel-Reports | Netcraft report: 17,500+ Lighthouse/Lucid domains |

Operator IPs (from SQLi dumps across repos)

137.184.82.92     # DigitalOcean
185.14.47.210     # European hosting
117.44.51.222     # China Telecom (Fujian)
120.37.249.242    # China Telecom (Fujian)
202.124.43.230    # Asian hosting
106.226.19.70     # China Mobile
45.145.74.134     # European VPS
27.153.244.196    # China Telecom (Fujian)
117.26.240.241    # China Telecom (Fujian)
162.251.63.49     # US hosting
147.185.242.202   # US hosting
120.37.228.20     # China Telecom (Fujian)

Pattern: Heavy concentration in Fujian Province, China (China Telecom) — a known cybercrime hub.
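The concentration claim can be quantified with a simple tally over the list above. In this sketch the labels are copied from the writeup's own annotations rather than fresh ASN or geolocation lookups, and `OPERATOR_IPS` is a hypothetical name.

```python
from collections import Counter

# Labels copied verbatim from the list above (not independently verified).
OPERATOR_IPS = {
    "137.184.82.92": "DigitalOcean",
    "185.14.47.210": "European hosting",
    "117.44.51.222": "China Telecom (Fujian)",
    "120.37.249.242": "China Telecom (Fujian)",
    "202.124.43.230": "Asian hosting",
    "106.226.19.70": "China Mobile",
    "45.145.74.134": "European VPS",
    "27.153.244.196": "China Telecom (Fujian)",
    "117.26.240.241": "China Telecom (Fujian)",
    "162.251.63.49": "US hosting",
    "147.185.242.202": "US hosting",
    "120.37.228.20": "China Telecom (Fujian)",
}

counts = Counter(OPERATOR_IPS.values())
for label, n in counts.most_common():
    print(f"{n:>2}  {label}")
```

By these labels, 5 of the 12 addresses fall under China Telecom (Fujian), with the remainder spread across hosting providers.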


Reconnaissance conducted 2026-03-28/29 using dnsx, subfinder, naabu, nuclei, katana, Unflare, ffuf, URLScan.io, URLhaus, ThreatFox, AlienVault OTX, manual HTTP probing, and GitHub code/repo search. No exploitation attempted — all findings are from publicly observable responses, public APIs, and public repositories.

#!/usr/bin/env python3
"""
lighthouse_chase.py: Lighthouse Campaign Hunter & Live Infrastructure Interactor

Follows the full Lighthouse PhaaS attack chain:
    Redirector (/public/{code}) → Verify API → Phishing SPA → Socket.IO C2

Designed for the LIVE infrastructure. Companion to lighthouse_dissector.py.

Usage:
    python3 lighthouse_chase.py chase <redirector_domain> <code>
    python3 lighthouse_chase.py bulk-chase <file_of_domain:code_pairs>
    python3 lighthouse_chase.py rip <phishing_domain> <path>
    python3 lighthouse_chase.py socketio <phishing_domain>
    python3 lighthouse_chase.py bruteforce <redirector_domain> [--charset alphanum] [--length 6]
    python3 lighthouse_chase.py urlscan-hunt [--days 7] [--limit 100]
    python3 lighthouse_chase.py campaign-map <redirector_domain> <code>

No exploitation: passive reconnaissance and protocol analysis only.
"""
import argparse
import base64
import itertools
import json
import os
import random
import re
import ssl
import string
import struct
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _ctx():
    """Return a TLS context with certificate verification disabled."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


MOBILE_UA = "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Mobile/15E148 Safari/604.1"


class NoRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that refuses to follow 3xx responses."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None


def _req(url, method="GET", headers=None, data=None, timeout=8, follow=True):
    """Return (status, headers, body). Status 0 signals a transport-level failure."""
    hdrs = {"User-Agent": MOBILE_UA}
    if headers:
        hdrs.update(headers)
    req = urllib.request.Request(url, data=data, headers=hdrs, method=method)
    try:
        if follow:
            resp = urllib.request.urlopen(req, timeout=timeout, context=_ctx())
        else:
            # Disable redirect following so 3xx responses surface as HTTPError
            opener = urllib.request.build_opener(
                NoRedirect, urllib.request.HTTPSHandler(context=_ctx())
            )
            resp = opener.open(req, timeout=timeout)
        return resp.status, dict(resp.headers), resp.read()
    except urllib.error.HTTPError as e:
        return e.code, dict(e.headers), e.read()
    except Exception as e:
        return 0, {}, str(e).encode()

# ---------------------------------------------------------------------------
# chase: Follow the full redirect chain
# ---------------------------------------------------------------------------

def cmd_chase(args):
    """Follow: redirector/public/{code} → POST /api/verify/{code} → phishing SPA"""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    code = args.code
    base = f"https://{domain}"

    print(f"[*] Chasing redirect chain on {domain} with code '{code}'")
    print(f"[*] Timestamp: {datetime.now(timezone.utc).isoformat()}")
    print()

    # Step 1: Check the landing page
    print("=== Step 1: Landing Page ===")
    status, hdrs, body = _req(f"{base}/public/{code}")
    body_str = body.decode("utf-8", errors="replace")
    title_match = re.search(r"<title>(.*?)</title>", body_str)
    title = title_match.group(1) if title_match else "unknown"
    print(f" GET /public/{code} → {status} ({len(body)}b)")
    print(f" Title: {title}")

    # Extract code from page if embedded
    code_match = re.search(r'const\s+code\s*=\s*"([^"]+)"', body_str)
    if code_match:
        embedded_code = code_match.group(1)
        print(f" Embedded code: {embedded_code}")
        if embedded_code != code:
            print(f" [!] Embedded code differs from URL code!")

    # Check for Chinese verification text
    if "此页面为真人身份验证" in body_str:
        print(" [!] Chinese verification context found (Lighthouse kit signature)")

    # Step 2: Hit the verification API
    print("\n=== Step 2: Verification API ===")
    status, hdrs, body = _req(
        f"{base}/api/verify/{code}",
        method="POST",
        headers={"Content-Type": "application/json", "Referer": f"{base}/public/{code}",
                 "Origin": base}
    )
    body_str = body.decode("utf-8", errors="replace")
    print(f" POST /api/verify/{code} → {status} ({len(body)}b)")

    target_url = None
    if status == 200:
        try:
            data = json.loads(body_str)
            target_url = data.get("url")
            print(f" [REDIRECT] → {target_url}")
            # Parse target domain
            if target_url:
                target_match = re.match(r"https?://([^/]+)(.*)", target_url)
                if target_match:
                    target_domain = target_match.group(1)
                    target_path = target_match.group(2) or "/"
                    print(f" Target domain: {target_domain}")
                    print(f" Target path: {target_path}")
                    # Extract state/service from URL
                    parts = target_domain.split(".")
                    if len(parts) >= 2:
                        subdomain = parts[0]
                        print(f" State/Service: {subdomain}")
        except json.JSONDecodeError:
            # Might be encrypted
            if len(body) >= 49:
                print(f" [ENCRYPTED] Payload is {len(body)} bytes; use lighthouse_dissector.py decrypt")
                print(f" Key (first 32b): {body[:32].hex()}")
            else:
                print(f" Response: {body_str[:200]}")
    else:
        print(f" Response: {body_str[:200]}")

    # Step 3: Probe the target
    if target_url:
        print(f"\n=== Step 3: Target Probe ({target_url}) ===")
        status, hdrs, body = _req(target_url)
        body_str = body.decode("utf-8", errors="replace")
        server = hdrs.get("Server", hdrs.get("server", "unknown"))
        title_match = re.search(r"<title>(.*?)</title>", body_str)
        title = title_match.group(1) if title_match else "unknown"
        print(f" GET {target_url} → {status} ({len(body)}b)")
        print(f" Server: {server}")
        print(f" Title: {title}")
        if "Suspected phishing" in title:
            print(" [!] Cloudflare has flagged this as SUSPECTED PHISHING")
        elif "openresty" in body_str.lower():
            print(" [!] OpenResty default page (phishing SPA not at root)")

        # Check for OpenResty leak in 301
        status_nr, hdrs_nr, body_nr = _req(target_url.rstrip("/"), follow=False)
        if status_nr == 301:
            location = hdrs_nr.get("Location", hdrs_nr.get("location", ""))
            server_nr = body_nr.decode("utf-8", errors="replace")
            if "openresty" in server_nr.lower():
                ver_match = re.search(r"openresty/([\d.]+)", server_nr)
                if ver_match:
                    print(f" [!] OpenResty version leaked: {ver_match.group(1)}")

        # DNS for target domain
        if target_match:
            print(f"\n DNS: {target_domain}")
            try:
                import subprocess
                r = subprocess.run(["dig", "+short", target_domain], capture_output=True, text=True, timeout=5)
                for line in r.stdout.strip().split("\n"):
                    if line:
                        print(f" {line}")
            except Exception:
                pass

            # Check RDAP
            root_domain = ".".join(target_domain.split(".")[-2:])
            tld = target_domain.split(".")[-1]
            if tld == "life":
                rdap_url = f"https://rdap.identitydigital.services/rdap/domain/{root_domain}"
            elif tld == "org":
                rdap_url = f"https://rdap.org/domain/{root_domain}"
            else:
                rdap_url = None
            if rdap_url:
                try:
                    _, _, rdap_body = _req(rdap_url)
                    rdap = json.loads(rdap_body)
                    events = rdap.get("events", [])
                    for e in events:
                        if e.get("eventAction") == "registration":
                            print(f" Registered: {e.get('eventDate','?')}")
                        elif e.get("eventAction") == "expiration":
                            print(f" Expires: {e.get('eventDate','?')}")
                    status_list = rdap.get("status", [])
                    if "add period" in status_list:
                        print(f" [!] Domain is in ADD PERIOD (brand new registration!)")
                except Exception:
                    pass

    # Summary
    print("\n=== Chain Summary ===")
    print(f" Redirector: {domain}")
    print(f" Code: {code}")
    print(f" Target: {target_url or 'FAILED'}")
    if target_url:
        print(f" Action: Report to Cloudflare, Dynadot, and IC3")

# ---------------------------------------------------------------------------
# rip: Download the full phishing SPA
# ---------------------------------------------------------------------------

def cmd_rip(args):
    """Download and analyze the phishing SPA from a live target."""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    path = args.path if args.path.startswith("/") else f"/{args.path}"
    base = f"https://{domain}"
    outdir = args.output or f"rip_{domain.replace('.', '_')}"
    os.makedirs(outdir, exist_ok=True)

    print(f"[*] Ripping phishing SPA from {domain}{path}")
    print(f"[*] Output: {outdir}/")

    # Get main page
    status, hdrs, body = _req(f"{base}{path}/")
    with open(f"{outdir}/index.html", "wb") as f:
        f.write(body)
    print(f" index.html → {len(body)}b")
    body_str = body.decode("utf-8", errors="replace")

    # Extract all referenced resources
    resources = set()
    # Script sources
    for src in re.findall(r'<script[^>]*src="([^"]*)"', body_str):
        if not src.startswith("http"):
            resources.add(src)
    # CSS links
    for href in re.findall(r'<link[^>]*href="([^"]*)"', body_str):
        if not href.startswith("http") and not href.startswith("//"):
            resources.add(href)
    # Images
    for src in re.findall(r'<img[^>]*src="([^"]*)"', body_str):
        if not src.startswith("http") and not src.startswith("data:"):
            resources.add(src)
    # Favicon
    for href in re.findall(r'href="([^"]*favicon[^"]*)"', body_str):
        if not href.startswith("http"):
            resources.add(href)

    # Fetch all resources
    for res_path in sorted(resources):
        full_url = f"{base}{res_path}" if res_path.startswith("/") else f"{base}{path}/{res_path}"
        status, _, res_body = _req(full_url)
        local_path = res_path.lstrip("/").replace("/", "_")
        with open(f"{outdir}/{local_path}", "wb") as f:
            f.write(res_body)
        kind = "JS " if local_path.endswith(".js") else "CSS" if local_path.endswith(".css") else " "
        print(f" [{kind}] {res_path} → {len(res_body)}b")

    # Analyze the SPA
    print(f"\n=== SPA Analysis ===")
    # Start empty: index.html is re-read from outdir below, so seeding this
    # with body_str would count every index.html match twice.
    all_content = ""
    for fn in os.listdir(outdir):
        fp = os.path.join(outdir, fn)
        if fn.endswith((".js", ".html")):
            with open(fp, "r", errors="replace") as f:
                all_content += f.read()

    patterns = {
        "changleField": "Kit fingerprint typo (ALL Lighthouse deployments)",
        "pollInstruction": "C2 polling endpoint",
        "getSyncSettings": "Data exfiltration endpoint",
        "JwrCrypto": "Encryption object",
        "AES-CTR": "Encryption algorithm",
        "socket.io": "Socket.IO C2 library",
        "io(": "Socket.IO connection init",
        "cvv_fullName": "Card data field",
        "b_info": "Billing info page",
        "wangduoyu": "Developer signature",
    }
    for pat, desc in patterns.items():
        count = all_content.lower().count(pat.lower())
        if count > 0:
            print(f" [FOUND] {pat}: {desc} ({count}x)")

    # Chinese strings
    chinese = set(re.findall(r'[\u4e00-\u9fff][\u4e00-\u9fffA-Za-z0-9,,.。!!??\s]{2,}', all_content))
    if chinese:
        print(f"\n Chinese strings ({len(chinese)}):")
        for cs in sorted(chinese)[:20]:
            print(f" {cs}")

    print(f"\n[*] Rip complete: {outdir}/")

# ---------------------------------------------------------------------------
# socketio: Tap into the Socket.IO C2 channel
# ---------------------------------------------------------------------------

def cmd_socketio(args):
    """Attempt Engine.IO v4 handshake with the phishing SPA's C2 channel."""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    base = f"https://{domain}"

    print(f"[*] Probing Socket.IO / Engine.IO on {domain}")

    # Try common Socket.IO paths
    paths = ["/console/", "/socket.io/", "/ws/", "/", "/pay/"]
    for path in paths:
        url = f"{base}{path}?EIO=4&transport=polling"
        status, hdrs, body = _req(url)
        body_str = body.decode("utf-8", errors="replace")
        if status == 200 and len(body) > 10:
            print(f"\n [!] Engine.IO handshake at {path}")
            print(f" Status: {status} ({len(body)}b)")

            # Engine.IO v4 sends the open packet as '0{json}'. Older v3 servers
            # prefix it with '<length>:'. Accept both forms; the previous parse
            # treated the packet-type byte as a length and never reached the JSON.
            eio_match = re.match(r"(?:(\d+):)?0(?=\{)", body_str)
            if eio_match:
                if eio_match.group(1):
                    print(f" Prefix: {eio_match.group(1)}")
                try:
                    # Type 0 = open; decode only the first JSON object after it
                    handshake, _ = json.JSONDecoder().raw_decode(body_str, eio_match.end())
                    print(f" Session ID: {handshake.get('sid', '?')}")
                    print(f" Ping Interval: {handshake.get('pingInterval', '?')}ms")
                    print(f" Ping Timeout: {handshake.get('pingTimeout', '?')}ms")
                    print(f" Max Payload: {handshake.get('maxPayload', '?')}b")
                    print(f" Upgrades: {handshake.get('upgrades', [])}")
                    sid = handshake.get("sid")
                    if sid:
                        print(f"\n [*] Got session: {sid}")
                        print(f" [*] You can now poll with:")
                        print(f" curl '{base}{path}?EIO=4&transport=polling&sid={sid}'")
                        # Try one poll
                        poll_url = f"{base}{path}?EIO=4&transport=polling&sid={sid}"
                        ps, _, pb = _req(poll_url)
                        pb_str = pb.decode("utf-8", errors="replace")
                        print(f"\n Poll response: {ps} ({len(pb)}b)")
                        if pb_str:
                            print(f" Data: {pb_str[:300]}")
                except json.JSONDecodeError:
                    print(f" Raw: {body_str[:200]}")
            else:
                print(f" Raw: {body_str[:200]}")
        elif status != 404:
            print(f" {path}?EIO=4 → {status} ({len(body)}b)")

# ---------------------------------------------------------------------------
# bruteforce: Enumerate valid verification codes
# ---------------------------------------------------------------------------

def cmd_bruteforce(args):
    """Brute-force verification codes to discover active campaigns."""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    base = f"https://{domain}"
    charset = string.ascii_letters + string.digits if args.charset == "alphanum" else args.charset
    length = args.length
    max_attempts = args.max_attempts
    rate = args.rate

    print(f"[*] Brute-forcing verification codes on {domain}")
    print(f"[*] Charset: {'alphanum' if len(charset) > 20 else charset} ({len(charset)} chars)")
    print(f"[*] Length: {length}, Max: {max_attempts}, Rate: {rate}/s")
    print()

    found = []
    attempts = 0
    # Generate random codes (full brute-force of 62^6 = 56B is impractical)
    for _ in range(max_attempts):
        code = "".join(random.choices(charset, k=length))
        attempts += 1
        status, _, body = _req(
            f"{base}/api/verify/{code}",
            method="POST",
            headers={"Content-Type": "application/json"},
            timeout=5
        )
        if status == 200 and b"url" in body:
            try:
                data = json.loads(body)
                target = data.get("url", "")
                print(f" [FOUND] Code: {code} → {target}")
                found.append({"code": code, "target": target})
            except json.JSONDecodeError:
                pass
        if attempts % 50 == 0:
            print(f" ... {attempts}/{max_attempts} tested, {len(found)} found", file=sys.stderr)
        time.sleep(1.0 / rate)

    print(f"\n[*] Done: {attempts} tested, {len(found)} valid codes found")
    if found:
        print(json.dumps(found, indent=2))

# ---------------------------------------------------------------------------
# urlscan-hunt — Find Lighthouse deployments via URLScan.io
# ---------------------------------------------------------------------------
def cmd_urlscan_hunt(args):
    """Search URLScan.io for live Lighthouse kit deployments."""
    days = args.days
    limit = args.limit
    after_date = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT00:00:00.000Z")
    print("[*] Hunting Lighthouse deployments on URLScan.io")
    print(f"[*] Window: last {days} days (after {after_date})")
    # Search by page title + URL pattern
    query = 'page.title:"Human Verification" AND page.server:cloudflare AND page.url:"/public/"'
    search_url = f"https://urlscan.io/api/v1/search/?q={urllib.request.quote(query)}&size={limit}&sort=date"
    status, _, body = _req(search_url, timeout=15)
    if status != 200:
        print(f"[!] URLScan returned {status}")
        return
    data = json.loads(body)
    total = data.get("total", 0)
    results = data.get("results", [])
    print(f"[*] Total matching: {total}")
    print(f"[*] Fetched: {len(results)}")
    print()
    campaigns = {}
    for r in results:
        task = r.get("task", {})
        page = r.get("page", {})
        domain = page.get("domain", "")
        url = task.get("url", "")
        ip = page.get("ip", "")
        time_str = task.get("time", "")
        scan_id = r.get("_id", "")
        # The query carries no date filter, so enforce the lookback window here.
        # Assumes task.time uses the same ISO-8601 UTC format as after_date,
        # in which case plain string comparison orders timestamps correctly.
        if time_str and time_str < after_date:
            continue
        if domain not in campaigns:
            campaigns[domain] = {
                "domain": domain,
                "ip": ip,
                "urls": [],
                "first_seen": time_str,
                "last_seen": time_str,
                "scan_ids": [],
            }
        entry = campaigns[domain]
        entry["urls"].append(url)
        entry["scan_ids"].append(scan_id)
        if time_str:
            # Track both ends of the range; results are not guaranteed to arrive oldest-first
            entry["first_seen"] = min(entry["first_seen"] or time_str, time_str)
            entry["last_seen"] = max(entry["last_seen"], time_str)
    print(f"{'Domain':<40} {'IP':<18} {'URLs':<5} {'Last Seen':<25}")
    print("-" * 90)
    for domain in sorted(campaigns.keys(), key=lambda d: campaigns[d]["last_seen"], reverse=True):
        c = campaigns[domain]
        print(f"{c['domain']:<40} {c['ip']:<18} {len(c['urls']):<5} {c['last_seen']:<25}")
    print(f"\n[*] Unique domains: {len(campaigns)}")
    # Extract codes for chasing (first URL of the 20 most recently seen domains)
    print("\n=== Verification Codes (for chase command) ===")
    for domain, c in sorted(campaigns.items(), key=lambda x: x[1]["last_seen"], reverse=True)[:20]:
        for url in c["urls"][:1]:
            code_match = re.search(r"/public/([^/?]+)", url)
            if code_match:
                print(f" python3 lighthouse_chase.py chase {domain} {code_match.group(1)}")
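# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original tool): the code-extraction
# step at the end of cmd_urlscan_hunt, pulled out as a standalone function.
# extract_public_code is a hypothetical name; the pattern also stops at '#'
# so that URL fragments are not captured as part of the code.
# ---------------------------------------------------------------------------
import re

_PUBLIC_CODE_RE = re.compile(r"/public/([^/?#]+)")

def extract_public_code(url):
    """Return the verification code from a /public/{code} URL, or None."""
    match = _PUBLIC_CODE_RE.search(url)
    return match.group(1) if match else None
# Usage: extract_public_code("https://redirector.example/public/Ab3dE9?x=1")
# yields "Ab3dE9"; URLs without a /public/ segment yield None.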
# ---------------------------------------------------------------------------
# campaign-map — Full campaign mapping from a single entry point
# ---------------------------------------------------------------------------
def cmd_campaign_map(args):
    """Map the full campaign infrastructure from a single redirector + code."""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    code = args.code
    base = f"https://{domain}"
    print(f"[*] Mapping campaign infrastructure from {domain}/{code}")
    print(f"[*] Timestamp: {datetime.now(timezone.utc).isoformat()}")
    print()
    campaign = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "entry_point": {"domain": domain, "code": code},
        "redirector": {},
        "targets": [],
        "infrastructure": {"ips": set(), "nameservers": set(), "registrars": set()},
    }
    # Step 1: Redirector analysis
    print("=== Redirector ===")
    status, hdrs, body = _req(f"{base}/public/{code}")
    campaign["redirector"]["landing_status"] = status
    campaign["redirector"]["landing_size"] = len(body)
    status, _, body = _req(f"{base}/api/verify/{code}", method="POST")
    if status == 200:
        try:
            data = json.loads(body)
            target_url = data.get("url", "")
            print(f" Redirect: {target_url}")
            # Parse target
            match = re.match(r"https?://([^/]+)(.*)", target_url)
            if match:
                target_domain = match.group(1)
                target_path = match.group(2) or "/"
                # Naive registrable-domain guess: the last two labels.
                # This is wrong for multi-label suffixes such as .co.uk.
                root_domain = ".".join(target_domain.split(".")[-2:])
                target_info = {
                    "url": target_url,
                    "domain": target_domain,
                    "root_domain": root_domain,
                    "path": target_path,
                    "state": target_domain.split(".")[0] if "." in target_domain else "unknown",
                }
                # DNS: requires the dnsx binary on PATH (previously a hardcoded home-directory path)
                try:
                    import subprocess
                    r = subprocess.run(["dnsx", "-a", "-ns", "-resp", "-silent"],
                                       input=root_domain, capture_output=True, text=True, timeout=10)
                    for line in r.stdout.strip().split("\n"):
                        # Strip ANSI colour codes first so they cannot leak into the captured value
                        clean = re.sub(r"\x1b\[[0-9;]*m", "", line)
                        print(f" DNS: {clean}")
                        value = re.search(r"\[([^\[\]]+)\]\s*$", clean)
                        if not value:
                            continue
                        if "[A]" in clean:
                            campaign["infrastructure"]["ips"].add(value.group(1))
                        elif "[NS]" in clean:
                            campaign["infrastructure"]["nameservers"].add(value.group(1))
                except Exception as e:
                    # Report the failure instead of silently dropping the DNS step
                    print(f" DNS: lookup skipped ({e})")
                # HTTP probe
                status, hdrs, body = _req(target_url)
                body_str = body.decode("utf-8", errors="replace")
                title_match = re.search(r"<title>(.*?)</title>", body_str)
                target_info["http_status"] = status
                target_info["title"] = title_match.group(1) if title_match else ""
                target_info["cf_blocked"] = "Suspected phishing" in target_info["title"]
                target_info["server"] = hdrs.get("Server", hdrs.get("server", ""))
                print(f" Status: {status} | Title: {target_info['title']}")
                if target_info["cf_blocked"]:
                    print(" [!] CLOUDFLARE BLOCKED")
                campaign["targets"].append(target_info)
        except json.JSONDecodeError:
            print(f" [ENCRYPTED] {len(body)}b payload")
    # Convert sets to sorted lists so the structure is JSON-serializable
    campaign["infrastructure"]["ips"] = sorted(campaign["infrastructure"]["ips"])
    campaign["infrastructure"]["nameservers"] = sorted(campaign["infrastructure"]["nameservers"])
    campaign["infrastructure"]["registrars"] = sorted(campaign["infrastructure"]["registrars"])
    print(f"\n{json.dumps(campaign, indent=2, ensure_ascii=False)}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
p = argparse.ArgumentParser(
description="Lighthouse Campaign Hunter & Live Infrastructure Interactor",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = p.add_subparsers(dest="command")
s = sub.add_parser("chase", help="Follow the full redirect chain")
s.add_argument("domain", help="Redirector domain")
s.add_argument("code", help="Verification code from /public/{code}")
s = sub.add_parser("rip", help="Download the full phishing SPA")
s.add_argument("domain", help="Phishing SPA domain")
s.add_argument("path", help="Path to phishing SPA (e.g., /bmv)")
s.add_argument("--output", "-o", help="Output directory")
s = sub.add_parser("socketio", help="Probe Socket.IO C2 channel")
s.add_argument("domain", help="Phishing SPA domain")
s = sub.add_parser("bruteforce", help="Brute-force verification codes")
s.add_argument("domain", help="Redirector domain")
s.add_argument("--charset", default="alphanum", help="Character set (default: alphanum)")
s.add_argument("--length", type=int, default=6, help="Code length (default: 6)")
s.add_argument("--max-attempts", type=int, default=500, help="Max attempts (default: 500)")
s.add_argument("--rate", type=float, default=10, help="Requests per second (default: 10)")
s = sub.add_parser("urlscan-hunt", help="Hunt deployments via URLScan.io")
s.add_argument("--days", type=int, default=7, help="Lookback days (default: 7)")
s.add_argument("--limit", type=int, default=100, help="Max results (default: 100)")
s = sub.add_parser("campaign-map", help="Map full campaign from entry point")
s.add_argument("domain", help="Redirector domain")
s.add_argument("code", help="Verification code")
args = p.parse_args()
cmds = {
"chase": cmd_chase,
"rip": cmd_rip,
"socketio": cmd_socketio,
"bruteforce": cmd_bruteforce,
"urlscan-hunt": cmd_urlscan_hunt,
"campaign-map": cmd_campaign_map,
}
if args.command in cmds:
cmds[args.command](args)
else:
p.print_help()
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
lighthouse_dissector.py — Lighthouse PhaaS Kit Analysis Toolkit
Decrypts, deobfuscates, probes, and monitors Lighthouse/Smishing Triad
phishing infrastructure. Built from reverse-engineering the kit's main.js
and live infrastructure reconnaissance.
Usage:
python3 lighthouse_dissector.py decrypt <hex_or_b64_payload>
python3 lighthouse_dissector.py decrypt-file <binary_file>
python3 lighthouse_dissector.py deobfuscate <main_js_file>
python3 lighthouse_dissector.py probe <domain>
python3 lighthouse_dissector.py monitor <domain> [--interval 5]
python3 lighthouse_dissector.py fingerprint <domain>
python3 lighthouse_dissector.py report <domain> [--output report.json]
Author context:
Supplements Ben Polonsky's toll booth phishing writeup.
https://tech-journal.gitbook.io/ben-polonsky-writeups
No exploitation — passive reconnaissance and payload decryption only.
"""
import argparse
import base64
import binascii
import hashlib
import json
import os
import re
import struct
import sys
import time
import urllib.request
import urllib.error
import ssl
from datetime import datetime, timezone
# ---------------------------------------------------------------------------
# AES-CTR Decryption (the kit's broken crypto — key embedded in payload)
# ---------------------------------------------------------------------------
def aes_ctr_decrypt_payload(raw: bytes) -> dict:
    """
    Decrypt a Lighthouse kit AES-256-CTR payload.

    Layout (from JwrCrypto in main.js):
        bytes 0-31  : AES-256 key (32 bytes)
        bytes 32-47 : CTR counter/IV (16 bytes)
        bytes 48+   : ciphertext

    The implementation is fundamentally broken: the key is transmitted
    in plaintext alongside the ciphertext.
    """
    if len(raw) < 49:
        raise ValueError(f"Payload too short ({len(raw)} bytes, need >= 49)")
    key = raw[0:32]
    iv = raw[32:48]
    ciphertext = raw[48:]
    try:
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        cipher = Cipher(algorithms.AES(key), modes.CTR(iv))
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    except ImportError:
        # Fallback: shell out to the openssl CLI when `cryptography` is not installed
        plaintext = _aes_ctr_pure(key, iv, ciphertext)
    try:
        text = plaintext.decode("utf-8")
        return json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not JSON: return the raw bytes as hex plus a lossy UTF-8 rendering
        return {"raw_plaintext": plaintext.hex(), "utf8_attempt": plaintext.decode("utf-8", errors="replace")}


def _aes_ctr_pure(key: bytes, nonce: bytes, data: bytes) -> bytes:
    """AES-CTR fallback that shells out to the openssl CLI (not pure Python, despite the name)."""
    import subprocess
    import tempfile
    with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
        f.write(data)
        ct_path = f.name
    try:
        result = subprocess.run(
            ["openssl", "enc", "-aes-256-ctr", "-d",
             "-K", key.hex(), "-iv", nonce.hex(),
             "-in", ct_path, "-nopad"],
            capture_output=True, timeout=10
        )
        if result.returncode != 0:
            # Surface openssl failures instead of returning empty or partial output
            raise RuntimeError(f"openssl failed: {result.stderr.decode('utf-8', errors='replace').strip()}")
        return result.stdout
    finally:
        os.unlink(ct_path)
def cmd_decrypt(args):
"""Decrypt a hex or base64 encoded payload."""
payload = args.payload
# Try hex first
try:
raw = bytes.fromhex(payload.replace(" ", "").replace("0x", ""))
except ValueError:
# Try base64
try:
raw = base64.b64decode(payload)
except Exception:
print(f"[!] Cannot decode payload as hex or base64", file=sys.stderr)
sys.exit(1)
result = aes_ctr_decrypt_payload(raw)
print(json.dumps(result, indent=2, ensure_ascii=False))
def cmd_decrypt_file(args):
"""Decrypt a binary file containing an AES-CTR payload."""
with open(args.file, "rb") as f:
raw = f.read()
print(f"[*] Read {len(raw)} bytes from {args.file}")
print(f"[*] Key (hex): {raw[0:32].hex()}")
print(f"[*] IV (hex): {raw[32:48].hex()}")
print(f"[*] Ciphertext: {len(raw) - 48} bytes")
print()
result = aes_ctr_decrypt_payload(raw)
print(json.dumps(result, indent=2, ensure_ascii=False))
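# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original tool): the key-in-payload
# layout described in aes_ctr_decrypt_payload, as a standalone splitter.
# split_payload, KEY_LEN and IV_LEN are hypothetical names; no decryption
# happens here, which keeps the sketch free of third-party dependencies.
# ---------------------------------------------------------------------------
KEY_LEN = 32  # AES-256 key, bytes 0-31
IV_LEN = 16   # CTR counter/IV, bytes 32-47

def split_payload(raw):
    """Split a payload into (key, iv, ciphertext) per the documented layout."""
    header = KEY_LEN + IV_LEN
    if len(raw) <= header:
        raise ValueError(f"Payload too short ({len(raw)} bytes, need > {header})")
    return raw[:KEY_LEN], raw[KEY_LEN:header], raw[header:]
# Usage: key, iv, ct = split_payload(open("payload.bin", "rb").read())
# Anyone who captures the payload holds all three parts, which is why the
# scheme offers no confidentiality.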
# ---------------------------------------------------------------------------
# main.js Deobfuscation
# ---------------------------------------------------------------------------
# Known string table entries decoded from the _0x2d5c array
# These are base64-encoded with a custom charset rotation
KNOWN_STRINGS = {
"AES-CTR": "Encryption algorithm",
"pollInstruction": "C2 polling endpoint",
"getSyncSettings": "Data exfiltration endpoint",
"addClick": "Click tracking endpoint",
"addCvv": "CVV submission endpoint",
"cache_the_final_info": "Final data cache endpoint",
"b_info.html": "Billing info page",
"c_info.html": "Card info page",
"d_sms.html": "SMS verification page",
"d_2fa.html": "2FA verification page",
"e_email.html": "Email verification page",
"f_pin.html": "PIN verification page",
"g_app.html": "App verification page",
"h_bank_login": "Bank login page",
"h_paypal_login": "PayPal login page",
"ipinfo.io": "IP geolocation service",
}
# Chinese operator status messages (extracted from unobfuscated literals)
OPERATOR_STATUS = {
"在首页": "On homepage",
"在验证页": "On verification page",
"在登录页": "On login page",
"在卡页": "On card page",
"在积分页": "On points page",
"在信息页": "On info page",
"提交卡信息": "Card info submitted",
"提交登录信息": "Login info submitted",
"提交短信验证": "SMS verification submitted",
"提交邮箱验证": "Email verification submitted",
"提交自定义验证": "Custom verification submitted",
"提交PIN验证码": "PIN code submitted",
"提交PayPal登录": "PayPal login submitted",
"提交PayPal验证": "PayPal verification submitted",
"提交PayPal卡数据": "PayPal card data submitted",
"需操作!": "Operator action needed!",
"需操作": "Operator action needed",
"正在填写卡号": "Filling in card number",
"正在填写卡有效期": "Filling in card expiry",
"正在填写CVV": "Filling in CVV",
"正在填写持卡人姓名": "Filling in cardholder name",
"进入短信验证页": "Entered SMS verification",
"进入2FA验证页": "Entered 2FA verification",
"进入邮箱验证页": "Entered email verification",
"进入PIN验证页": "Entered PIN verification",
"进入银行登录页": "Entered bank login page",
"进入自定义验证页": "Entered custom verification",
"用户进入首页": "User entered homepage",
}
# Operator C2 instruction set (from _0x49c7ee object)
OPERATOR_INSTRUCTIONS = {
"to_sms": {"page": "d_sms.html", "desc": "Route to SMS verification"},
"to_sms_login": {"page": "d_sms_login.html", "desc": "Route to SMS login"},
"to_link": {"page": "link verification", "desc": "Route to link check"},
"to_email": {"page": "e_email.html", "desc": "Route to email verification"},
"to_pin": {"page": "f_pin.html", "desc": "Route to PIN entry"},
"to_app": {"page": "g_app.html", "desc": "Route to app download"},
"to_login_app": {"page": "g_login_app.html", "desc": "Route to app login"},
"to_custom": {"page": "custom page", "desc": "Route to custom verification"},
"to_custompage": {"page": "custom page", "desc": "Route to custom page"},
"to_card": {"page": "c_info.html", "desc": "Route to card entry"},
"to_cvv": {"page": "c_info.html", "desc": "Route to CVV entry"},
"to_info": {"page": "b_info.html", "desc": "Route to billing info"},
"to_bank_otp": {"page": "h_bank_otp.html", "desc": "Route to bank OTP"},
"to_bank_login": {"page": "h_bank_login.html", "desc": "Route to bank login"},
"to_bank_card": {"page": "h_bank_card.html", "desc": "Route to bank card"},
"to_paypal_login": {"page": "h_paypal_login.html", "desc": "Route to PayPal login"},
"to_paypal_card": {"page": "h_paypal_card.html", "desc": "Route to PayPal card"},
"to_paypal_sms": {"page": "h_paypal_sms.html", "desc": "Route to PayPal SMS"},
"to_paypal_verify": {"page": "PayPal verify", "desc": "Route to PayPal verification"},
"to_paypal_pin": {"page": "h_paypal_pin.html", "desc": "Route to PayPal PIN"},
"to_paypal_app": {"page": "h_paypal_app.html", "desc": "Route to PayPal app"},
"to_klarna_login": {"page": "r_klarna_login.html", "desc": "Route to Klarna login"},
"to_klarna_email": {"page": "Klarna email", "desc": "Route to Klarna email"},
"to_klarna_phone": {"page": "Klarna phone", "desc": "Route to Klarna phone"},
"to_klarna_pin": {"page": "r_klarna_pin.html", "desc": "Route to Klarna PIN"},
"to_success": {"page": "success", "desc": "Show success page (done)"},
"to_fail": {"page": "fail", "desc": "Show failure page"},
"to_changecard": {"page": "c_info.html", "desc": "Force re-enter card"},
"to_2fa": {"page": "d_2fa.html", "desc": "Route to 2FA"},
"img_update": {"page": None, "desc": "Dynamically update page image"},
"text_update": {"page": None, "desc": "Dynamically update page text"},
"verify_code": {"page": None, "desc": "Validate submitted verification code"},
}
def extract_string_table(js_source: str) -> list:
    """Extract the rotated string array from obfuscated main.js."""
    # The _0x2d5c function returns the array bound to _0xf91c8e; match that literal.
    # The non-greedy match stops at the first ']', which is safe for base64-style entries.
    match = re.search(r"const\s+_0xf91c8e\s*=\s*\[(.*?)\]", js_source, re.DOTALL)
    if not match:
        return []
    # Entries are single-quoted string literals
    return re.findall(r"'([^']*)'", match.group(1))
def decode_b64_custom(encoded: str) -> str:
    """Decode the kit's base64 variant (standard alphabet with lower-case letters first)."""
    charset = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/='
    result = []
    acc = 0   # bit accumulator
    bits = 0  # number of not-yet-emitted bits held in acc
    for ch in encoded:
        idx = charset.find(ch)
        if idx == -1:
            continue  # skip characters outside the alphabet
        if idx == 64:  # '=' padding
            break
        acc = (acc << 6) | idx
        bits += 6
        if bits >= 8:
            bits -= 8
            result.append((acc >> bits) & 0xFF)
    # errors="replace" means decoding never raises, so no fallback branch is needed
    return bytes(result).decode("utf-8", errors="replace")
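# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original tool): a cross-check for
# decode_b64_custom. The kit's alphabet is the standard base64 alphabet with
# upper and lower case swapped, so swapping case and calling the stdlib
# decoder should give the same bytes. decode_swapped_b64 is a hypothetical
# name. Unlike decode_b64_custom, this version raises binascii.Error on
# malformed input instead of skipping bad characters.
# ---------------------------------------------------------------------------
import base64

def decode_swapped_b64(encoded):
    """Decode a case-swapped base64 string using the standard library."""
    standard = encoded.rstrip("=").swapcase()
    standard += "=" * (-len(standard) % 4)  # restore padding to a multiple of 4
    return base64.b64decode(standard).decode("utf-8", errors="replace")
# Usage: decode_swapped_b64(entry) for each entry returned by
# extract_string_table; a mismatch with decode_b64_custom suggests the
# entry is not plain case-swapped base64.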
def cmd_deobfuscate(args):
"""Extract and decode strings from obfuscated main.js."""
with open(args.file, "r", encoding="utf-8", errors="replace") as f:
source = f.read()
print(f"[*] Loaded {len(source)} bytes from {args.file}")
# Extract string table
strings = extract_string_table(source)
print(f"[*] Extracted {len(strings)} strings from rotation array")
if strings:
decoded = []
for i, s in enumerate(strings):
d = decode_b64_custom(s)
decoded.append(d)
if any(ord(c) > 127 for c in d) or any(kw in d for kw in ["api", "html", "http", "pay", "card", "login", "verify"]):
print(f" [{i:4d}] {d}")
print(f"\n[*] Total decoded: {len(decoded)}")
# Extract Chinese strings directly (these survive obfuscation)
chinese_pattern = re.compile(r"[\u4e00-\u9fff][\u4e00-\u9fffA-Za-z0-9,,.。!!??\s]{2,}")
chinese_strings = set(chinese_pattern.findall(source))
if chinese_strings:
print(f"\n[*] Chinese strings found ({len(chinese_strings)}):")
for cs in sorted(chinese_strings):
translation = OPERATOR_STATUS.get(cs.strip(), "")
eng = f" → {translation}" if translation else ""
print(f" {cs}{eng}")
# Extract page filenames
page_pattern = re.compile(r"[a-z]_[a-z_]+\.html?")
pages = set(page_pattern.findall(source))
if pages:
print(f"\n[*] Phishing page templates ({len(pages)}):")
for p in sorted(pages):
print(f" {p}")
# Extract URLs
url_pattern = re.compile(r"https?://[^\s'\"]+")
urls = set(url_pattern.findall(source))
if urls:
print(f"\n[*] URLs found ({len(urls)}):")
for u in sorted(urls):
print(f" {u}")
# ---------------------------------------------------------------------------
# Infrastructure Probing
# ---------------------------------------------------------------------------
def _request(url, method="GET", headers=None, data=None, timeout=5):
"""Make an HTTP request, return (status, headers, body)."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
hdrs = {"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X)"}
if headers:
hdrs.update(headers)
req = urllib.request.Request(url, data=data, headers=hdrs, method=method)
try:
resp = urllib.request.urlopen(req, timeout=timeout, context=ctx)
return resp.status, dict(resp.headers), resp.read()
except urllib.error.HTTPError as e:
return e.code, dict(e.headers), e.read()
except Exception as e:
return 0, {}, str(e).encode()
def cmd_probe(args):
"""Probe a Lighthouse phishing domain for live infrastructure."""
domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
base = f"https://{domain}"
print(f"[*] Probing {domain}")
print(f"[*] Timestamp: {datetime.now(timezone.utc).isoformat()}")
print()
# --- Basic connectivity ---
print("=== Connectivity ===")
for method in ["HEAD", "GET"]:
code, hdrs, body = _request(f"{base}/", method=method)
server = hdrs.get("Server", hdrs.get("server", "unknown"))
print(f" {method} / → {code} | Server: {server} | Body: {len(body)}b")
# --- Framework fingerprinting ---
print("\n=== Framework Detection ===")
# Jetty detection via WebSocket upgrade
code, hdrs, body = _request(
f"{base}/", method="GET",
headers={
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13",
}
)
body_str = body.decode("utf-8", errors="replace")
if "JavalinJettyServlet" in body_str:
servlet = re.search(r"SERVLET:</th><td>([^<]+)", body_str)
print(f" [!] Javalin/Jetty detected: {servlet.group(1) if servlet else 'yes'}")
elif "Jetty" in body_str or "ISO-8859-1" in body_str:
print(f" [!] Jetty detected (error page signature)")
else:
print(f" WebSocket upgrade → {code}")
# 8K path overflow for Jetty 431
code, _, body = _request(f"{base}/{'A' * 8000}")
if code == 431:
print(f" [!] Jetty 431 confirmed (Bad Message: Request Header Fields Too Large)")
# --- Operator console ---
print("\n=== Operator Console ===")
for path in ["/console/", "/console", "/admin/", "/manage/"]:
code, _, body = _request(f"{base}{path}")
status = "AUTH-GATED" if code == 401 else "EXISTS" if code in (200, 301, 302, 403) else f"{code}"
if code not in (404, 0):
print(f" [!] {path} → {code} ({status})")
# Console sub-paths
console_routes = [
"login", "auth", "api", "victims", "sessions", "campaigns",
"templates", "operators", "export", "logs", "config", "ws"
]
live_routes = []
for route in console_routes:
code, _, _ = _request(f"{base}/console/{route}")
if code != 404:
live_routes.append((route, code))
if live_routes:
print(f" Console sub-routes ({len(live_routes)} found):")
for route, code in live_routes:
print(f" /console/{route} → {code}")
# --- Known API endpoints ---
print("\n=== API Endpoints ===")
endpoints = [
("POST", "/api/verify/test"),
("POST", "/pay/api/open/getSyncSettings"),
("POST", "/pay/api/open/pollInstruction"),
("GET", "/pay/b_info.html"),
("GET", "/pay/c_info.html"),
]
for method, path in endpoints:
code, _, body = _request(f"{base}{path}", method=method)
body_preview = body.decode("utf-8", errors="replace")[:80]
marker = "[LIVE]" if code == 200 else "[ECHO]" if "not found" in body_preview.lower() else f"[{code}]"
print(f" {marker} {method} {path} → {code} ({len(body)}b)")
# --- Cloudflare trace ---
print("\n=== Cloudflare Trace ===")
code, _, body = _request(f"{base}/cdn-cgi/trace")
if code == 200:
trace = {}
for line in body.decode().strip().split("\n"):
if "=" in line:
k, v = line.split("=", 1)
trace[k] = v
print(f" Datacenter: {trace.get('colo', '?')}")
print(f" TLS: {trace.get('tls', '?')}")
print(f" HTTP: {trace.get('http', '?')}")
print(f" WARP: {trace.get('warp', '?')}")
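# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original tool): the key=value parsing
# that cmd_probe, cmd_fingerprint and cmd_report each repeat for
# /cdn-cgi/trace, factored into one function. parse_cf_trace is a
# hypothetical name; it decodes leniently so that a non-UTF-8 body cannot
# raise.
# ---------------------------------------------------------------------------
def parse_cf_trace(body, keys=None):
    """Parse a /cdn-cgi/trace body (bytes) into a dict, optionally filtered by keys."""
    trace = {}
    for line in body.decode("utf-8", errors="replace").strip().split("\n"):
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        if keys is None or k in keys:
            trace[k] = v.strip()
    return trace
# Usage: parse_cf_trace(body, keys=("colo", "tls", "warp", "http", "fl"))
# reproduces the subset that cmd_fingerprint records.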
def cmd_fingerprint(args):
"""Generate a fingerprint profile for a Lighthouse kit deployment."""
domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
base = f"https://{domain}"
fp = {
"domain": domain,
"timestamp": datetime.now(timezone.utc).isoformat(),
"framework": None,
"console_panel": False,
"api_framework_alive": False,
"cloudflare": {},
"tls": {},
"phishing_pages_active": False,
}
# Framework
code, _, body = _request(
f"{base}/",
headers={"Upgrade": "websocket", "Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"}
)
body_str = body.decode("utf-8", errors="replace")
if "JavalinJettyServlet" in body_str:
match = re.search(r"JavalinJettyServlet-([a-f0-9]+)", body_str)
fp["framework"] = f"Javalin/Jetty ({match.group(1) if match else 'unknown hash'})"
# Console
code, _, _ = _request(f"{base}/console/")
fp["console_panel"] = code == 401
# API alive check
code, _, body = _request(f"{base}/api/verify/test", method="POST")
fp["api_framework_alive"] = b"Endpoint" in body
# CF trace
code, _, body = _request(f"{base}/cdn-cgi/trace")
if code == 200:
for line in body.decode().strip().split("\n"):
if "=" in line:
k, v = line.split("=", 1)
if k in ("colo", "tls", "warp", "http", "fl"):
fp["cloudflare"][k] = v
# Phishing pages
code, _, _ = _request(f"{base}/pay/b_info.html")
fp["phishing_pages_active"] = code == 200
# TLS cert
try:
import subprocess
result = subprocess.run(
["openssl", "s_client", "-connect", f"{domain}:443",
"-servername", domain],
input=b"", capture_output=True, timeout=5
)
cert_out = result.stdout.decode("utf-8", errors="replace")
cn_match = re.search(r"subject=.*?CN\s*=\s*(\S+)", cert_out)
issuer_match = re.search(r"issuer=.*?CN\s*=\s*(.+?)(?:\n|$)", cert_out)
if cn_match:
fp["tls"]["cn"] = cn_match.group(1)
if issuer_match:
fp["tls"]["issuer"] = issuer_match.group(1).strip()
except Exception:
pass
print(json.dumps(fp, indent=2, ensure_ascii=False))
# ---------------------------------------------------------------------------
# C2 Monitoring
# ---------------------------------------------------------------------------
def cmd_monitor(args):
    """Monitor the pollInstruction C2 endpoint for operator commands."""
    domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
    base = f"https://{domain}"
    interval = args.interval
    print(f"[*] Monitoring C2 at {base}/pay/api/open/pollInstruction")
    print(f"[*] Interval: {interval}s")
    print("[*] Press Ctrl+C to stop")
    print()
    poll_count = 0
    # Ctrl+C usually arrives during time.sleep(), so catch it around the whole loop
    try:
        while True:
            poll_count += 1
            ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
            try:
                code, hdrs, body = _request(
                    f"{base}/pay/api/open/pollInstruction",
                    method="POST",
                    headers={"Content-Type": "application/json"},
                    data=b"{}",
                )
                body_str = body.decode("utf-8", errors="replace")
                if code == 200 and len(body) > 0:
                    # Anything that does not start with "{" is treated as an encrypted binary payload
                    if body[0:1] != b"{":
                        try:
                            decrypted = aes_ctr_decrypt_payload(body)
                            print(f" [{ts}] #{poll_count} ENCRYPTED RESPONSE → {json.dumps(decrypted, ensure_ascii=False)}")
                            # Decode operator instruction
                            if isinstance(decrypted, dict):
                                instr = decrypted.get("instruction", decrypted.get("action", ""))
                                if instr and instr in OPERATOR_INSTRUCTIONS:
                                    info = OPERATOR_INSTRUCTIONS[instr]
                                    print(f" ↳ INSTRUCTION: {instr}: {info['desc']}")
                        except Exception as e:
                            print(f" [{ts}] #{poll_count} BINARY ({len(body)}b) decrypt failed: {e}")
                    else:
                        try:
                            data = json.loads(body_str)
                            print(f" [{ts}] #{poll_count} JSON → {json.dumps(data, ensure_ascii=False)}")
                        except json.JSONDecodeError:
                            print(f" [{ts}] #{poll_count} {code} ({len(body)}b) {body_str[:100]}")
                else:
                    # Log idle or error responses only on every tenth poll to keep output readable
                    if poll_count % 10 == 1:
                        print(f" [{ts}] #{poll_count} → {code} ({len(body)}b) {body_str[:60]}")
            except Exception as e:
                print(f" [{ts}] #{poll_count} ERROR: {e}")
            time.sleep(interval)
    except KeyboardInterrupt:
        print(f"\n[*] Stopped after {poll_count} polls")
# ---------------------------------------------------------------------------
# Report Generation
# ---------------------------------------------------------------------------
def cmd_report(args):
"""Generate a comprehensive IOC/infrastructure report."""
domain = args.domain.replace("https://", "").replace("http://", "").strip("/")
base = f"https://{domain}"
report = {
"meta": {
"tool": "lighthouse_dissector",
"version": "1.0.0",
"timestamp": datetime.now(timezone.utc).isoformat(),
"target": domain,
},
"infrastructure": {},
"framework": {},
"console_panel": {},
"api_endpoints": {},
"cloudflare": {},
"kit_intelligence": {
"encryption": "AES-256-CTR (key-in-payload, broken)",
# Only KNOWN_STRINGS entries described as pages; the table also holds API names and services
"phishing_pages": sorted(k for k, v in KNOWN_STRINGS.items() if v.endswith("page")),
"operator_instructions": sorted(OPERATOR_INSTRUCTIONS.keys()),
"operator_status_messages": OPERATOR_STATUS,
"data_fields": {
"CARD": ["cvv_fullName", "cvv_number", "cvv_expiry", "cvv_cvv"],
"WEB_LOGIN": ["web_login_account1..3", "web_login_pwd1..3"],
"VERIFY": ["two_factor_authentication", "pin"],
"PAYPAL": ["paypal_login_account"],
},
"password_hash": "triple-MD5 with salt 'wangduoyu666!.+-'",
},
}
print(f"[*] Generating report for {domain}...", file=sys.stderr)
# Connectivity
code, hdrs, _ = _request(f"{base}/", method="HEAD")
report["infrastructure"]["http_status"] = code
report["infrastructure"]["server"] = hdrs.get("Server", hdrs.get("server", "unknown"))
# Framework
code, _, body = _request(
f"{base}/",
headers={"Upgrade": "websocket", "Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"}
)
body_str = body.decode("utf-8", errors="replace")
match = re.search(r"JavalinJettyServlet-([a-f0-9]+)", body_str)
if match:
report["framework"]["name"] = "Javalin on Eclipse Jetty"
report["framework"]["servlet_hash"] = match.group(1)
# Console
code, _, _ = _request(f"{base}/console/")
report["console_panel"]["status"] = code
report["console_panel"]["auth_gated"] = code == 401
if code == 401:
routes_found = []
for route in ["login", "auth", "api", "victims", "sessions", "campaigns",
"templates", "operators", "export", "logs", "config", "ws",
"dashboard", "settings", "data", "import", "health", "status"]:
c, _, _ = _request(f"{base}/console/{route}")
if c != 404:
routes_found.append({"path": f"/console/{route}", "status": c})
report["console_panel"]["routes"] = routes_found
# API endpoints
for method, path in [("POST", "/api/verify/test"), ("POST", "/pay/api/open/getSyncSettings"),
("POST", "/pay/api/open/pollInstruction"), ("GET", "/pay/b_info.html"),
("GET", "/pay/c_info.html")]:
code, _, body = _request(f"{base}{path}", method=method)
report["api_endpoints"][f"{method} {path}"] = {
"status": code,
"body_size": len(body),
"echo_pattern": "Endpoint" in body.decode("utf-8", errors="replace"),
}
# Cloudflare
code, _, body = _request(f"{base}/cdn-cgi/trace")
if code == 200:
for line in body.decode().strip().split("\n"):
if "=" in line:
k, v = line.split("=", 1)
report["cloudflare"][k] = v
# Output
output = json.dumps(report, indent=2, ensure_ascii=False)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:  # report contains non-ASCII (Chinese) strings
f.write(output)
print(f"[*] Report written to {args.output}", file=sys.stderr)
else:
print(output)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Lighthouse PhaaS Kit Analysis Toolkit",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
sub = parser.add_subparsers(dest="command")
# decrypt
p = sub.add_parser("decrypt", help="Decrypt a hex/b64 AES-CTR payload")
p.add_argument("payload", help="Hex or base64 encoded payload")
# decrypt-file
p = sub.add_parser("decrypt-file", help="Decrypt a binary payload file")
p.add_argument("file", help="Path to binary payload file")
# deobfuscate
p = sub.add_parser("deobfuscate", help="Extract strings from obfuscated main.js")
p.add_argument("file", help="Path to main.js file")
# probe
p = sub.add_parser("probe", help="Probe live Lighthouse infrastructure")
p.add_argument("domain", help="Target domain (e.g., ilsos.orekx.life)")
# monitor
p = sub.add_parser("monitor", help="Monitor C2 pollInstruction endpoint")
p.add_argument("domain", help="Target domain")
p.add_argument("--interval", type=int, default=5, help="Poll interval in seconds")
# fingerprint
p = sub.add_parser("fingerprint", help="Generate deployment fingerprint")
p.add_argument("domain", help="Target domain")
# report
p = sub.add_parser("report", help="Generate full infrastructure report")
p.add_argument("domain", help="Target domain")
p.add_argument("--output", "-o", help="Output file path")
args = parser.parse_args()
commands = {
"decrypt": cmd_decrypt,
"decrypt-file": cmd_decrypt_file,
"deobfuscate": cmd_deobfuscate,
"probe": cmd_probe,
"monitor": cmd_monitor,
"fingerprint": cmd_fingerprint,
"report": cmd_report,
}
if args.command in commands:
commands[args.command](args)
else:
parser.print_help()
if __name__ == "__main__":
main()