# ferrit.py
#
# Command-line helper for fetching, checking out and listing Gerrit changes
# from inside a git checkout whose remote points at a Gerrit server.
import argparse
import json
import os
import re
import sys
from functools import partial
from subprocess import DEVNULL, PIPE, CalledProcessError, run
from urllib.parse import quote, urljoin, urlparse
import requests
import urllib3
from pkg_resources import get_distribution
from requests_futures.sessions import FuturesSession
# Suppress urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably only relevant when Ferrit.SSL_VERIFY is switched
# off, since that flag is passed as ``verify=`` to every request — confirm.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Version of the installed distribution named after this module. Any failure
# during the lookup leaves the version as None; the ``--version`` flag then
# prints "?" instead.
try:
    __version__ = get_distribution(__name__).version
except Exception:
    __version__ = None

# Argument prefix shared by every git invocation in this file; the ``-c``
# override turns off git's detached-HEAD advice message.
GITARGS = ["git", "-c", "advice.detachedHead=false"]
class Ferrit:
    """Command-line front end for working with Gerrit changes in a git checkout.

    The instance discovers the repository and its Gerrit remote via git,
    obtains credentials through ``git credential fill`` and then talks to the
    Gerrit REST API under ``/a/`` on the remote's host.
    """

    # Passed as ``verify=`` to every HTTP request.
    SSL_VERIFY = True
    # Name of the git remote whose URL is inspected by ``setup``.
    REMOTE_NAME = "origin"
    # Prefix expected in front of every successful API response body; it is
    # stripped before the remainder is decoded as JSON.
    RES_START = ")]}'\n"

    def __init__(self):
        """Create an unconfigured instance; ``setup`` fills in the details."""
        self._user_name_map = None
        self.remote_url = None
        self.repo_name = None
        self.api_base_url = None

    def _git_output(self, *git_args, stdin=None):
        """Run git with ``git_args`` and return its decoded standard output.

        ``stdin`` (bytes or None) is fed to the process when given. If git
        exits with a non-zero status the whole program exits with that status.
        """
        try:
            proc = run(
                [*GITARGS, *git_args],
                input=stdin,
                stdout=PIPE,
                check=True,
            )
        except CalledProcessError as err:
            sys.exit(err.returncode)
        return proc.stdout.decode("utf-8")

    def setup(self):
        """Locate the repository, its Gerrit remote and the API credentials.

        Changes the working directory to the repository top level and sets
        ``remote_url``, ``repo_name`` and ``api_base_url``. Exits the program
        when git fails, when the remote path does not start with ``/a/`` or
        when the credential helper does not supply the needed fields.
        """
        repo_dir = self._git_output("rev-parse", "--show-toplevel").strip()
        os.chdir(repo_dir)

        self.remote_url = self._git_output(
            "remote", "get-url", self.REMOTE_NAME
        ).strip()
        remote = urlparse(self.remote_url)
        if not remote.path.startswith("/a/"):
            self.crash("unexpected remote url format (not a gerrit remote?)")
        self.repo_name = remote.path[len("/a/"):]

        req = {
            "protocol": "https",
            "path": remote.path,
        }
        # Split on the last "@" so the host is found even if the user info
        # part itself contains one.
        userinfo, at_sign, host = remote.netloc.rpartition("@")
        req["host"] = host
        if at_sign:
            # The credential protocol's attribute is "username"; drop any
            # ":password" suffix embedded in the URL.
            req["username"] = userinfo.split(":", 1)[0]

        out = self._git_output(
            "credential",
            "fill",
            stdin="\n".join(f"{k}={v}" for k, v in req.items()).encode(),
        )
        # Ignore lines without "=" (e.g. a trailing blank line).
        res = dict(
            line.split("=", 1) for line in out.splitlines() if "=" in line
        )
        try:
            # Both parts are percent-encoded so characters such as "@" or ":"
            # cannot corrupt the URL built below.
            username = quote(res["username"], safe="")
            password = quote(res["password"], safe="")
            host = res["host"]
        except KeyError as err:
            self.crash(
                "git credential did not provide '{}'".format(err.args[0])
            )
        self.api_base_url = f'https://{username}:{password}@{host}/a/'

    def run(self):
        """Parse the command line and dispatch to the selected sub-command.

        Arguments are parsed before ``setup`` runs so that ``--help``,
        ``--version`` and usage errors work outside a Gerrit checkout and
        without asking for credentials.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--version",
            action="version",
            version="%(prog)s v{}".format(__version__ or "?"),
        )
        subparsers = parser.add_subparsers(dest="command")
        subparsers.required = True  # not in call due to bug in argparse

        # Sub-commands that take a change number, fetch the patch set and
        # then run the git command of the same name on FETCH_HEAD.
        cmds = [
            ("fetch", ["fe"]),
            ("checkout", ["ch"]),
            ("cherry-pick", ["cp"]),
            ("show", ["sh"]),
            ("rev-parse", ["sha", "id"]),
        ]
        for name, aliases in cmds:
            subparser = subparsers.add_parser(name, aliases=aliases)
            subparser.add_argument("number", type=ChangeNum)
            subparser.set_defaults(func=partial(self.fetch_and_cmd, name))

        dashboard_parser = subparsers.add_parser("dashboard", aliases=["da"])
        dashboard_parser.set_defaults(func=self.run_dashboard)

        search_parser = subparsers.add_parser("search", aliases=["se"])
        search_parser.add_argument("query", nargs="+")
        search_parser.set_defaults(func=self.run_search)

        args = parser.parse_args()
        self.setup()
        args.func(args)

    def fetch_and_cmd(self, cmd_name, args):
        """Fetch the patch set named by ``args.number`` and run ``cmd_name``.

        ``rev-parse`` only prints the patch set's SHA and fetches nothing;
        ``fetch`` stops after fetching; every other name is run as
        ``git <cmd_name> FETCH_HEAD``.
        """
        num = args.number
        _, patch_set = self.get_change_and_patch_set(num.change, num.patch_set)
        if cmd_name == "rev-parse":
            print(patch_set["__sha"])
            return
        self.fetch(patch_set)
        if cmd_name != "fetch":
            print()
            run([*GITARGS, cmd_name, "FETCH_HEAD"])

    def get_change_and_patch_set(self, change_num, patch_set_num=None):
        """Return ``(change, patch_set)`` for the given numbers.

        With ``patch_set_num`` omitted the highest-numbered patch set is
        returned. Otherwise the patch set whose ``_number`` equals
        ``patch_set_num`` is returned. Exits the program if the change or the
        patch set does not exist.
        """
        change = self.api_get_change(change_num)
        if change is None:
            self.crash("change not found")
        patch_sets = self.get_ordered_patch_sets(change)
        if not patch_sets:
            self.crash("patch set not found")
        if patch_set_num is None:
            return change, patch_sets[-1]
        # Match on the patch set's own number rather than on list position:
        # positional indexing would turn 0 into "the last one".
        for patch_set in patch_sets:
            if patch_set["_number"] == patch_set_num:
                return change, patch_set
        self.crash("patch set not found")

    def fetch(self, patch_set):
        """Make ``FETCH_HEAD`` point at ``patch_set``, fetching it if needed.

        If the commit is already present locally ``FETCH_HEAD`` is updated
        directly; otherwise the patch set's ref is fetched over HTTP. Exits
        the program if the fetch information is missing, belongs to another
        repository path, or if ``git fetch`` fails.
        """
        try:
            fetch_info = patch_set["fetch"]["http"]
            url = fetch_info["url"]
            ref = fetch_info["ref"]
        except KeyError:
            self.crash("no http fetch information for patch set")
        if urlparse(url).path != urlparse(self.remote_url).path:
            self.crash("fetch url mismatch (wrong repo?)")
        sha = patch_set["__sha"]

        # Ask git for the object type; a failure means the object is unknown.
        probe = run(
            [*GITARGS, "cat-file", "-t", sha],
            stdout=PIPE,
            stderr=DEVNULL,
        )
        already_fetched = (
            probe.returncode == 0
            and probe.stdout.decode("utf-8").strip() == "commit"
        )
        if already_fetched:
            run([*GITARGS, "update-ref", "FETCH_HEAD", sha], check=True)
            print("ferrit: already fetched, manually setting FETCH_HEAD")
            return

        try:
            run([*GITARGS, "fetch", url, ref], stdout=PIPE, check=True)
        except CalledProcessError:
            sys.exit(1)

    def fetch_and_checkout(self, patch_set):
        """Fetch ``patch_set`` and check out ``FETCH_HEAD``."""
        self.fetch(patch_set)
        run([*GITARGS, "checkout", "FETCH_HEAD"])

    def run_dashboard(self, args):
        """Handle the ``dashboard`` sub-command (``args`` is unused)."""
        self.run_list_changes()

    def run_search(self, args):
        """Handle the ``search`` sub-command.

        Lists open changes matching ``args.query`` and offers to check out
        the first one; the offer defaults to yes only when there is exactly
        one match.
        """
        qs = ["status:open"] + args.query
        changes = self.api_get_changes(qs)
        if not changes:
            print("No changes")
            return

        print()
        self.print_changes(changes)
        print()
        suggested_change = changes[0]
        if len(changes) == 1:
            do_checkout = self.yn_question("Checkout?", True)
        else:
            q = "Checkout change {}?".format(suggested_change["_number"])
            do_checkout = self.yn_question(q, False)
        if do_checkout:
            patch_set = self.get_ordered_patch_sets(suggested_change)[-1]
            self.fetch_and_checkout(patch_set)

    def run_list_changes(self):
        """Print the dashboard: one labelled list of changes per section.

        All section queries are sent concurrently through
        ``api_get_session``.
        """
        base_qs = ["status:open", "-is:ignored"]
        querys = [
            ("Your Turn", ["attention:self"]),
            ("Work in progress", ["is:open", "owner:self", "is:wip"]),
            (
                "Outgoing reviews",
                ["is:open", "owner:self", "-is:wip", "-is:ignored"],
            ),
            ("Incoming reviews", [
                "is:open",
                "-owner:self",
                "-is:wip",
                "-is:ignored",
                "(reviewer:self+OR+assignee:self)",
            ]),
            ("CCed on", ["is:open", "-is:ignored", "cc:self"]),
        ]
        paths = [self.api_path_for_changes(base_qs + qs) for _, qs in querys]
        out = [""]
        changess = self.api_get_session(paths)
        for changes, (label, _) in zip(changess, querys):
            out.append(label + ":")
            if changes:
                out.extend(self.change_str(change) for change in changes)
            else:
                out.append(" No changes")
            out.append("")
        print("\n".join(out))

    def print_change(self, change):
        """Print the one-line summary of ``change``."""
        print(self.change_str(change))

    def change_str(self, change):
        """Return a one-line summary of ``change``.

        Columns: change number, number of patch sets, ``P`` if private,
        ``W`` if work in progress, a merge marker (``-`` for WIP/private,
        blank when ``mergeable`` is truthy, ``M`` otherwise), the owner's
        initials (``?`` when the owner is not in the user-name map) and the
        subject, shortened to 54 characters.
        """
        num = change["_number"]
        wip = change.get("work_in_progress", False)
        private = change.get("is_private", False)
        patch_sets = self.get_number_of_patch_sets(change)
        if wip or private:
            merge_symbol = "-"
        elif change.get("mergeable"):
            merge_symbol = " "
        else:
            merge_symbol = "M"
        # The map only holds the accounts returned by the accounts query, so
        # an owner may be missing from it.
        owner_name = self.user_name_map.get(change["owner"]["_account_id"]) or "?"
        shown_subject = change["subject"]
        if len(shown_subject) > 54:
            shown_subject = shown_subject[:50] + " ..."
        return "{n:5} {k:3} {p} {w} {m} {o:<3} {s}".format(
            n=num,
            k=patch_sets,
            w=("W" if wip else " "),
            p=("P" if private else " "),
            m=merge_symbol,
            o=owner_name,
            s=shown_subject,
        )

    def print_changes(self, changes):
        """Print one summary line per change."""
        print("\n".join([self.change_str(c) for c in changes]))

    def _decode_response(self, r):
        """Turn an HTTP response into decoded JSON.

        Returns None for status 404. Exits the program for any other
        non-200 status or when the body lacks the ``RES_START`` prefix.
        """
        if r.status_code == 404:
            return None
        if r.status_code != 200:
            self.crash("bad response: {} ({})".format(r.status_code, r.text))
        if not r.text.startswith(self.RES_START):
            self.crash("unexpected response body (missing JSON prefix)")
        return json.loads(r.text[len(self.RES_START):])

    def api_get(self, path):
        """GET ``path`` relative to the API base URL and decode the result.

        Returns the decoded JSON, or None for a 404. Exits the program on a
        connection error or any other failure.
        """
        url = urljoin(self.api_base_url, path)
        try:
            r = requests.get(url, verify=self.SSL_VERIFY)
        except requests.exceptions.ConnectionError:
            self.crash("connection error")
        return self._decode_response(r)

    def api_get_session(self, paths):
        """GET several API paths concurrently.

        Returns a list with one decoded result (or None for a 404) per entry
        of ``paths``, in the same order. Exits the program on a connection
        error or a bad response.
        """
        # The context manager closes the session once all results are in.
        with FuturesSession() as session:
            futures = [
                session.get(
                    urljoin(self.api_base_url, path),
                    verify=self.SSL_VERIFY,
                )
                for path in paths
            ]
            try:
                results = [future.result() for future in futures]
            except requests.exceptions.ConnectionError:
                self.crash("connection error")
        return [self._decode_response(r) for r in results]

    def api_get_change(self, change_num):
        """Return the change with all its revisions, or None if not found."""
        path = "changes/{}/?o=ALL_REVISIONS".format(change_num)
        change = self.api_get(path)
        if change:
            self.add_info_to_change(change)
        return change

    def api_path_for_changes(self, qs):
        """Build the API path for a change query restricted to this repo.

        ``qs`` is a list of query terms; it is not modified. Duplicate terms
        are dropped while the order of first occurrence is kept, so the
        resulting path is deterministic.
        """
        terms = dict.fromkeys([*qs, "repo:" + self.repo_name])
        return "changes/?o=ALL_REVISIONS&q=" + "+".join(terms)

    def api_get_changes(self, qs):
        """Return the list of changes matching the query terms ``qs``.

        The caller's list is left untouched. A 404 from the server yields an
        empty list.
        """
        changes = self.api_get(self.api_path_for_changes(qs)) or []
        for change in changes:
            self.add_info_to_change(change)
        return changes

    def get_ordered_patch_sets(self, change):
        """Return the change's revisions sorted by ascending ``_number``."""
        return sorted(
            change["revisions"].values(),
            key=lambda ps: ps["_number"],
        )

    def get_number_of_patch_sets(self, change):
        """Return how many revisions ``change`` has."""
        return len(self.get_ordered_patch_sets(change))

    @property
    def user_name_map(self):
        """Mapping of account id to initials, fetched on first access."""
        if self._user_name_map is None:
            self._user_name_map = self.api_get_user_name_map()
        return self._user_name_map

    def api_get_user_name_map(self):
        """Fetch the active accounts and map each account id to initials.

        Accounts without a ``name`` field map to an empty string.
        """
        accounts = self.api_get("accounts/?o=DETAILS&q=is:active") or []
        return {
            d["_account_id"]: self.initials(d.get("name", ""))
            for d in accounts
        }

    def add_info_to_change(self, change):
        """Store each revision's SHA (its dict key) under ``"__sha"``."""
        for sha, patch_set in change["revisions"].items():
            patch_set["__sha"] = sha

    def initials(self, s):
        """Return the upper-cased first letters of the words in ``s``.

        Hyphens count as word separators.
        """
        words = s.strip().upper().replace("-", " ").split()
        return "".join(w[0] for w in words)

    def yn_question(self, msg, default=False):
        """Ask a yes/no question on the terminal and return the answer.

        An empty reply returns ``default``; otherwise the reply counts as
        yes only when it is a prefix of "yes". Ctrl-C or end of input quits
        the program.
        """
        suffix = " [{}] ".format("Y/n" if default else "y/N")
        try:
            inp = input(msg + suffix).strip().lower()
        except (KeyboardInterrupt, EOFError):
            print()
            self.quit()
        if not inp:
            return default
        return "yes".startswith(inp)

    def quit(self):
        """Exit the program with status 0."""
        sys.exit(0)

    def crash(self, msg):
        """Write ``error: <msg>`` to stderr and exit with status 1."""
        sys.stderr.write("error: " + str(msg) + "\n")
        sys.exit(1)
class ChangeNum:
    """A change reference parsed from ``<change>`` or ``<change>/<patch set>``.

    Attributes:
        change: the change number as an int.
        patch_set: the patch set number as an int, or None when the text
            carried no ``/<patch set>`` part.

    Raises:
        ValueError: if the text (ignoring surrounding whitespace) does not
            have one of the two accepted forms.
    """

    def __init__(self, s):
        parsed = re.fullmatch(r"(\d+)(?:\/(\d+))?", s.strip())
        if parsed is None:
            raise ValueError
        change_text, patch_set_text = parsed.group(1), parsed.group(2)
        self.change = int(change_text)
        if patch_set_text is not None:
            self.patch_set = int(patch_set_text)
        else:
            self.patch_set = None
def main():
    """Console entry point: create the application object and run it."""
    app = Ferrit()
    app.run()


# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()