|
#!/usr/bin/env python |
|
# coding: utf-8 |
|
|
|
# In[1]: |
|
|
|
|
|
import re, os, sys, requests, datetime |
|
import argparse |
|
import csv |
|
import subprocess |
|
from typing import NamedTuple |
|
from pathlib import Path |
|
|
|
from collections import defaultdict |
|
# PyGithub is required for the GitHub API access below; install it on the
# fly if it is missing from the current environment.
try:
    from github import Github, Auth
except ImportError:
    # Only a missing module should trigger the install; a bare except would
    # also swallow KeyboardInterrupt/SystemExit.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "PyGithub"])
    from github import Github, Auth
|
## Offset added to &KRxxxx; gaiji numbers to map them into the Unicode
## Supplementary Private Use Area (1069056 == 0x105000).
puamagic = 1069056

## the access token for github needs to be in the .env file
## the format is at=<access token>
## on a line by itself
env = '.env'
if os.path.exists(env):
    print('Importing environment from .env...')
    # context manager ensures the file handle is closed
    with open(env) as envfile:
        for line in envfile:
            # split only on the first '=' so values containing '=' survive
            var = line.strip().split('=', 1)
            if len(var) == 2:
                os.environ[var[0]] = var[1]

at = os.environ.get('at')  # GitHub access token (may be None)
FWS = "\u3000"       # full-width space
PILCROW = "¶"        # line-break marker used in the source texts
PUA_BASE = 1069056   # U+105000 -- same offset as puamagic above
|
|
|
|
|
# In[2]: |
|
|
|
|
|
# Column layout of the .lns CSV files produced by write_csv().
CSV_FIELDS = ["id", "witness", "pb", "level", "type", "offset"]


class Marker(NamedTuple):
    """One piece of non-body content removed from a text, together with the
    position (in body characters) where it was taken out."""
    type: str         # marker kind ("pb", "lb", "newline", ...) or the literal character
    char_offset: int  # offset into the stripped body text
    content: str      # the exact text that was removed
    id: str = ""      # page/line id, where applicable


def write_csv(path: Path, markers: list[Marker], id, witness) -> None:
    """Write one CSV row per marker to *path*, using the CSV_FIELDS layout."""
    with open(path, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=CSV_FIELDS)
        writer.writeheader()
        writer.writerows(
            {
                "id": id,
                "witness": witness,
                "pb": marker.id,
                "level": 0,
                "type": marker.type,
                "offset": marker.char_offset,
            }
            for marker in markers
        )
|
|
|
|
|
|
|
# In[3]: |
|
|
|
|
|
# Matches <pb:...> page-break tags
_PB_RE = re.compile(r"<pb:[^>]+>")
# Matches mid-body #+KEYWORD lines (with trailing newline)
_ORG_RE = re.compile(r"#\+[^\n]+\n?")

# CJK punctuation/symbols (U+3001-U+33FF) and variation selectors plus
# half/full-width forms (U+FE00-U+FF7F); strip_body records these as
# markers instead of body text.
_PUNC_RE = re.compile(u"[\u3001-\u33FF\uFE00-\uFF7F]+")

