#!/usr/bin/env python3
#
# git-subline-merge
#
# Created by Paul Altin on 02.03.18.
# Downloaded from https://github.com/paulaltin/git-subline-merge
#
# An interactive git merge driver which can resolve non-overlapping conflicts on individual or adjacent lines.
#
# To install for use during merge/rebase, place this script somewhere on your path, add these lines to your ~/.gitconfig:
#
#   [merge "git-subline-merge"]
#       name = An interactive merge driver for resolving sub-line conflicts
#       driver = git-subline-merge %O %A %B %L %P
#       recursive = binary
#
# and this to your git attributes file (e.g. ~/.config/git/attributes):
#
#   * merge=git-subline-merge
#
# Alternatively, run this script on a conflicted file using 'git-subline-merge /path/to/file'
#

import os, sys, re, tempfile
from subprocess import call
from shutil import copyfile
from builtins import input

# improved command line editing on *nix
try:
    import readline
except ImportError:
    pass


# for interpreting environment variables
# distutils (the former home of strtobool) was removed from the standard library in
# Python 3.12, so the helper is implemented here with the same accepted values:
# returns 1 for y/yes/t/true/on/1, 0 for n/no/f/false/off/0 (case-insensitive)
# and raises ValueError for anything else
def strtobool(val):
    val = val.lower()
    if val in ('y', 'yes', 't', 'true', 'on', '1'):
        return 1
    if val in ('n', 'no', 'f', 'false', 'off', '0'):
        return 0
    raise ValueError('invalid truth value %r' % (val,))


#############
### SETUP ###
#############

WINDOWS = os.name == 'nt'

# max hunk sizes
MAX_HUNK_SIZE = int(os.getenv('GIT_SUBLINE_MERGE_MAX_HUNK_SIZE', 16))
MAX_HUNK_SIZE_DIFF = int(os.getenv('GIT_SUBLINE_MERGE_MAX_HUNK_SIZE_DIFF', 8))

# colors
# don't use color if stdout is not a terminal or if it doesn't support at least 8-bit color
# also respect NO_COLOR informal standard
stdout_isatty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
try:
    term_colors = int(os.popen('which tput >/dev/null && tput colors').read() or 0)
except ValueError:
    # tput printed something that is not a number: treat as "no color support"
    term_colors = 0
no_color = os.getenv('NO_COLOR') is not None
show_color = stdout_isatty and term_colors >= 8 and not no_color

# use colorama if available for colored terminal output on Windows
if WINDOWS and show_color:
    try:
        from colorama import init
        init()
    except ImportError:
        print('Install colorama (pip install colorama) for colored output on Windows.')

# interactive or non-interactive
# mode
stdin_isatty = hasattr(sys.stdin, 'isatty') and sys.stdin.isatty()
interactive = not bool(strtobool(os.getenv('GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE', 'False')))
if interactive and not stdin_isatty and not WINDOWS:
    # stdin is not a terminal, so read the user's answers from the controlling terminal instead
    try:
        sys.stdin = open('/dev/tty')
    except IOError:
        print('Warning: git-subline-merge could not run in interactive mode because there is no controlling terminal.\n'
              'You can enable non-interactive mode using the GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE environment variable,\n'
              ' however note that in this mode you will NOT be able to review merges before they are written to file.')
        sys.exit(1)


###############
### HELPERS ###
###############

# for colored output in terminal
# every attribute is an empty string when show_color is False, so the codes
# can be concatenated into output unconditionally
class color:
    bold         = '\033[1m' if show_color else ''
    welcome      = '\033[1m\033[91m' if show_color else ''           # bold, red
    info         = '\033[96m' if show_color else ''                  # cyan
    highlight    = '\033[103m\033[1m\033[30m' if show_color else ''  # bold, black, yellow bg
    added1       = '\033[48;5;28m' if show_color else ''             # bright green
    added2       = '\033[48;5;22m' if show_color else ''             # dark green
    deleted1     = '\033[48;5;124m' if show_color else ''            # bright red
    deleted2     = '\033[48;5;88m' if show_color else ''             # dark red
    deleted_both = '\033[48;5;166m' if show_color else ''            # orange
    success      = '\033[92m' if show_color else ''                  # green
    warning      = '\033[93m' if show_color else ''                  # yellow
    error        = '\033[91m' if show_color else ''                  # red
    end          = '\033[0m' if show_color else ''


