Skip to content

Instantly share code, notes, and snippets.

@lastforkbender
Last active September 26, 2025 08:33
Show Gist options
  • Select an option

  • Save lastforkbender/5579acca18c997f58af04a47b5630108 to your computer and use it in GitHub Desktop.

Select an option

Save lastforkbender/5579acca18c997f58af04a47b5630108 to your computer and use it in GitHub Desktop.

Revisions

  1. lastforkbender revised this gist Sep 26, 2025. 1 changed file with 118 additions and 1 deletion.
    119 changes: 118 additions & 1 deletion ce_mmap_hqi.py
    Original file line number Diff line number Diff line change
    @@ -482,4 +482,121 @@ def apply_then_measure(store_path: str, frame_id: str, out_path: str):
    interp.apply_unitary_system([[complex(x) for x in row] for row in H])
    res = interp.is_extra_classical()
    with open(out_path, 'w') as f: f.write('classical\n' if res else 'quantum\n')
    #______________________________________________________________________________
    #______________________________________________________________________________

    # q8p.py
    #___________________________________________________________________________________

    class AA:

    slots = ('_u_tlr', '_u_srv')

    def __init__(self):
    pass
    #___________________________________________________________________________________

    def set_aa_short_bus(self, vhs_com: tuple) -> list:

    mtx, vhs, chs, chn = vhs_com[0], vhs_com[1], vhs_com[2], vhs_com[3]
    t = time.ctime().replace(' ', '').split(':')
    t = math.log(int(f'{"".join([c for c in t if c.isdigit()])}{t[1]}')*vhs)
    mtx_len = len(mtx)
    if mtx_len%2 != 0:
    raise Exception(f'<q8p> variance matrices length({len(mtx)}) not even')
    mtx = [self.aa_vhs_commutator(mtx[i], mtx[i+1], vhs, chs, chn) for i in range(mtx_len-1)]
    for i in range(mtx_len-1):
    print(self.solve_aa_short_bus_pvt_cycle(t, mtx[i]))
    #___________________________________________________________________________________

    def solve_aa_short_bus_pvt_cycle(self, t: float, mt: tuple) -> tuple:

    hd, R, tlr, srv = mt[0], mt[1], mt[2], mt[3]
    if tlr < srv:
    gr = self.get_pvt_gate_range(False, t, tlr, srv)
    else:
    gr = self.get_pvt_gate_range(True, t, tlr, srv)
    if gr+hd > abs(cmath.sqrt(R.real)*cmath.sqrt(R.imag))**hd/gr:
    return math.floor(abs((t+gr)*cmath.cos(R))+tlr), math.floor((gr+hd*t)/t)
    else:
    return 1, math.floor(((gr*hd)/abs(cmath.sin(R)))/t)
    #___________________________________________________________________________________

    def get_pvt_gate_range(self, isRested: bool, t: float, tlr: float, srv: float) -> int:

    x = tlr*srv/math.sqrt(t)
    if isRested:
    return math.ceil(abs(cmath.sqrt(complex((x+t)/math.tanh(t*2), x*t)))/(tlr+srv))
    return math.ceil(abs(cmath.sqrt(complex(x+t, x*t)))/math.pi)
    #___________________________________________________________________________________

    def aa_vhs_commutator(self, vrncA: list, vrncB: list, vhs: float, chs: complex, chn: complex) -> tuple:

    c_ttl = (chs.real+chs.imag)+(chn.real+chn.imag)
    if sum(vrncA) < c_ttl or sum(vrncB) < c_ttl:
    raise ValueError(f'<q8p> @chs + @chn summation({c_ttl}) > listed variances')
    hd = (int(vhs*sum(vrncA)*sum(vrncB)*math.pi)*int(math.sqrt(abs(chs)+abs(chn))))*math.pi
    R = min(math.cos(abs(chs)+abs(chn)), math.sin(vhs+abs(chn)))
    t, q = cmath.tan(((chs*chn)/math.pi)+R), math.pi/2
    k = (vrncA[0]*vrncA[len(vrncA)-1]*vrncB[0]*vrncB[len(vrncB)-1])**t
    ds, s, c, r, rv = math.ceil(abs(cmath.exp(R+chs-chn)))**vhs*t, hd, 1, set(), set()
    t+=k**R
    while s > -1:
    for h in vrncA:
    t+=h-ds
    m = (t*k+s)-cmath.sqrt((t-ds)+c)-t+(ds*(c*2))
    if m.real < 0:
    q = abs(c-t)
    for n in vrncB:
    w = sum([math.sin(abs(m)+q) for x in vrncB])
    a_ds, a_ks = abs(ds), abs(cmath.cos(k))
    if ((w/(c/2))+(R*n))-a_ds <= (vhs+c+R)/(math.pi-abs(t)+a_ks):
    r.add(w)
    else:
    t-=vhs
    n = math.sin((abs(cmath.cos(t)+ds)//(c*abs(t))))
    if n > 0.0:
    n, cn = math.floor(n*c*abs(cmath.tan(k+m))), 0
    lm = [complex((t*c)/(n+1)/q, -1) for x in range(n)]
    r = list(r)
    r.sort()
    d = len(r)
    if d > 0:
    while cn < len(lm):
    l = -(abs(lm[cn])-cn)/q
    if cn >= d: cn = 0
    l = l+r[cn]
    if l > 0.0: rv.add(l)
    cn+=1
    r = set(r)
    a_m = abs(m)
    if a_m > 0: r.add(a_m/(abs(cmath.sqrt(chn))+math.pi))
    s+=-1
    c+=1
    r, rv = list(r), list(rv)
    r.sort()
    rv.sort()
    tlr = -math.log((abs(cmath.tan(t))/hd)*abs(cmath.log(cmath.sqrt(m+hd))))
    return hd, R+cmath.log(t), tlr, sum(r)/sum(rv)/cn
    #___________________________________________________________________________________

    def test():

    cls = AA()

    stackA = [0.2, 0.5, 0.8, 1.1, 1.3, 1.5]
    stackB = [0.7, 0.9, 1.1, 1.5, 1.8, 2.0]
    stackC = [0.1, 0.4, 1.2, 1.8, 1.9, 2.2]
    stackD = [0.2, 0.5, 0.8, 1.0, 1.4, 2.6]
    stackE = [0.1, 0.2, 0.6, 0.8, 1.0, 1.7]
    stackF = [0.2, 0.5, 0.8, 1.0, 1.4, 2.6]
    stackG = [0.1, 0.4, 0.9, 1.2, 1.4, 2.3]
    stackH = [0.6, 0.9, 1.2, 1.6, 1.9, 2.8]
    grid = [stackA, stackB, stackC, stackD, stackE, stackF, stackG, stackH]
    vhs = 0.2

    chs = complex(1.0, 0.2)
    chn = complex(1.0, 0.5)

    cls.set_aa_short_bus((grid, vhs, chs, chn))

    test()
  2. lastforkbender created this gist Sep 26, 2025.
    485 changes: 485 additions & 0 deletions ce_mmap_hqi.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,485 @@
    # ce_mmap_hqi.py - Extra Particle(Classic/Quantum) tiny srf-sim for palindrome
    # luyh rotational variable(s) complex interpolation dynamics
    #______________________________________________________________________________

    import tempfile
    import struct
    import cmath
    import math
    import mmap
    import time
    import os
    #______________________________________________________________________________
    BUSH = b'RENOWNED'
    HDR_FMT = '>Q'
    DOUBLE_FMT = '>d'
    HDR_SZ = struct.calcsize(HDR_FMT)
    REC_HDR_FMT = 'BQ'
    REC_HDR_SZ = struct.calcsize(REC_HDR_FMT)
    REG_ENTRY_HDR_FMT = '>H'
    REG_ENTRY_HDR_SZ = struct.calcsize(REG_ENTRY_HDR_FMT)
    REG_FRAME_META_FMT = '>BQQQII'
    REG_FRAME_META_SZ = struct.calcsize(REG_FRAME_META_FMT)
    #______________________________________________________________________________

    def pack_rec_header(rec_type: int, body_len: int) -> bytes:

    return struct.pack(REC_HDR_FMT, rec_type, body_len)
    #______________________________________________________________________________

    def unpack_rec_header(b: bytes) -> tuple:

    return struct.unpack(REC_HDR_FMT, b)
    #______________________________________________________________________________

    def pack_u64(x: int) -> bytes:

    return struct.pack(">Q", x)
    #______________________________________________________________________________

    def unpack_u64(b: bytes) -> int:

    return struct.unpack(">Q", b)[0]
    #______________________________________________________________________________

    def pack_u32(x: int) -> bytes:

    return struct.pack(">I", x)
    #______________________________________________________________________________

    def unpack_u32(b: bytes) -> int:

    return struct.unpack(">I", b)[0]
    #______________________________________________________________________________

    def pack_u16(x: int) -> bytes:

    return struct.pack(">H", x)
    #______________________________________________________________________________

    def unpack_u16(b: bytes) -> int:

    return struct.unpack(">H", b)[0]
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class BinStore:

    def __init__(self, path: str, initial_size: int=8_000_000):

    self.path, mm = path, None
    init = os.path.exists(path)
    self.fd = os.open(path, os.O_RDWR | os.O_CREAT)
    if init:
    os.ftruncate(self.fd, initial_size)
    mm = mmap.mmap(self.fd, 0)
    try:
    mm[0:8] = BUSH
    mm[8:16] = pack_u64(16)
    mm[16:24] = pack_u64(0)
    finally:
    mm.close()
    self.mm = mmap.mmap(self.fd, 0)
    self.size = self.mm.size()
    self.write_pos = self._find_end()
    #______________________________________________________________________________

    def _find_end(self) -> int:

    i = 24
    while i+REC_HDR_SZ <= self.size:
    hdr = self.mm[i:i+REC_HDR_SZ]
    if hdr == b'\x00'*REC_HDR_SZ:
    return i
    rtype, blen = unpack_rec_header(hdr)
    i+=REC_HDR_SZ+blen
    return i
    #______________________________________________________________________________

    def close(self):

    try:
    self.mm.flush()
    self.mm.close()
    finally:
    os.close(self.fd)
    #______________________________________________________________________________

    def _ensure_capacity(self, needed: int):

    if self.write_pos+needed <= self.size:
    return
    new_size = max(self.size*2, self.write_pos+needed+1024)
    self.mm.close()
    os.ftruncate(self.fd, new_size)
    self.mm = mmap.mmap(self.fd, 0)
    self.size = new_size
    #______________________________________________________________________________

    def append_record(self, rec_type: int, body: bytes) -> int:

    hdr = pack_rec_header(rec_type, len(body))
    rec = hdr+body
    off = self.write_pos
    self._ensure_capacity(len(rec))
    self.mm[off:off+len(rec)] = rec
    self.write_pos+=len(rec)
    if rec_type == 1: self.mm[16:24] = pack_u64(off)
    return off
    #______________________________________________________________________________

    def read_record_header(self, offset: int) -> tuple:

    hdr = self.mm[offset:offset+REC_HDR_SZ]
    return unpack_rec_header(hdr)
    #______________________________________________________________________________

    def read_record_body(self, offset: int, blen: int) -> bytes:

    return bytes(self.mm[offset+REC_HDR_SZ: offset+REC_HDR_SZ+blen])
    #______________________________________________________________________________

    def read_registry_offset(self) -> int:

    return unpack_u64(self.mm[16:24])
    #______________________________________________________________________________

    def read_BUSH(self) -> bytes:

    return bytes(self.mm[0:8])
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class Base:
    __slots__ = ('_id',)

    def __init__(self, _id: str):
    self._id = _id
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class ExtraParticle(Base):

    __slots__ = Base.__slots__ + ('store_path', 'rec_off')

    def __init__(self, _id: str, store_path: str, rec_off: int=0):

    super().__init__(_id)
    self.store_path, self.rec_off = store_path, rec_off
    #______________________________________________________________________________

    def read(self) -> list:

    if self.rec_off == 0:
    return None
    s = BinStore(self.store_path)
    rtype, blen = s.read_record_header(self.rec_off)
    if rtype != 3:
    s.close()
    raise ValueError('expected extra record...')
    body = s.read_record_body(self.rec_off, blen)
    s.close()
    return _unpack_complex_vec(body)
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class SystemState(Base):

    __slots__ = Base.__slots__ + ('store_path', 'rec_off', 'dim')

    def __init__(self, _id: str, store_path: str, rec_off: int):

    super().__init__(_id)
    self.store_path = store_path
    self.rec_off = rec_off
    s = BinStore(self.store_path)
    rtype, blen = s.read_record_header(self.rec_off)
    if rtype != 2:
    s.close()
    raise ValueError("expected system record")
    body = s.read_record_body(self.rec_off, blen)
    s.close()
    vec = _unpack_complex_vec(body)
    self.dim = len(vec)
    #______________________________________________________________________________

    def read_vec(self) -> list:

    s = BinStore(self.store_path)
    rtype, blen = s.read_record_header(self.rec_off)
    body = s.read_record_body(self.rec_off, blen)
    s.close()
    return _unpack_complex_vec(body)
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class ReferenceFrame(Base):

    __slots__ = Base.__slots__ + ('store_path', 'frame_id', 'has_joint', 'joint_off',
    'system_rec', 'extra_rec', 'sys_dim', 'extra_dim',
    'children')

    def __init__(self, _id: str, store_path: str, frame_id: str,
    has_joint: bool, joint_off: int, system_rec: int,
    extra_rec: int, sys_dim: int, extra_dim: int):

    super().__init__(_id)
    self.store_path, self.frame_id, self.has_joint = store_path, frame_id, has_joint
    self.joint_off, self.system_rec, self.extra_rec = joint_off, system_rec, extra_rec
    self.sys_dim, self.extra_dim = sys_dim, extra_dim
    self.children: List['ReferenceFrame'] = []
    #______________________________________________________________________________

    def add_child(self, child: 'ReferenceFrame'):

    self.children.append(child)
    #______________________________________________________________________________

    def _pack_complex_vec(vec: list) -> bytes:

    out = bytearray()
    out+=pack_u32(len(vec))
    for z in vec:
    out+=struct.pack(DOUBLE_FMT, float(z.real))
    out+=struct.pack(DOUBLE_FMT, float(z.imag))
    return bytes(out)
    #______________________________________________________________________________

    def _unpack_complex_vec(body: bytes) -> list:

    cnt, vec, off = unpack_u32(body[0:4]), [], 4
    for _ in range(cnt):
    re = struct.unpack(DOUBLE_FMT, body[off:off+8])[0]; off+=8
    im = struct.unpack(DOUBLE_FMT, body[off:off+8])[0]; off+=8
    vec.append(complex(re, im))
    return vec
    #______________________________________________________________________________

    def pack_registry(reg: list) -> bytes:

    out = bytearray()
    for fid, meta in reg.items():
    fid_b = fid.encode('utf-8')
    out+=pack_u16(len(fid_b))
    out+=fid_b
    has_joint = 1 if meta.get('joint_off', 0) else 0
    out+=struct.pack(REG_FRAME_META_FMT, has_joint, meta.get('joint_off', 0),
    meta.get('system_off', 0), meta.get('extra_off', 0),
    meta.get('sys_dim', 0), meta.get('extra_dim', 0))
    return bytes(out)
    #______________________________________________________________________________

    def unpack_registry(body: bytes) -> dict:

    i, reg = 0, {}
    while i < len(body):
    fid_len = unpack_u16(body[i:i+2]); i+=2
    fid = body[i:i+fid_len].decode('utf-8'); i+=fid_len
    has_joint = body[i]; i+=1
    joint_off = unpack_u64(body[i:i+8]); i+=8
    system_off = unpack_u64(body[i:i+8]); i+=8
    extra_off = unpack_u64(body[i:i+8]); i+=8
    sys_dim = unpack_u32(body[i:i+4]); i+=4
    extra_dim = unpack_u32(body[i:i+4]); i+=4
    reg[fid] = {'joint_off':joint_off, 'system_off':system_off,
    'extra_off':extra_off, 'sys_dim':sys_dim,
    'extra_dim':extra_dim}
    return reg
    #______________________________________________________________________________

    def mat_vec(U: list, v: list) -> list:

    return [sum(ui[j]*v[j] for j in range(len(v))) for ui in U]
    #______________________________________________________________________________

    def dagger(U: list) -> list:

    return [list(col) for col in zip(*[[complex(x).conjugate() for x in row] for row in U])]
    #______________________________________________________________________________

    def is_unitary(U: list, eps: float=1e-9) -> bool:

    n, ut = len(U), dagger(U)
    prod = [[sum(U[i][k]*ut[k][j] for k in range(n)) for j in range(n)] for i in range(n)]
    for i in range(n):
    for j in range(n):
    target = 1.0 if i == j else 0.0
    if abs(prod[i][j]-target) > eps:
    return False
    return True
    #______________________________________________________________________________

    def pure_state_density(vec: list) -> list:

    return [[vec[i]*complex(vec[j]).conjugate() for j in range(len(vec))] for i in range(len(vec))]
    #______________________________________________________________________________

    def partial_trace(rho: list, sys_dim: int, extra_dim: int, trace_over: str) -> list:

    if trace_over == 'extra':
    res = [[0+0j for _ in range(sys_dim)] for _ in range(sys_dim)]
    for a in range(sys_dim):
    for b in range(sys_dim):
    s = 0+0j
    for k in range(extra_dim):
    i, j = a*extra_dim+k, b*extra_dim+k
    s+=rho[i][j]
    res[a][b] = s
    return res
    else:
    res = [[0+0j for _ in range(extra_dim)] for _ in range(extra_dim)]
    for a in range(extra_dim):
    for b in range(extra_dim):
    s = 0+0j
    for k in range(sys_dim):
    i, j = k*extra_dim+a, k*extra_dim+b
    s+=rho[i][j]
    res[a][b] = s
    return res
    #______________________________________________________________________________

    def is_classical_density(rho: list, eps: float=1e-9) -> bool:

    n = len(rho)
    for i in range(n):
    for j in range(n):
    if i != j and abs(rho[i][j]) > eps:
    return False
    return True
    #______________________________________________________________________________
    #//////////////////////////////////////////////////////////////////////////////

    class Interpreter:

    __slots__ = ('store_path', 'attached_frame_id')

    def __init__(self, store_path: str):

    self.store_path, self.attached_frame_id = store_path, None
    #______________________________________________________________________________

    def attach(self, frame_id: str):

    self.attached_frame_id = frame_id
    #______________________________________________________________________________

    def detach(self):

    self.attached_frame_id = None
    #______________________________________________________________________________

    def is_extra_classical(self) -> bool:

    if self.attached_frame_id is None:
    raise RuntimeError('not attached')
    s = BinStore(self.store_path)
    reg_off = s.read_registry_offset()
    if reg_off == 0:
    s.close()
    raise RuntimeError('no registry')
    rtype, blen = s.read_record_header(reg_off)
    reg = unpack_registry(s.read_record_body(reg_off, blen))
    info = reg.get(self.attached_frame_id)
    if info is None:
    s.close()
    raise RuntimeError('frame not in registry')
    if info['joint_off'] != 0:
    rec = s.read_record_body(info['joint_off'], s.read_record_header(info['joint_off'])[1])
    s.close()
    rho = pure_state_density(_unpack_complex_vec(rec))
    red_extra = partial_trace(rho, info['sys_dim'], info['extra_dim'], trace_over='sys')
    return is_classical_density(red_extra)
    else:
    s.close()
    return info['extra_off'] == 0
    #______________________________________________________________________________

    def apply_unitary_system(self, U: list) -> int:

    if self.attached_frame_id is None:
    raise RuntimeError('not attached')
    s = BinStore(self.store_path)
    reg_off = s.read_registry_offset()
    if reg_off == 0:
    s.close()
    raise RuntimeError('no registry')
    rtype, blen = s.read_record_header(reg_off)
    reg = unpack_registry(s.read_record_body(reg_off, blen))
    info = reg.get(self.attached_frame_id)
    if info is None:
    s.close()
    raise RuntimeError('frame not in registry')
    if info['joint_off'] != 0:
    joint_body = s.read_record_body(info['joint_off'], s.read_record_header(info['joint_off'])[1])
    joint_vec = _unpack_complex_vec(joint_body)
    sys_dim = info['sys_dim']; extra_dim = info['extra_dim']
    if len(U) != sys_dim or not is_unitary(U):
    s.close()
    raise ValueError('unitary mismatch')
    new_joint = [0+0j]*(sys_dim*extra_dim)
    for i in range(sys_dim):
    for j in range(extra_dim):
    acc = 0+0j
    for k in range(sys_dim): acc+=U[i][k]*joint_vec[k*extra_dim+j]
    new_joint[i*extra_dim+j] = acc
    norm = math.sqrt(sum(abs(x)**2 for x in new_joint))
    newn = [x/norm for x in new_joint]
    body_new = _pack_complex_vec(newn)
    new_off = s.append_record(4, body_new)
    reg2 = dict(reg)
    reg2[self.attached_frame_id] = dict(info)
    reg2[self.attached_frame_id]['joint_off'] = new_off
    body_reg = pack_registry(reg2)
    s.append_record(1, body_reg)
    s.close()
    return new_off
    else:
    sys_body = s.read_record_body(info['system_off'], s.read_record_header(info['system_off'])[1])
    vec = _unpack_complex_vec(sys_body)
    if len(U) != len(vec) or not is_unitary(U):
    s.close()
    raise ValueError('unitary mismatch')
    new = mat_vec(U, vec)
    norm = math.sqrt(sum(abs(x)**2 for x in new))
    newn = [x/norm for x in new]
    body_new = _pack_complex_vec(newn)
    new_off = s.append_record(2, body_new)
    reg2 = dict(reg)
    reg2[self.attached_frame_id] = dict(info)
    reg2[self.attached_frame_id]['system_off'] = new_off
    body_reg = pack_registry(reg2)
    s.append_record(1, body_reg)
    s.close()
    return new_off
    #______________________________________________________________________________

    def create_initial_store(path: str) -> None:

    s, sysA, joint = BinStore(path), [1+0j, 0+0j], []
    off_sysA = s.append_record(2, _pack_complex_vec(sysA))
    for i in range(2):
    for j in range(2): joint.append(1/math.sqrt(2) if i == j else 0+0j)
    off_jointQ = s.append_record(4, _pack_complex_vec(joint))
    reg = {'A': {'joint_off':0, 'system_off':off_sysA, 'extra_off':0, 'sys_dim':2, 'extra_dim':1},
    'Q': {'joint_off':off_jointQ, 'system_off':0, 'extra_off':0, 'sys_dim':2, 'extra_dim':2},}
    s.append_record(1, pack_registry(reg))
    s.close()
    #______________________________________________________________________________

    def measure(store_path: str, frame_id: str, out_path: str):

    interp = Interpreter(store_path)
    interp.attach(frame_id)
    res = interp.is_extra_classical()
    with open(out_path, 'w') as f: f.write('classical\n' if res else 'quantum\n')
    #______________________________________________________________________________

    def apply_then_measure(store_path: str, frame_id: str, out_path: str):

    interp = Interpreter(store_path)
    interp.attach(frame_id)
    H = [[1/math.sqrt(2), 1/math.sqrt(2)], [1/math.sqrt(2), -1/math.sqrt(2)]]
    interp.apply_unitary_system([[complex(x) for x in row] for row in H])
    res = interp.is_extra_classical()
    with open(out_path, 'w') as f: f.write('classical\n' if res else 'quantum\n')
    #______________________________________________________________________________