# CJK ideographs (Unified, Ext A, Compat, Ext B-G, Compat Supplement) plus PUA
# (BMP and Supplementary planes). Anything outside this set is treated as a
# marker rather than body text.
_BODY_RE = re.compile(
    "[\u3400-\u4DBF\u4E00-\u9FFF\uF900-\uFAFF\uE000-\uF8FF"
    "\U00020000-\U0003134F\U0002F800-\U0002FA1F"
    "\U000F0000-\U0010FFFD]"
)
|
|
|
|
|
def strip_body(body: str) -> tuple[str, list[Marker]]:
    """Strip all non-Chinese-character content from body.

    Returns (stripped_chars, markers) where stripped_chars contains only the
    meaningful text characters (Chinese + PUA) and markers records everything
    that was removed with its char_offset.
    """
    markers: list[Marker] = []
    stripped: list[str] = []
    i = 0
    n = len(body)
    pb_id = ""       # id of the most recently seen <pb:...> tag
    line_count = 0   # lines emitted since the current pb

    while i < n:
        ch = body[i]

        # Mid-body #+KEYWORD line: record it whole (minus trailing \n) and skip it.
        if ch == "#" and (i == 0 or body[i - 1] == "\n"):
            m = _ORG_RE.match(body, i)
            if m:
                content = m.group(0)
                markers.append(Marker("orgline", len(stripped), content.rstrip("\n")))
                i += len(content)
                continue

        # <pb:...> page-break tag
        if ch == "<":
            m = _PB_RE.match(body, i)
            if m:
                tag = m.group(0)
                pb_id = tag[4:-1]  # strip leading "<pb:" and trailing ">"
                line_count = 0
                pos = len(stripped)
                # If a line-break marker immediately precedes this pb at the
                # same char_offset, remove it: the pilcrow after the pb is
                # kept instead, so it can carry the correct line id.
                # BUGFIX: pilcrow markers are emitted with type "lb" (below);
                # the old check compared against "line", which is never
                # created, so the dedup could never fire.
                if (markers and markers[-1].type == "lb"
                        and markers[-1].char_offset == pos):
                    markers.pop()
                markers.append(Marker("pb", pos, tag, pb_id))
                i += len(tag)
                continue

        # Full-width space: recorded as its own marker, not body text.
        if ch == FWS:
            markers.append(Marker(ch, len(stripped), FWS))
            i += 1
            continue

        # Pilcrow (always followed by \n in source): line break marker;
        # its id is the current page id plus a two-digit line counter.
        if ch == PILCROW:
            newline = body[i + 1] if i + 1 < n and body[i + 1] == "\n" else ""
            line_count += 1
            line_id = f"{pb_id}{line_count:02d}" if pb_id else ""
            markers.append(Marker("lb", len(stripped), PILCROW, line_id))
            i += 1 + len(newline)
            continue

        # Editorial characters kept as individual markers.
        if ch in ('/', "(", ")", "*"):
            markers.append(Marker(ch, len(stripped), ch))
            i += 1
            continue

        # CJK punctuation: one marker per character.
        m = _PUNC_RE.match(body, i)
        if m:
            markers.append(Marker(ch, len(stripped), ch))
            i += 1
            continue

        # Plain newline
        if ch == "\n":
            markers.append(Marker("newline", len(stripped), "\n"))
            i += 1
            continue

        # Anything else outside the body (CJK/PUA) character set.
        if not _BODY_RE.match(ch):
            markers.append(Marker(ch, len(stripped), ch))
            i += 1
            continue

        # A genuine body character: keep it.
        stripped.append(ch)
        i += 1

    return "".join(stripped), markers
|
|
|
|
|
|
|
# In[4]: |
|
|
|
|
|
def get_property(p_in):
    """Parse an org-mode "#+KEY: value" line.

    Returns (key, value) for DATE/TITLE lines, (name, value) for
    "#+PROPERTY: name value" lines, and a "Bad property: ..." string for
    anything it cannot parse (callers inspect the result's first element).
    """
    p = p_in[2:]  # drop the "#+" prefix
    # split only on the first ": " so values containing ": " stay intact
    pp = p.split(": ", 1)
    if len(pp) < 2:
        # e.g. "#+BEGIN" -- no ": " separator; previously an IndexError
        return "Bad property: %s" % (p_in)
    if pp[0] in ["DATE", "TITLE"]:
        return (pp[0], pp[1])
    elif pp[0] == "PROPERTY":
        p1 = pp[1].split()
        if not p1:
            # "#+PROPERTY: " with no name; previously an IndexError
            return "Bad property: %s" % (p_in)
        return (p1[0], " ".join(p1[1:]))
    return "Bad property: %s" % (p_in)