# get number of lines in a file, either from a name or a file handle
# if given a handle, ensure position is reset to its original value
# an empty file (or handle) has 0 lines
def file_len(f):
    try:
        fh = open(f, 'r')
    except TypeError:
        # not a path: treat f as an already open file handle
        pos = f.tell()
        f.seek(0)
        count = sum(1 for _ in f)
        f.seek(pos)
        return count
    with fh:
        return sum(1 for _ in fh)


# get the line numbers of the beginning and end of a conflicted hunk
# the file parameter is a path to a file on disk
# n is the 1-based index of the hunk among the lines starting with marker_start
# returns (start, stop): zero-indexed line of the start marker and one past the end marker
# raises IndexError if the hunk (or its end marker) cannot be found
def find_nth_conflicted_hunk(file, n):
    start = stop = -1
    with open(file, 'r') as f:
        seen = 0
        for lineno, text in enumerate(f):
            if text.startswith(marker_start):
                seen += 1
                if seen == n and start == -1:
                    start = lineno
            elif start >= 0 and text.startswith(marker_end):
                stop = lineno + 1
                break
    if start == -1 or stop == -1:
        raise IndexError('Hunk %d not found!' % n)
    return (start, stop)


# find nth conflict in file and replace it with given lines
# the file parameter is a path to a file on disk
# lines is a list of strings
def replace_nth_conflicted_hunk_with_lines(file, n, lines):
    with open(file, 'r') as f:
        contents = f.readlines()
    start, stop = find_nth_conflicted_hunk(file, n)
    contents[start:stop] = lines
    with open(file, 'w') as f:
        f.write(''.join(contents))


# prompt user for input, repeating until one of the allowed responses is given
# when allowed is None the raw response to a single prompt is returned unchanged
def ask_for_input(msg, allowed=None):
    if allowed is None:
        return input(color.bold + color.info + msg + color.end)

    prompt = color.bold + color.info + msg + ' (' + '/'.join(allowed) + ')? ' + color.end
    accepted = [s.strip().lower() for s in allowed]
    while True:
        r = input(prompt).strip().lower()
        if r in accepted:
            return r


# get exit status from a call to os.system
# on Windows, os.WEXITSTATUS() doesn't work and os.system() returns the status directly
def get_exit_status(exit_code):
    return exit_code if WINDOWS else os.WEXITSTATUS(exit_code)


#############
### DIFFS ###
#############

# print one version of a hunk, highlighting the differences between it and the other version(s)
# display is either 'current', 'base' or 'other'
# added lines are indicated by a leading '+' and are highlighted in green
# deleted lines are indicated by a leading '-' and are highlighted in red
# changed lines are indicated by a leading '*' and changes within them are highlighted in either green or red
# for current or other version, we just need to highlight the differences between it
# and the base version, which can only be changes or additions
# for the base version, we want to highlight the differences between it and BOTH
# the current and the other version, which can only be
# changes or deletions
# we take the diff with both versions and combine them by keeping all color codes
# hunk is a list of three file handles: [current, base, other]
def print_formatted_diff(hunk, display):
    if display == 'current' or display == 'other':
        old = hunk[1]
        new = hunk[0] if display == 'current' else hunk[2]
        color_added = color.added1 if display == 'current' else color.added2
        color_deleted = color.deleted1 if display == 'current' else color.deleted2
        clines = format_diff(old, new, 'new', color_added, color_deleted)
        for ln in clines:
            print(ln)

    elif display == 'base':
        clines1 = format_diff(hunk[1], hunk[0], 'old', color.added1, color.deleted1)
        clines2 = format_diff(hunk[1], hunk[2], 'old', color.added2, color.deleted2)
        clines = combine_colors(clines1, clines2, color.deleted1, color.deleted2, color.deleted_both)
        for ln in clines:
            print(ln)


