Source code for gitmeta
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Check multiple git repositories' status
Usage:
git-meta [-dc] [-a|-o|-n|-k|-r|-?|--no-remote] [-t]
Options:
-d, --discover Look for new git repositories
-c, --clean If a non-valid repository is encountered, it is removed from
the list
-a, --all Display all repositories
-o, --ok Display only repositories where everything is fine
-n, --nok Display only repositories where there is something happening
-k, --ko Display only repositories where the working tree status is not
clean
-r, --remote Display only repositories needing some pushes with their
remotes
--no-remote List repositories having no remote set
-?, --unknown Display only repositories in unknown state
-t, --terminal Open terminals for each selected repository
-h, --help Show this help
--version Display the version of git-meta
--pdb Launch debugger when crashing
"""
# Version string handed to docopt in main() (shown by the --version option)
__version__ = "0.2"
import os
import re
import git
import glob
import logging
import subprocess
from pathlib import Path
from appdirs import user_cache_dir
# Module-level logger named after this module
log = logging.getLogger(__name__)
def pm_on_crash(type, value, tb):
    """Print an exception's traceback, then enter post-mortem debugging.

    The signature matches what an exception hook receives, so the function
    can be used as one.

    Args:
        type: Class of the exception being reported.
        value: The exception instance.
        tb: Traceback object attached to the exception.
    """
    from traceback import print_exception
    from pdb import pm

    # Show the error first so the user knows why the debugger is starting.
    print_exception(type, value, tb)
    pm()
class TagStr(str):
    """Tagged strings

    Allow you to decorate a bit your terminal output in an HTML fashion.
    Only the tags listed in ``_shell`` are supported.

    Example:

    .. code-block:: python

        TagStr("Hello, I'm a <u>decorated</u> string")
        TagStr("What a coincidence, <color=green>me too</color>")
    """

    # Tag name -> terminal escape sequences.
    #   * list: [opening sequence, closing sequence]
    #   * dict: parametrised tag (``<tag=value>``); maps each value to its
    #     opening sequence, the optional "end" entry being the closing one
    #   * str:  opening sequence, closed by the global "end" sequence
    _shell = {
        # SGR 22 restores normal intensity. SGR 21 is "doubly underlined" in
        # ECMA-48, so it must not be used to switch bold off.
        "bold": ["\033[1m", "\033[22m"],
        "b": ["\033[1m", "\033[22m"],
        "underlined": ["\033[4m", "\033[24m"],
        "u": ["\033[4m", "\033[24m"],
        "reverse": ["\033[7m", "\033[27m"],
        "color": {
            "default": "\033[39m",
            "end": "\033[39m",
            "black": "\033[30m",
            "darkred": "\033[31m",
            "darkgreen": "\033[32m",
            "darkyellow": "\033[33m",
            "darkblue": "\033[34m",
            "darkmagenta": "\033[35m",
            "darkcyan": "\033[36m",
            "darkgray": "\033[90m",
            "gray": "\033[37m",
            "red": "\033[91m",
            "green": "\033[92m",
            "yellow": "\033[93m",
            "blue": "\033[94m",
            "magenta": "\033[95m",
            "cyan": "\033[96m",
            "white": "\033[97m",
        },
        "end": "\033[0m",
    }

    def __add__(self, elem):
        """Concatenation handling

        >>> a = TagStr("Hello ")
        >>> b = "World"
        >>> type(a+b) is TagStr
        True
        >>> c = TagStr("World")
        >>> type(a+c) is TagStr
        True
        >>> a + 1
        Traceback (most recent call last):
            ...
        TypeError: TagStr concatenate only with str or TagStr

        Args:
            elem (str): Right-hand operand.

        Returns:
            TagStr: The concatenation, still a tagged string.

        Raises:
            TypeError: If ``elem`` is not a string.
        """
        # TagStr derives from str, so one isinstance check covers both.
        if not isinstance(elem, str):
            raise TypeError("TagStr concatenate only with str or TagStr")
        return type(self)(str(self) + str(elem))

    def __radd__(self, elem):
        """Concatenation handling

        >>> a = "World"
        >>> b = TagStr("Hello ")
        >>> type(a+b) is TagStr
        True
        >>> c = TagStr("World")
        >>> type(c+b) is TagStr
        True
        >>> 1 + b
        Traceback (most recent call last):
            ...
        TypeError: TagStr concatenate only with str or TagStr

        Args:
            elem (str): Left-hand operand.

        Returns:
            TagStr: The concatenation, still a tagged string.

        Raises:
            TypeError: If ``elem`` is not a string.
        """
        if not isinstance(elem, str):
            raise TypeError("TagStr concatenate only with str or TagStr")
        return type(self)(str(elem) + str(self))

    def shell(self):
        """
        >>> test_str = TagStr('Here is a <u>test</u> string')
        >>> test_str
        'Here is a <u>test</u> string'
        >>> test_str.shell()
        'Here is a \\x1b[4mtest\\x1b[24m string'

        Returns:
            str: Convert the tags to a decorated shell output.
        """
        text = str(self)
        default_end = self._shell["end"]
        for tag, code in self._shell.items():
            if isinstance(code, dict):
                # Parametrised tag: <tag=value>...</tag>. The closing
                # sequence is the dict's own "end" entry when it has one.
                closing = code.get("end", default_end)
                for value, opening in code.items():
                    regex = re.compile(
                        r"(<{0}={1}>)(.*?)(</{0}>)".format(tag, value), re.DOTALL
                    )
                    text = regex.sub(r"{0}\2{1}".format(opening, closing), text)
            else:
                if isinstance(code, list):
                    opening, closing = code
                else:
                    opening, closing = code, default_end
                regex = re.compile(r"(<{0}>)(.*?)(</{0}>)".format(tag), re.DOTALL)
                text = regex.sub(r"{0}\2{1}".format(opening, closing), text)
        return text

    def empty(self):
        """
        >>> test_str = TagStr('Here is a <u>test</u> string')
        >>> test_str
        'Here is a <u>test</u> string'
        >>> test_str.empty()
        'Here is a test string'

        Returns:
            str: Refined string without tags
        """
        # Anything between angle brackets is dropped, known tag or not.
        return re.sub(r"(<.*?>)", "", self)
class Repo(git.Repo):
    """Class representing a repository.

    Allows to perform common git commands
    """

    def remote_diff(self):
        """For each branch with a remote counterpart, give the number of
        commit difference

        Branches that are level with their tracking branch are left out.

        Returns:
            dict: keys -> branch name
                  values -> str, ``"<ahead>"`` when only the local branch has
                  extra commits, ``"-<behind>"`` when only the remote has,
                  ``"<ahead>-<behind>"`` when both have
        """
        diffs = {}
        for branch in self.branches:
            remote = branch.tracking_branch()
            if remote is None:
                continue
            bases = self.merge_base(branch, remote)
            if not bases:
                # merge_base() gives an empty list when both branches share
                # no history: there is no common point to count from.
                continue
            base = bases[0]
            # From base to local branch
            count_desc = sum(
                1
                for _ in self.iter_commits(
                    "{}..{}".format(base.hexsha, branch.commit.hexsha)
                )
            )
            # From base to remote branch
            count_asc = sum(
                1
                for _ in self.iter_commits(
                    "{}..{}".format(base.hexsha, remote.commit.hexsha)
                )
            )
            # Express remote difference in the '__git_ps1' fashion
            if count_asc and count_desc:
                diffs[branch.name] = "{0}-{1}".format(count_desc, count_asc)
            elif count_desc:
                diffs[branch.name] = str(count_desc)
            elif count_asc:
                diffs[branch.name] = str(-count_asc)
        return diffs

    def has_remote(self):
        """
        Return:
            bool: True if at least one local branch has a remote tracking
            branch defined
        """
        # ``branches`` is a property, not a method: it must not be called.
        return any(branch.tracking_branch() is not None for branch in self.branches)

    def stashed(self):
        """List stashes

        Returns:
            bool: True if there is stashed modifications
        """
        # ``git stash list`` prints nothing when the stash is empty.
        return len(self.git.stash("list")) > 0

    def statusline(self, line_width=80):
        """Create the status line for the selected repository.

        Args:
            line_width (int): Width, in characters, the line is padded to.

        Returns:
            string: Status line as used by git-meta
        """
        form = {"filler": "", "more": []}
        max_path_len = line_width - 30
        template = " {path} {filler}{more} {status}"
        if self.bare:
            # git.Repo has no ``path`` attribute; ``working_dir`` is the same
            # attribute Meta.discover() records for every repository.
            form["path"] = self.working_dir
            form["more"] = ""
            form["status"] = "[<color=yellow>BARE</color>]"
        else:
            if len(self.working_dir) <= max_path_len + 3:
                form["path"] = self.working_dir
            else:
                # Keep the end of the path, usually its most telling part
                form["path"] = "..." + self.working_dir[-max_path_len:]

            if not self.is_dirty():
                form["status"] = "[ <color=green>OK</color> ]"
            else:
                form["status"] = "[ <color=red>KO</color> ]"

            remote_diff = self.remote_diff()
            if remote_diff:
                form["more"] = [
                    "{}:{}".format(name, count) for name, count in remote_diff.items()
                ]
            if self.stashed():
                form["more"].append("<color=yellow>stash</color>")

            if form["more"]:
                form["more"] = "(" + ",".join(form["more"]) + ")"
            else:
                form["more"] = ""

        # First pass without filler to measure the visible length (tags
        # removed), second pass with the padding that fills the line.
        line = TagStr(template.format(**form))
        form["filler"] = " " * (line_width - len(line.empty()))
        line = TagStr(template.format(**form))
        return line.shell()
class Meta(object):
    """Class handling the repositories database"""

    def __init__(self):
        """Load the list of known repositories, scanning for them when no
        database file can be read.
        """
        self.repolist = []
        self._define_paths()
        # Load the database file
        try:
            self.read_list()
        except OSError:
            self.discover()

    def _define_paths(self):
        """Fill ``self.config`` with the default locations, then apply the
        overrides found in the ``meta`` section of the global git
        configuration.
        """
        cache = user_cache_dir("gitmeta", "gitmeta")
        # Default locations
        self.config = {
            "repolist": os.path.join(cache, "repolist.txt"),
            "ignorelist": os.path.join(cache, "ignore.txt"),
            # expanduser() still resolves when $HOME is not set, whereas
            # os.environ["HOME"] raises KeyError.
            "scanroot": os.path.expanduser("~"),
        }
        # Parsing of the global config file
        try:
            global_config = git.config.GitConfigParser(config_level="global")
        except OSError:
            pass
        else:
            if global_config.has_section("meta"):
                for k, v in global_config.items("meta"):
                    self.config[k] = v

    def read_list(self):
        """Read the database file to extract the paths of previously scanned
        repositories

        Raises:
            OSError: If the database file cannot be opened.
        """
        with open(self.config["repolist"]) as repolist_f:
            # Blank lines are dropped: an empty path must never reach Repo().
            self.repolist = [
                line for line in repolist_f.read().splitlines() if line.strip()
            ]

    def discover(self):
        """Scan the subfolders to discover repositories"""
        try:
            with open(self.config["ignorelist"]) as ignore_file:
                ignorelist = ignore_file.read().splitlines()
        except OSError:
            ignorelist = []
        repolist = []
        print(
            "Discovery of repositories in {0} sub-directories".format(
                self.config["scanroot"]
            )
        )
        for root, dirs, files in os.walk(self.config["scanroot"]):
            # A directory is ignored when listed verbatim or when it matches
            # one of the globbing patterns of the ignore list.
            if root in ignorelist or any(
                glob.has_magic(ignore_path)
                and glob.fnmatch.fnmatch(root, ignore_path)
                for ignore_path in ignorelist
            ):
                # we also want to ignore every subfolder
                del dirs[:]
                continue
            if ".git" in dirs or "config" in files:
                # It looks like a repository, but is it?
                try:
                    repo = Repo(root)
                except git.exc.GitError:
                    # This is not a valid git repository
                    continue
                # Success !!
                repolist.append(repo.working_dir)
                # In case of found repository it's not necessary
                # to dig deeper.
                # Thus, we avoid the struggle of handling submodules
                # which are perfectly managed by the base repository.
                del dirs[:]
        # Store on disk exactly what is kept in memory
        repolist = sorted(set(repolist))
        self._write_repolist(repolist)
        # Force the content of the repolist to the newly made
        self.repolist = repolist

    def _write_repolist(self, repolist):
        """Writing all the repositories discovered to the database
        file for future utilisation

        Args:
            repolist (list of str)
        """
        os.makedirs(os.path.dirname(self.config["repolist"]), exist_ok=True)
        with open(self.config["repolist"], "w+") as repofile:
            repofile.write("\n".join(repolist))

    @staticmethod
    def _match_status(repo, filter_status):
        """Tell whether ``repo`` is selected by ``filter_status``."""
        if filter_status in (None, "all"):
            return True
        if filter_status == "OK":
            return not repo.is_dirty()
        if filter_status == "KO":
            return bool(repo.is_dirty())
        if filter_status == "remote":
            return bool(repo.remote_diff())
        if filter_status == "no-remote":
            return not repo.has_remote()
        if filter_status == "NOK":
            return bool(repo.is_dirty() or repo.remote_diff() or repo.stashed())
        # Any other value (e.g. "?") selects nothing
        return False

    def iter(self, clean=False, filter_status=None):
        """Iterate over the watched repositories matching ``filter_status``

        Args:
            clean (bool): If True, paths that are not valid repositories are
                removed from the watched list and from the database file
            filter_status (str): One of None, "all", "OK", "KO", "remote",
                "no-remote" or "NOK"; any other value selects nothing

        Yields:
            Repo: The repositories matching the filter
        """
        # Work on a snapshot: the watched list may shrink while looping.
        for path in list(self.repolist):
            try:
                repo = Repo(path)
            except git.exc.GitError:
                errstr = TagStr(
                    "<color=red>The directory\n %s\nis not a valid repository." % path
                )
                if clean:
                    errstr += " Discarded</color>"
                else:
                    errstr += " Use the option --clean to discard it.</color>"
                print(errstr.shell())
                if clean:
                    # Update the in-memory list too, so that every discarded
                    # path stays out of the file on the next rewrite.
                    self.repolist.remove(path)
                    self._write_repolist(self.repolist)
                continue
            if self._match_status(repo, filter_status):
                yield repo

    def scan(self, clean=False, filter_status=None):
        """Scan all the repositories in the database for their statuses

        Args:
            clean (bool): If True, remove any invalid repository from the
                watched list
            filter_status (str): Only return repositories having the given
                status. Usable status are "OK", "KO", "remote", "no-remote",
                "NOK" and "all"
        """
        import shutil

        # Falls back to 80 columns when the size cannot be determined
        line_width = shutil.get_terminal_size(fallback=(80, 24)).columns
        for repo in self.iter(clean=clean, filter_status=filter_status):
            print(repo.statusline(line_width))

    def terminal(self, filter_status=None):
        """Open a ``gnome-terminal`` in each repository matching
        ``filter_status``

        Args:
            filter_status (str): Same values as for :meth:`iter`
        """
        for repo in self.iter(filter_status=filter_status):
            subprocess.Popen(
                ["gnome-terminal", "--working-directory", repo.working_dir],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
def main():  # pragma: no cover
    """Main ``git-meta`` script function

    Parses the command line, optionally refreshes the list of repositories,
    prints the status of the selected ones and, on request, opens a terminal
    in each of them.
    """
    import sys
    from docopt import docopt

    if "--pdb" in sys.argv:
        # Handled by hand (and removed) before docopt sees the arguments
        sys.argv.remove("--pdb")
        # The hook only takes effect once installed as sys.excepthook
        sys.excepthook = pm_on_crash

    args = docopt(__doc__, version=__version__)

    # Command-line flag -> status filter; the first flag set wins
    filters = (
        ("--all", "all"),
        ("--ok", "OK"),
        ("--nok", "NOK"),
        ("--ko", "KO"),
        ("--remote", "remote"),
        ("--unknown", "?"),
        ("--no-remote", "no-remote"),
    )
    filter_status = "NOK"  # default behaviour
    for option, status in filters:
        if args[option]:
            filter_status = status
            break

    meta = Meta()
    if args["--discover"]:
        meta.discover()
    meta.scan(filter_status=filter_status, clean=args["--clean"])
    if args["--terminal"]:
        meta.terminal(filter_status=filter_status)