|
|
|
|
|
# In[5]: |
|
|
|
|
|
# save the gaiji table
def save_gjd (txtid, branch, gjd, type="entity"):
    """Save the gaiji (rare character) table as an XSLT character map.

    gjd maps entity names (e.g. "KR0001") to their PUA characters.  With
    type="entity" the map emits "&name;" entity references, otherwise
    "<g ref=.../>" strings.  Output goes to <txtid>/aux/map/, which must
    already exist.
    """
    if (type=="entity"):
        fname = "%s/aux/map/%s_%s-entity-map.xml" % (txtid, txtid, branch)
    else:
        fname = "%s/aux/map/%s_%s-entity-g.xml" % (txtid, txtid, branch)
    # context manager + explicit encoding: the XML declaration promises UTF-8
    with open(fname, "w", encoding="utf-8") as of:
        of.write("""<?xml version="1.0" encoding="UTF-8"?>
<stylesheet xmlns="http://www.w3.org/1999/XSL/Transform" version="2.0">
<character-map name="krx-map">\n""")
        for kr in sorted(gjd):
            if (type=="entity"):
                of.write("""<output-character character="%s" string="&%s;"/>\n""" % (gjd[kr], kr))
            else:
                # NOTE(review): the nested quotes and "<" here do not form a
                # well-formed XML attribute value; output kept byte-identical
                # for compatibility -- confirm with downstream consumers.
                of.write("""<output-character character="%s" string="<g ref="%s"/>"/>\n""" % (gjd[kr], kr))
        of.write("""</character-map>\n</stylesheet>\n""")
|
|
|
|
|
# In[6]: |
|
|
|
|
|
def html_header(txtid):
    """Return the opening HTML boilerplate, with *txtid* as the page title."""
    parts = [
        '<!DOCTYPE html>',
        '<html>',
        '<head>',
        '<style>',
        'h1 {background-color: green; color: white;}',
        'p {text-align: left; color: black; font-size: 13pt; line-height: 0.75; }',
        '</style>',
        '<meta charset="utf-8"/>',
        '<title>%s</title>' % (txtid),
        '</head>',
        '<body>',
        '<div>',
        '<p class="p">',
        '',
    ]
    return "\n".join(parts)
|
|
|
|
|
# In[7]: |
|
|
|
|
|
def krp_header(lx):
    """Render the metadata entries of *lx* as a mandoku-style org header."""
    out = ["# -*- mode: mandoku-view; -*-\n"]
    for key, value in lx.items():
        # TEXT (the body) and B are not header material
        if (key in ('TEXT', 'B')):
            continue
        if (key in ('DATE', 'TITLE')):
            out.append("#+%s: %s\n" % (key, value))
        else:
            out.append("#+PROPERTY: %s %s\n" % (key, value))
    return "".join(out)
|
|
|
|
|
# In[8]: |
|
|
|
|
|
def parse_text_plain(lines, gjd, md=False):
    """Loop through the lines and return a dict of metadata and text content.

    Returns {'TEXT': pages, <property>: <value>, ...} where pages is a list
    of pages, each page a list of lines, split on <pb: page breaks.  Gaiji
    entities (&KRxxxx;) are converted to PUA characters via *puamagic* and
    recorded in *gjd* as a side effect.  *md* (whether to care about <md:
    style tags) is currently unused here.
    """
    lx = {'TEXT': []}
    nl = []  # lines of the current page
    np = []  # completed pages
    for l in lines:
        # normalize away markup that is not carried into the plain text
        l = l.replace('<折 t="33"/>', '/')
        l = l.replace('\x01', '')
        l = l.replace('<img:>', '')
        # drop stray '<' runs unless they begin an <l.., <m.. or <p.. tag
        l = re.sub(r"([<]+)([^lmp])", "\\2", l)
        l = re.sub(r"([<]+)$", "", l)
        if l.startswith("#+"):
            p = get_property(l)
            if (p[0] == 'LASTPB'):
                # (intended) re-inject a LASTPB page break as a text line.
                # NOTE(review): get_property never actually returns 'LASTPB'
                # as its first element, so this branch is currently dead.
                l = p[1] + '\n'
            else:
                lx[p[0]] = p[1]
                continue
        elif l.startswith("#"):
            continue  # plain comment line
        elif "<pb:" in l:
            # page break: close the current page and start a new one.
            # (a duplicated, immediately-overwritten pbxmlid assignment was
            # removed here -- the variable was never read)
            np.append(nl)
            nl = []
            l = re.sub("¶", "", l)
        if "&KR" in l:
            # record each gaiji in gjd, then replace it with its PUA character
            for g in re.finditer("&KR([^;]+);", l):
                gjd["KR%s" % (g.group(1))] = "%c" % (int(g.group(1)) + puamagic)
            l = re.sub("&KR([^;]+);", lambda x: "%c" % (int(x.group(1)) + puamagic), l)
        nl.append(l)
    np.append(nl)  # flush the final page
    lx['TEXT'] = np
    return lx