# add colors and prefixes showing the parts added or deleted between two versions
# fold and fnew are file handles for the old and new versions
# display must be either 'old' or 'new'
# added lines are indicated by a leading '+' and are highlighted using color_added
# deleted lines are indicated by a leading '-' and are highlighted using color_deleted
# changed lines are indicated by a leading '*' and changes within them are highlighted using color_added and color_deleted
# every returned line starts with a two-character prefix ('+ ', '- ', '* ' or two spaces),
# which is the width combine_colors strips off again
def format_diff(fold, fnew, display, color_added, color_deleted):

    # reset file handles
    fold.seek(0)
    fnew.seek(0)

    # find line numbers of added, deleted and changed lines
    # for added and deleted groups, get the (zero-indexed) number of the first line and the length of the group
    # for changed groups, get the (zero-indexed) numbers of the first and one past the last line
    # ideally we'd use \n as the delimiter, but these break the shell on Windows
    delimiter = marker_start + "GIT_SUBLINE_MERGE_DELIMITER" + marker_end
    oldformat = ' --old-group-format="D%de+%dn,%dE+%dN' + delimiter + '" '
    newformat = ' --new-group-format="A%de+%dn,%dE+%dN' + delimiter + '" '
    changedformat = ' --changed-group-format="C%de+%dl,%dE+%dL' + delimiter + '" '
    unchangedformat = ' --unchanged-group-format="" '
    cmd = 'diff' + oldformat + newformat + changedformat + unchangedformat + fold.name + ' ' + fnew.name
    groups = [g for g in os.popen(cmd).read().split(delimiter) if len(g)]

    # extract indices from diff output
    # added and deleted are sets of line numbers
    # changed are lists of (start, stop) pairs
    added, deleted, oldchange, newchange = set(), set(), [], []
    for g in groups:
        kind = g[0]
        # each group reads "<old part>,<new part>" with both parts of the form "a+b"
        parts = [[int(x) for x in p.split('+')] for p in g[1:].split(',')]
        if kind == 'A':
            start, length = parts[1]
            added.update(range(start, start + length))
        elif kind == 'D':
            start, length = parts[0]
            deleted.update(range(start, start + length))
        elif kind == 'C':
            oldchange.append(parts[0])
            newchange.append(parts[1])

    # generate a list of lines with prefixes and highlighting
    lines = []
    i, j = 0, 0
    old = fold.read().splitlines()
    new = fnew.read().splitlines()
    version = old if display == 'old' else new
    change = oldchange if display == 'old' else newchange
    while i < len(version):
        if display == 'new' and i in added:
            lines.append('+ ' + color_added + version[i] + color.end)
        elif display == 'old' and i in deleted:
            lines.append('- ' + color_deleted + version[i] + color.end)
        elif j < len(change) and i == change[j][0]:
            oldhunk = old[oldchange[j][0]:oldchange[j][1]]
            newhunk = new[newchange[j][0]:newchange[j][1]]
            coloredhunk = format_changed_hunk(oldhunk, newhunk, display, color_added, color_deleted)
            for ln in coloredhunk:
                lines.append('* ' + ln)
            i += change[j][1] - change[j][0] - 1    # 1 will be added later
            j += 1
        else:
            # two-character prefix, same width as '+ ', '- ' and '* '
            lines.append('  ' + version[i])
        i += 1

    return lines


# word-level diff with coloring
# additions are highlighted using color_added, deletions using color_deleted
# h1 and h2 are lists of strings (lines)
# display must be either 'old' or 'new'
# return value is a list of strings (lines) that matches
# either h1
# or h2, depending on the value of display
def format_changed_hunk(h1, h2, display, color_added, color_deleted):

    # split lines at non-word characters
    # keep track of where the linebreaks are in terms of segment index
    splA, splB, lcA, lcB = [], [], [0], [0]
    for ln in h1:
        segments = re.split(r'(\W)', ln)
        splA += segments
        lcA.append(lcA[-1] + len(segments))
    for ln in h2:
        segments = re.split(r'(\W)', ln)
        splB += segments
        lcB.append(lcB[-1] + len(segments))
    lcA.pop(0)
    lcB.pop(0)

    # write split lines to temp files (one segment per line); the with-block
    # closes (and thereby deletes) them even if diff cannot be run
    with tempfile.NamedTemporaryFile(mode='w+') as tmpA, tempfile.NamedTemporaryFile(mode='w+') as tmpB:
        tmpA.write('\n'.join(splA))
        tmpA.flush()
        tmpB.write('\n'.join(splB))
        tmpB.flush()

        # use diff to find added and removed section indices
        # each output character corresponds to one segment of the old (resp. new) version
        old = os.popen('diff --unchanged-line-format="." --old-line-format="-" --new-line-format="" ' + tmpA.name + ' ' + tmpB.name).read()
        new = os.popen('diff --unchanged-line-format="." --old-line-format="" --new-line-format="+" ' + tmpA.name + ' ' + tmpB.name).read()

    removed = {i for i, c in enumerate(old) if c == '-'}
    added = {i for i, c in enumerate(new) if c == '+'}

    # choose the side to render; any other display value yields a single empty line
    if display == 'old':
        segments, breaks, marked, paint = splA, set(lcA), removed, color_deleted
    elif display == 'new':
        segments, breaks, marked, paint = splB, set(lcB), added, color_added
    else:
        segments, breaks, marked, paint = [], set(), set(), ''

    pieces = []
    for i, w in enumerate(segments):
        if i in breaks:
            pieces.append('\n')
        if i in marked:
            pieces.append(paint)
        pieces.append(w.rstrip('\n'))
        if i in marked:
            pieces.append(color.end)

    return ''.join(pieces).split('\n')


# combine highlighting from two versions of the same (base) hunk and print
# lines1 and lines2 are lists of strings (lines) and must be identical
# except for highlighting with color1 and color2
# regions which are highlighted in both versions will be printed with color_both
# every input line carries a two-character prefix, which is removed and re-added here
def combine_colors(lines1, lines2, color1, color2, color_both):

    # merge the two versions
    lines = []
    assert len(lines1) == len(lines2)
    for ln1, ln2 in zip(lines1, lines2):

        i, j = 0, 0
        result = ''
        highlight1, highlight2 = False, False

        # remove prefixes (we will add them back later)
        prefixes = [ln1[0], ln2[0]]
        ln1, ln2 = ln1[2:], ln2[2:]

        # i iterates through ln1, j iterates through ln2
        while i < len(ln1) or j < len(ln2):

            # add special characters from version 1
            if i < len(ln1):
                if show_color and ln1[i:i+len(color1)] == color1:
                    result += (color.end + color_both) if highlight2 else color1
                    highlight1 = True
                    i += len(color1)
                    continue
                elif show_color and ln1[i:i+len(color.end)] == color.end:
                    highlight1 = False
                    result += color.end
                    if highlight2:
                        result += color2
                    i += len(color.end)
                    continue

            # add special characters from version 2
            if j < len(ln2):
                if show_color and ln2[j:j+len(color2)] == color2:
                    result += (color.end + color_both) if highlight1 else color2
                    highlight2 = True
                    j += len(color2)
                    continue
                elif show_color and ln2[j:j+len(color.end)] == color.end:
                    highlight2 = False
                    result += color.end
                    if highlight1:
                        result += color1
                    j += len(color.end)
                    continue

            # all other characters should match
            assert ln1[i] == ln2[j]
            result += ln1[i]
            i += 1
            j += 1

        # add prefix and print line (prefix is always two characters wide)
        prefix = '- ' if '-' in prefixes else '* ' if '*' in prefixes else '  '
        lines.append(prefix + result)

    return lines


###############
### MERGING ###
###############

# print the lines of a proposed resolution (nothing is printed for an empty result)
def _print_result_lines(result):
    if len(result):
        print('\n'.join([x.rstrip('\n') for x in result]).rstrip('\n'))


# print a proposed resolution framed by conflict markers, followed by a blank line
def _show_resolution(title, result):
    print(color.info + marker_start + title + color.end)
    _print_result_lines(result)
    print(color.info + marker_end + color.end + '\n')


# print lines first..last-1 (zero-indexed) of the file at path
def _print_file_lines(path, first, last):
    with open(path, 'r') as fh:
        for i, l in enumerate(fh):
            if first <= i < last:
                print(l.rstrip('\n'))
            elif i >= last:
                break


# ask the user to confirm a resolution and, if accepted, write it into 'current'
# position is the index of the hunk among the conflicts still present in the file
# returns True if the resolution was written
def _confirm_and_apply(question, position, result):
    if ask_for_input(question, ['y','n']) != 'y':
        return False
    replace_nth_conflicted_hunk_with_lines(current, position, result)
    return True