|
|
|
|
|
# In[9]: |
|
|
|
|
|
def save_text(lx, branch, path, html=True, krp=True, suffix=""):
    """lx is the dictionary with content and metadata, path is the target path. Must exist.

    Writes sibling files named <ID>-<WITNESS><suffix>.*:
      .txt  -- the stripped body text (via strip_body)
      .lns  -- CSV of the markers removed from the body (via write_csv)
      .krp  -- the original lines under a mandoku header (if krp=True)
      .html -- a simple HTML rendering (if html=True)
    Returns silently without writing anything when the base name cannot be
    formatted (e.g. lx has no ID).
    """
    if ('WITNESS' in lx):
        pass
    else:
        # default the witness to the branch name
        lx['WITNESS'] = branch
    try:
        base = path + "/%(ID)s-%(WITNESS)s" % lx + suffix
    except:
        # NOTE(review): bare except silently drops texts whose metadata is
        # incomplete (KeyError is the expected failure) -- consider logging.
        return

    txt_file = open(base + ".txt", 'w')
    if (html):
        html_file = open(base + ".html", 'w')
        html_file.write( html_header("%(ID)s - %(TITLE)s" % lx))
    if (krp):
        krp_file = open(base + ".krp", 'w')
        krp_file.write( krp_header(lx))
    # NOTE(review): lcnt/ccnt/lev/lb/s only feed the commented-out .lns code
    # below; the actual .lns output now comes from strip_body/write_csv.
    lcnt = 1
    ccnt = 0
    lev = '0'
    lb = ''
    s = ''
    for p in lx['TEXT']:
        for l in p:
            if (krp):
                # .krp keeps the original line verbatim
                krp_file.write(l+'\n')
            # drop box-drawing / bracket characters from the rendered output
            l = re.sub('[─。│「」]+', '', l)
            if l.startswith('<'):
                # page-break line: extract the page id (third _-part)
                pb=re.sub("<pb:([^_]+)_([^_]+)_([^>]+)¶?>", "\\3", l)
                #print(l, pb)
                lcnt = 1
                #lb = format_lns (lx, pb,lcnt, ccnt)
                #print (lb, l)
                #lns_file.write(lb)
            elif (l == ''):
                # lb = format_lns(lx, pb,lcnt, ccnt, 'p')
                #print (lb, l)
                if (html):
                    html_file.write('<br/>%s\n' % (l))
            else:
                if (html):
                    html_file.write('<br/>%s\n' % (l))
                for c in l:
                    if c in (' ', "(", ")", "*") :
                        # editorial characters are not counted
                        #lb = format_lns( lx, pb,lcnt, ccnt, c)
                        #print (lb)
                        #lns_file.write(lb)
                        continue
                    elif c == '¶':
                        # pilcrow marks a line break on the page
                        lcnt += 1
                        # lb = "%(ID)s-%(WITNESS)s,%(BASEEDITION)s," % lx + "%s%02d,%s,lb," % (pb, lcnt, lev)
                        continue
                    ccnt += 1
                    s += c
    # The .txt/.lns pair comes from stripping the full concatenated body.
    marks = strip_body(''.join([''.join(a) for a in lx['TEXT']]))
    txt_file.write(marks[0])
    lns_path = base + ".lns"
    # NOTE(review): assumes lx has a BASEEDITION key -- confirm upstream.
    write_csv(lns_path, marks[1], "%(ID)s-%(WITNESS)s" % lx, "%(BASEEDITION)s" % lx)
    # lns_file.close()
    txt_file.close()
    if (krp):
        krp_file.close()
    if (html):
        html_file.write('</p></div></body></html>\n')
        html_file.close()