# resolve a single conflicted hunk, interactively or automatically
# hunk is a list of three file handles holding the current, base and other versions
# index is the 1-based number of this hunk, resolved the number of hunks already
# resolved (and therefore no longer present in 'current'), num_conflicts the total
# returns (fixed, abort): whether the hunk was resolved, and whether the user
# asked to skip all remaining hunks in this file
def process_hunk(hunk, index, resolved, num_conflicts):
    fixed = abort = False

    # get number of lines in current, base and other hunks
    sizes = [file_len(f) for f in hunk]

    # only process small hunks
    small_enough = (all(s <= MAX_HUNK_SIZE for s in sizes)
                    and max(abs(sizes[0]-sizes[1]), abs(sizes[2]-sizes[1])) <= MAX_HUNK_SIZE_DIFF)

    # hunk too large
    if not small_enough:
        print(color.bold + color.info + '\nSkipping hunk %d of %d (spans %d/%d/%d lines) in %s, too large...' % (index, num_conflicts, sizes[0], sizes[1], sizes[2], filename) + color.end)
        return fixed, abort

    # non-interactive mode
    if not interactive:
        for f in hunk:
            f.seek(0)
        result = subline_merge_hunk(hunk)
        if result is not None:
            replace_nth_conflicted_hunk_with_lines(current, index-resolved, result)
            print(color.bold + color.success + 'git-subline-merge resolved conflict %d of %d in %s, resulting hunk was:' % (index, num_conflicts, filename) + color.end)
            print(color.info + marker_start + color.end)
            _print_result_lines(result)
            print(color.info + marker_end + color.end)
            fixed = True
        else:
            print(color.bold + color.error + 'git-subline-merge failed to resolve conflict %d of %d in %s, hunk has overlapping changes' % (index, num_conflicts, filename) + color.end)
        return fixed, abort

    # interactive mode
    print(color.highlight + '\nConflicted hunk %d of %d (spans %d/%d/%d lines) in %s...' % (index, num_conflicts, sizes[0], sizes[1], sizes[2], filename) + color.end)

    menu = [
        ' v - view entire hunk',
        ' x - view hunk in context',
        ' s - attempt sub-line merge',
        ' m - resolve manually',
        ' c - use current version',
        ' b - use base version',
        ' o - use other version',
        ' k - skip this hunk',
        ' q - skip all hunks in this file',
    ]

    # ask user for action to take
    first_run = True
    while True:
        for f in hunk:
            f.seek(0)

        # the hunk is always shown once before the menu is offered
        print('')
        if first_run:
            action = 'v'
            first_run = False
        else:
            for entry in menu:
                print(color.bold + entry + color.end)
            action = ask_for_input('Resolve this hunk', ['v','x','s','m','c','b','o','k','q'])
            print('')

        # actions
        if action in ['v','x']:
            if action == 'x':
                # up to 10 lines of context before the hunk
                start, stop = find_nth_conflicted_hunk(current, index-resolved)
                _print_file_lines(current, max(0, start-10), start)

            print(color.info + marker_start + ' Current version' + color.end)
            print_formatted_diff(hunk, 'current')
            print(color.info + marker_base + ' Base version' + color.end)
            print_formatted_diff(hunk, 'base')
            print(color.info + marker_other + ' Other version' + color.end)
            print_formatted_diff(hunk, 'other')
            print(color.info + marker_end + color.end)

            if action == 'x':
                # up to 10 lines of context after the hunk
                length = file_len(current)
                _print_file_lines(current, stop, min(stop+10, length))

        elif action == 's':
            result = subline_merge_hunk(hunk)
            if result is None:
                print(color.bold + color.error + 'Sub-line merge failed, hunk has overlapping changes' + color.end)
            else:
                _show_resolution(' Sub-line merge yields:', result)
                if _confirm_and_apply('Accept sub-line merge', index-resolved, result):
                    fixed = True
                    break

        elif action == 'm':
            result = manual_merge_hunk(hunk)
            if result is None:
                print(color.bold + color.warning + 'User cancelled manual resolve' + color.end)
            else:
                _show_resolution(' Manual resolution is:', result)
                if _confirm_and_apply('Accept manual resolution', index-resolved, result):
                    fixed = True
                    break

        elif action in ['c','b','o']:
            idx = ['c','b','o'].index(action)
            result = hunk[idx].readlines()
            branchname = ['Current','Base','Other'][idx]
            _show_resolution(' ' + branchname + ' version is:', result)
            warn = ['the other branch', 'both branches', 'the current branch'][idx]
            print(color.bold + color.warning + 'Warning: this will discard changes on ' + warn + '!' + color.end + '\n')
            if _confirm_and_apply('Accept', index-resolved, result):
                fixed = True
                break

        elif action == 'k':
            break

        elif action == 'q':
            abort = True
            break

    return fixed, abort


# attempt to do sub-line merging of a conflicted hunk
# hunk is a list of three file handles (current, base, other), read from their current position
# returns the merged text as a list of lines, or None if git merge-file did not exit with status 0
def subline_merge_hunk(hunk):
    result = None
    paths = []

    try:
        # separate each character in a string by a newline character
        # on Windows the temp file can't be open twice, so we have to close it before calling
        # git-merge-file and delete it manually later
        for i in range(3):
            text = hunk[i].read()
            tmp = tempfile.NamedTemporaryFile(mode='w+', delete=False)
            paths.append(tmp.name)
            with tmp:
                tmp.write('\n'.join(text))

        # attempt merge on separated text
        status = os.system('git merge-file %s %s %s' % (paths[0], paths[1], paths[2]))
        num_conflicts = get_exit_status(status)

        # if successful, take merge result after removing newlines
        # split into a list of lines
        if num_conflicts == 0:
            with open(paths[0], 'r+') as merged:
                result = merged.read()[0::2].splitlines(True)
    finally:
        # the temp files were created with delete=False, so remove them even on error
        for p in paths:
            os.unlink(p)

    return result


# open an editor for manual merging of a conflicted hunk
# hunk is a list of three file handles (current, base, other), read from their current position
# returns the lines of the edited file, or None if the editor exited with an error status
def manual_merge_hunk(hunk):

    # prepare temp file for editing
    tmp = tempfile.NamedTemporaryFile(mode='w+', delete=False)
    try:
        tmp.write(marker_start + ' Current version is:\n')
        tmp.write(hunk[0].read())
        tmp.write(marker_base + ' Base version is:\n')
        tmp.write(hunk[1].read())
        tmp.write(marker_other + ' Other version is:\n')
        tmp.write(hunk[2].read())
        tmp.write(marker_end + '\n')
        tmp.close()

        # imitate the process that Git uses to determine which editor to use
        editor = os.getenv('GIT_EDITOR')
        if editor is None:
            editor = os.popen('git config core.editor').read().rstrip('\n') or None
        if editor is None:
            editor = os.getenv('VISUAL')
        if editor is None:
            editor = os.getenv('EDITOR')
        if editor is None:
            editor = 'vi'

        # known issue with vim on Mac OS X: can give nonzero exit status even when quit cleanly due to
        # errors in .vimrc, so check its normal exit code first.
        success_code = 0
        if editor == 'vi':
            success_code = call(['vi -c "q" %s' % tmp.name], shell=(not WINDOWS), stdin=sys.stdin)
        status = call(['%s %s' % (editor, tmp.name)], shell=(not WINDOWS), stdin=sys.stdin)

        # if editor returned without an error, return the saved lines
        result = None
        if status <= success_code:
            with open(tmp.name, 'r') as edited:
                result = edited.readlines()
        return result
    finally:
        # created with delete=False, so close (no-op if already closed) and remove manually
        tmp.close()
        os.unlink(tmp.name)


############
### MAIN ###
############

# invoked by git
if len(sys.argv) == 6:

    # arguments passed in by git:
    #   [1] name of temp file containing base version of file
    #   [2] name of temp file containing current version of file
    #   [3] name of temp file containing other version of file
    #   [4] length of conflict markers (default is 7 for <<<<<<<)
    #   [5] name of conflicted file
    # the result should be left in the current version (argv[2])
    base, current, other, marker_len, filename = sys.argv[1:]
    marker_len = int(marker_len)
    marker_start = '<' * marker_len
    marker_base = '|' * marker_len
    marker_other = '=' * marker_len
    marker_end = '>' * marker_len

    # run git merge on the files using diff3 (result is written to 'current')
    # the number of conflicts is encoded in the exit status of the git-merge-file command
    status = os.system('git merge-file --diff3 --marker-size=%d -L "Current version" -L "Base version" -L "Other version" %s %s %s' % (marker_len, current, base, other))
    num_conflicts = get_exit_status(status)