|
|
|
|
|
|
|
# In[10]: |
|
|
|
|
|
def convert_text(txtid, outpath, user='kanripo', by_juan=False):
    """Download text *txtid* from the *user* org on GitHub and convert it.

    Every non-underscore branch is fetched; its *.txt files are parsed with
    parse_text_plain and written to *outpath*, either combined per branch or
    (with by_juan=True) one output set per juan.
    """
    gh=Github(auth=Auth.Token(at))
    try:
        hs=gh.get_repo(f"{user}/{txtid}")
    except Exception:
        # repo not found (or API failure): record and bail out.
        # NOTE(review): hard-coded log path -- should be configurable.
        ofxx = open("/Users/chris/krptok-error.txt", "a")
        ofxx.write(f"ERROR {txtid} not found?\n")
        ofxx.close()
        print(f"ERROR {txtid} not found?")
        sys.exit()
    # get the branches (underscore-prefixed branches hold auxiliary data)
    branches=[a.name for a in hs.get_branches() if not a.name.startswith("_")]
    for branch in branches:
        # BUGFIX: initialize flist so a failing get_contents() call does not
        # leave it unbound (NameError) or reusing the previous branch's list.
        flist = []
        try:
            # NOTE(review): "_data" is filtered out of branches above, so
            # this first arm is currently unreachable -- confirm intent.
            if branch == "_data":
                flist = [a.path for a in hs.get_contents("/imglist", ref=branch)]
            else:
                flist = [a.path for a in hs.get_contents("/", ref=branch) if a.path.endswith('txt')]
        except Exception:
            print('sth wrong', branch)
        print(flist)
        md = False
        gjd = {}
        lines = []
        for path in flist:
            if path.startswith("imglist"):
                r=requests.get(f"https://raw.githubusercontent.com/{user}/{txtid}/{branch}/{path}")
                if r.status_code == 200:
                    # NOTE(review): r.encoding may be None -- confirm.
                    cont=r.content.decode(r.encoding)
                    # NOTE(review): ntxtid and bt are not defined anywhere in
                    # this file; this branch would raise NameError if taken.
                    of = open(ntxtid+bt+path, "w")
                    of.write(cont)
                    of.close()
            elif path.startswith(txtid):
                r=requests.get(f"https://raw.githubusercontent.com/{user}/{txtid}/{branch}/{path}")
                if r.status_code == 200:
                    cont=r.content.decode(r.encoding)
                    if "<md:" in cont:
                        md = True  # text uses <md: style tags (currently unused)
                    if by_juan:
                        # one output set per juan (trailing _NN of the filename)
                        juan = Path(path).stem.rsplit("_", 1)[-1]
                        lx = parse_text_plain(cont.split("\n"), gjd)
                        save_text(lx, branch, outpath, suffix=f"-{juan}")
                    else:
                        lines.extend(cont.split("\n"))

        if not by_juan:
            # one combined output set for the whole branch
            lx = parse_text_plain(lines, gjd)
            save_text(lx, branch, outpath)
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: text id, optional output dir, optional split.
    parser = argparse.ArgumentParser(
        description="Download and convert a Kanripo text from GitHub.")
    parser.add_argument("txtid", help="Kanripo text id (e.g. KR1a0001)")
    parser.add_argument("path", nargs="?", default=".",
                        help="Output directory (default: current dir)")
    parser.add_argument(
        "--by-juan", action="store_true",
        help="Save each juan as its own set of output files instead of one combined file.")
    opts = parser.parse_args()
    convert_text(opts.txtid, opts.path, by_juan=opts.by_juan)