# invoked manually on a single file
elif len(sys.argv) >= 2 and os.path.isfile(sys.argv[1]):

    # first argument is path to file
    current = sys.argv[1]
    filename = current

    # optional second argument for conflict marker size (default is 7 for <<<<<<<)
    # the default is only used when the argument is absent
    marker_len = 7
    if len(sys.argv) > 2:
        try:
            marker_len = int(sys.argv[2])
        except ValueError:
            print('Error: unable to read conflict marker size from argument "%s".' % (sys.argv[2]))
            sys.exit(1)

    # extra arguments are ignored
    if len(sys.argv) > 3:
        print('Warning: ignoring extra arguments (%s).' % (', '.join(sys.argv[3:])))

    marker_start = '<' * marker_len
    marker_base = '|' * marker_len
    marker_other = '=' * marker_len
    marker_end = '>' * marker_len

    # use grep to find number of conflicts in the file
    num_start = int(os.popen('grep -c "^%s" "%s"' % (marker_start, current)).read())
    num_base = int(os.popen('grep -c "^%s" "%s"' % (marker_base, current)).read())
    num_other = int(os.popen('grep -c "^%s" "%s"' % (marker_other, current)).read())
    num_end = int(os.popen('grep -c "^%s" "%s"' % (marker_end, current)).read())

    # check that the number of conflict markers match
    # if they don't, probably the merge wasn't generated using diff3, and we can't resolve conflicts without the base version
    if num_start == num_base == num_other == num_end:
        num_conflicts = num_start
    else:
        print('Badly formatted conflicts found in file. Possibly you need to change your conflictstyle to diff3?')
        sys.exit(1)

# otherwise show help text
else:
    print("usage: git-subline-merge FILE [conflict-marker-size]\n"
          "  FILE: path to file with conflicts (must be generated using diff3 style, see README)\n"
          "  [conflict-marker-size] (optional): number of characters used in conflict markers (default 7)")
    sys.exit(1)

# sometimes (e.g.
# during interactive rebase) this script is called even though
# there are no conflicts - in that case just exit successfully here
if num_conflicts == 0:
    sys.exit(0)

# welcome message
if interactive:
    print(color.welcome + '\ngit-subline-merge v1.0\n' + color.end)

# make temporary files
# one is a copy of the conflicted 'current', to iterate with
# three are used to hold the different versions of each conflicted hunk
# open with 'a' so we can append without worrying about newline characters
tmp = tempfile.TemporaryFile(mode='w+')
hunk = [tempfile.NamedTemporaryFile(mode='a+') for _ in range(3)]

resolved = zone = index = 0
try:
    with open(current, 'r') as c:
        tmp.write(c.read())
    tmp.seek(0)

    # process file line by line
    # zone is 0 outside a conflict and 1/2/3 inside the current/base/other part of one
    # base, other and end markers are only honoured in the order diff3 emits them,
    # so a marker-like line elsewhere is not mistaken for the end of a hunk
    for line in tmp:

        # beginning of conflict zone
        if line.startswith(marker_start):
            zone = 1
            for f in hunk:
                f.truncate(0)
                f.seek(0)

        # base version of conflict
        elif zone == 1 and line.startswith(marker_base):
            zone = 2

        # other version of conflict
        elif zone == 2 and line.startswith(marker_other):
            zone = 3

        # end of conflict zone
        elif zone == 3 and line.startswith(marker_end):
            zone = 0
            index += 1
            fixed, abort = process_hunk(hunk, index, resolved, num_conflicts)
            if fixed:
                resolved += 1
            elif abort:
                break

        # append line to appropriate hunk zone
        elif zone:
            hunk[zone-1].write(line)

finally:
    # close all temp files (will be automatically deleted)
    tmp.close()
    for h in hunk:
        h.close()

# print resolution
newline = '\n' if interactive else ''
col = color.success if resolved == num_conflicts else color.warning if resolved > 0 else color.error
print(col + color.bold + newline + 'Resolved %d of %d conflicts in %s' % (resolved, num_conflicts, filename) + newline + color.end)

# only exit with success if all conflicts were resolved
sys.exit(0 if resolved == num_conflicts else 1)