371 lines
10 KiB
Python
Executable File
371 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Gitea API Client for OpenClaw (REST, no tea CLI required)."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
# Absolute path of the directory containing this script; the config file
# search in candidate_config_paths() starts from here.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
def candidate_config_paths():
    """Return the ordered, duplicate-free list of config file locations.

    The list starts with ``.clawdbot/credentials/gitea/config.json`` under
    SCRIPT_DIR and under each of its ancestor directories up to the
    filesystem root, followed by two fixed fallbacks: the OpenClaw workspace
    path and the same relative path under the current user's home directory.
    """
    relative_parts = (".clawdbot", "credentials", "gitea", "config.json")

    # A dict keeps insertion order and ignores repeated keys, so it doubles
    # as an order-preserving set of paths.
    ordered = {}

    directory = SCRIPT_DIR
    reached_root = False
    while not reached_root:
        ordered.setdefault(os.path.join(directory, *relative_parts), None)
        above = os.path.dirname(directory)
        # dirname() of the root is the root itself: nothing left to climb.
        reached_root = above == directory
        directory = above

    for fallback in (
        "/home/node/.openclaw/workspace/.clawdbot/credentials/gitea/config.json",
        os.path.expanduser("~/.clawdbot/credentials/gitea/config.json"),
    ):
        ordered.setdefault(fallback, None)

    return list(ordered)
|
|
|
|
|
|
# Candidate config file locations, computed once at import time; get_config()
# tries them in this order.
CONFIG_PATHS = candidate_config_paths()
|
|
|
|
|
|
def get_config():
    """Return the Gitea connection settings as a dict with "url" and "token".

    The first file in CONFIG_PATHS that can be read and that parses to a JSON
    object supplies the base settings; missing, unreadable, malformed or
    non-object files are skipped.  The GITEA_URL and GITEA_TOKEN environment
    variables, when non-empty, override the values from the file.  Trailing
    slashes are removed from the URL regardless of where it came from, so
    callers can safely append "/api/v1...".

    Prints a hint and exits the process with status 1 when the URL or the
    token is missing or empty.
    """
    config = {}

    for path in CONFIG_PATHS:
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file, invalid JSON or invalid UTF-8
            # (JSONDecodeError and UnicodeDecodeError are both ValueErrors):
            # fall through to the next candidate.
            continue
        if isinstance(loaded, dict):
            config = loaded
            break

    env_url = os.getenv("GITEA_URL")
    if env_url:
        config["url"] = env_url
    env_token = os.getenv("GITEA_TOKEN")
    if env_token:
        config["token"] = env_token

    # Normalise the URL whether it came from the file or the environment.
    url = config.get("url")
    if isinstance(url, str):
        config["url"] = url.rstrip("/")

    if not config.get("url"):
        print("❌ Gitea URL not configured.")
        print("Set GITEA_URL or create ~/.clawdbot/credentials/gitea/config.json")
        sys.exit(1)
    if not config.get("token"):
        print("❌ Gitea token not configured.")
        print("Set GITEA_TOKEN or add token to ~/.clawdbot/credentials/gitea/config.json")
        sys.exit(1)

    return config
|
|
|
|
|
|
def api_request(endpoint, method="GET", payload=None, timeout=30):
    """Send one request to the Gitea REST API and decode the JSON reply.

    Args:
        endpoint: Path below ``/api/v1``, starting with "/" (may carry a
            query string).
        method: HTTP method name.
        payload: Optional JSON-serialisable object sent as the request body.
        timeout: Seconds to wait on the connection before giving up, so an
            unresponsive server cannot hang the CLI indefinitely.

    Returns:
        A ``(data, status, error)`` tuple.  On success ``data`` is the decoded
        JSON body (``{}`` for an empty body) and ``error`` is None.  On failure
        ``data`` is None, ``error`` is a message, and ``status`` is the HTTP
        status code when a response was received or None when it was not.
    """
    config = get_config()
    url = f"{config['url']}/api/v1{endpoint}"
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {
        "Authorization": f"token {config['token']}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    try:
        # Request() itself raises ValueError for a malformed URL, so it is
        # built inside the try block as well.
        req = urllib.request.Request(url, data=data, method=method, headers=headers)
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status = resp.status
            body = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        try:
            raw = e.read().decode("utf-8", errors="replace") if e.fp else ""
        except OSError:
            raw = ""
        # Fall back to the HTTPError text when the server sent no body.
        msg = raw or str(e)
        try:
            parsed = json.loads(raw) if raw else None
        except ValueError:
            parsed = None
        if isinstance(parsed, dict):
            msg = parsed.get("message") or msg
        return None, e.code, msg
    except Exception as e:
        # Boundary for transport-level failures (DNS, refused connection,
        # timeout, bad URL, ...): callers expect a tuple, never an exception.
        return None, None, str(e)

    if not body.strip():
        return {}, status, None
    try:
        return json.loads(body), status, None
    except ValueError as e:
        # Keep the HTTP status so the caller can still report it.
        return None, status, f"invalid JSON in response: {e}"
|
|
|
|
|
|
def api_get_all_pages(endpoint, page_size=100):
    """Fetch every page of a list endpoint and return the combined items.

    Args:
        endpoint: API path passed to api_request(); ``limit`` and ``page``
            query parameters are appended to it.
        page_size: Requested number of items per page.

    Returns:
        ``(items, status, error)``.  ``items`` is the concatenated list on
        success.  If any request fails the result is ``(None, status, error)``;
        if a response is not a JSON list it is returned unchanged.

    The server may return fewer items per page than requested (Gitea caps
    ``limit`` at its configured maximum), so a page shorter than ``page_size``
    does not prove it is the last one.  The size of the first page is taken as
    the effective page size instead, and paging stops at an empty page or at a
    later page shorter than the first.
    """
    # The separator depends only on the endpoint, so compute it once.
    sep = "&" if "?" in endpoint else "?"
    items = []
    previous = None
    first_page_len = None
    status = 200
    page = 1

    while True:
        paged_endpoint = f"{endpoint}{sep}limit={page_size}&page={page}"
        data, status, err = api_request(paged_endpoint)
        if data is None:
            return None, status, err
        if not isinstance(data, list):
            return data, status, err

        # An empty page means we ran past the end; an exact repeat of the
        # previous page means the endpoint ignores ``page`` -- stop rather
        # than loop forever collecting duplicates.
        if not data or data == previous:
            break

        items.extend(data)
        if first_page_len is None:
            first_page_len = len(data)
        elif len(data) < first_page_len:
            # Shorter than a full server-side page: this was the last one.
            break
        previous = data
        page += 1

    return items, status, None
|
|
|
|
|
|
def print_api_error(action, status, err):
    """Print a one-line failure report for the API call described by *action*."""
    report = "❌ Failed to {}. status={} error={}".format(action, status, err)
    print(report)
|
|
|
|
|
|
def cmd_repos(_):
    """List the repositories returned by ``/user/repos``.

    Prints a numbered entry per repository (visibility icon, full name,
    optional description, star count and update timestamp).  Returns 0 on
    success, including the empty case, and 1 when the API call fails.
    """
    repos, status, err = api_get_all_pages("/user/repos")

    if repos is None:
        print_api_error("list repos", status, err)
        return 1
    if not repos:
        print("No repositories found.")
        return 0

    print(f"📦 Repositories ({len(repos)}):\n")
    for position, entry in enumerate(repos, start=1):
        if entry.get("private"):
            visibility = "🔒"
        else:
            visibility = "🌍"
        star_count = entry.get("stars_count", 0)
        description = entry.get("description") or ""
        description = description.strip()

        print(f"{position}. {visibility} {entry.get('full_name','?')}")
        if description:
            print(f" {description}")
        # Keep only the first 19 characters of the timestamp.
        updated = entry.get("updated_at", "")[:19]
        print(f" ⭐ {star_count} Updated: {updated}")

    return 0
|
|
|
|
|
|
def cmd_create(args):
    """Create a repository for the current user or, with ``--org``, an organisation.

    Reads ``args.name``, ``args.description``, ``args.private`` and
    ``args.org``.  The repository is always created with ``auto_init`` set.
    Returns 0 on success and 1 when the API call fails.
    """
    # No preliminary "/user" lookup: the token's owner is implied by
    # "/user/repos", and the organisation name comes straight from --org.
    # "/orgs/{org}/repos" is the current form of the org endpoint; the
    # singular "/org/{org}/repos" is marked deprecated in the Gitea API.
    endpoint = f"/orgs/{args.org}/repos" if args.org else "/user/repos"
    payload = {
        "name": args.name,
        "description": args.description or "",
        "private": args.private,
        "auto_init": True,
    }

    repo, status, err = api_request(endpoint, method="POST", payload=payload)
    if repo is None:
        print_api_error("create repo", status, err)
        return 1

    base = get_config()["url"]
    print(f"✅ Repository created: {repo.get('full_name')}")
    print(f"URL: {base}/{repo.get('full_name')}")
    return 0
|
|
|
|
|
|
def cmd_issues(args):
    """Print the issues of ``args.owner``/``args.repo``, one per line.

    Results are collected with api_get_all_pages(), as cmd_repos does, so the
    listing is not cut off after the first API page.  Returns 0 on success
    (including when there are no issues) and 1 when the API call fails.
    """
    issues, status, err = api_get_all_pages(
        f"/repos/{args.owner}/{args.repo}/issues"
    )
    if issues is None:
        print_api_error("list issues", status, err)
        return 1

    if not issues:
        print("No issues found.")
        return 0

    for issue in issues:
        print(f"#{issue['number']} [{issue.get('state')}] {issue.get('title')}")
    return 0
|
|
|
|
|
|
def cmd_issue(args):
    """Open a new issue in ``args.owner``/``args.repo``.

    Sends ``args.title`` and ``args.body`` (empty string when not given) and
    prints the new issue's number, title and web URL.  Returns 0 on success
    and 1 when the API call fails.
    """
    repo_path = f"{args.owner}/{args.repo}"
    issue, status, err = api_request(
        f"/repos/{repo_path}/issues",
        method="POST",
        payload=dict(title=args.title, body=args.body or ""),
    )

    if issue is None:
        print_api_error("create issue", status, err)
        return 1

    base = get_config()["url"]
    number = issue["number"]
    print(f"✅ Issue created: #{number} - {issue['title']}")
    print(f"URL: {base}/{repo_path}/issues/{number}")
    return 0
|
|
|
|
|
|
def cmd_pulls(args):
    """Print the pull requests of ``args.owner``/``args.repo``, one per line.

    Results are collected with api_get_all_pages(), as cmd_repos does, so the
    listing is not cut off after the first API page.  Returns 0 on success
    (including when there are no pull requests) and 1 when the API call fails.
    """
    pulls, status, err = api_get_all_pages(
        f"/repos/{args.owner}/{args.repo}/pulls"
    )
    if pulls is None:
        print_api_error("list pull requests", status, err)
        return 1

    if not pulls:
        print("No pull requests found.")
        return 0

    for pr in pulls:
        print(f"#{pr['number']} [{pr.get('state')}] {pr.get('title')}")
    return 0
|
|
|
|
|
|
def cmd_pr(args):
    """Open a pull request from ``args.head`` into ``args.base``.

    Sends the title, head, base and body (empty string when not given) to the
    repository's pulls endpoint and prints the new pull request's number,
    title and web URL.  Returns 0 on success and 1 when the API call fails.
    """
    repo_path = f"{args.owner}/{args.repo}"
    request_body = dict(
        title=args.title,
        head=args.head,
        base=args.base,
        body=args.body or "",
    )

    pr, status, err = api_request(
        f"/repos/{repo_path}/pulls", method="POST", payload=request_body
    )
    if pr is None:
        print_api_error("create pull request", status, err)
        return 1

    base = get_config()["url"]
    number = pr["number"]
    print(f"✅ Pull request created: #{number} - {pr['title']}")
    print(f"URL: {base}/{repo_path}/pulls/{number}")
    return 0
|
|
|
|
|
|
def cmd_release(args):
    """Create a release for tag ``args.tag`` in ``args.owner``/``args.repo``.

    The release name falls back to the tag when ``args.name`` is not given,
    and the body falls back to an empty string.  Prints the created tag name
    and the release's web URL.  Returns 0 on success and 1 when the API call
    fails.
    """
    repo_path = f"{args.owner}/{args.repo}"
    request_body = dict(
        tag_name=args.tag,
        name=args.name or args.tag,
        body=args.body or "",
    )

    rel, status, err = api_request(
        f"/repos/{repo_path}/releases", method="POST", payload=request_body
    )
    if rel is None:
        print_api_error("create release", status, err)
        return 1

    base = get_config()["url"]
    created_tag = rel.get("tag_name")
    print(f"✅ Release created: {created_tag}")
    print(f"URL: {base}/{repo_path}/releases/tag/{created_tag}")
    return 0
|
|
|
|
|
|
def cmd_branches(args):
    """Print the branches of ``args.owner``/``args.repo`` with short commit ids.

    Results are collected with api_get_all_pages(), as cmd_repos does, so the
    listing is not cut off after the first API page.  Each line shows the
    branch name and the first 8 characters of its commit id (empty when the
    response carries none).  Returns 0 on success (including when there are
    no branches) and 1 when the API call fails.
    """
    branches, status, err = api_get_all_pages(
        f"/repos/{args.owner}/{args.repo}/branches"
    )
    if branches is None:
        print_api_error("list branches", status, err)
        return 1

    if not branches:
        print("No branches found.")
        return 0

    for b in branches:
        # Tolerate a missing/null "commit" object as well as a null "id".
        sha = ((b.get("commit") or {}).get("id") or "")[:8]
        print(f"{b.get('name')} ({sha})")
    return 0
|
|
|
|
|
|
def cmd_user(args):
    """Show login, e-mail and public repository count for a user.

    Looks up ``args.username`` when given, otherwise the user the token
    belongs to.  Returns 0 on success and 1 when the API call fails.
    """
    endpoint = "/user"
    if args.username:
        endpoint = f"/users/{args.username}"

    user, status, err = api_request(endpoint)
    if user is None:
        print_api_error("get user", status, err)
        return 1

    login = user.get("login")
    email = user.get("email") or "N/A"
    repo_count = user.get("public_repos_count", 0)
    print(f"👤 {login}")
    print(f"Email: {email}")
    print(f"Public repos: {repo_count}")
    return 0
|
|
|
|
|
|
def cmd_clone(args):
    """Clone ``args.owner_repo`` from the configured Gitea server with git.

    The clone goes into ``args.dest``, or into the current directory (".")
    when no destination is given.

    Returns git's exit status, or 127 when the ``git`` executable cannot be
    found.
    """
    base = get_config()["url"]
    target = args.dest or "."
    url = f"{base}/{args.owner_repo}.git"
    print(f"Cloning {url} -> {target}")
    try:
        # "--" ends git's option parsing, so a destination that starts with
        # "-" is treated as a path rather than as a command-line switch.
        return subprocess.call(["git", "clone", "--", url, target])
    except FileNotFoundError:
        print("❌ git executable not found. Install git and make sure it is on PATH.")
        return 127
|
|
|
|
|
|
def build_parser():
    """Build the argparse parser for the CLI.

    The chosen sub-command name is stored in ``args.cmd`` and is mandatory.
    Sub-commands: repos, create, issues, issue, pulls, pr, release, branches,
    user and clone.
    """
    parser = argparse.ArgumentParser(description="Gitea REST CLI")
    commands = parser.add_subparsers(dest="cmd", required=True)

    def repo_command(name, required=(), optional=()):
        # Sub-command that targets one repository: --owner and --repo come
        # first, then any further mandatory flags, then the optional ones.
        command = commands.add_parser(name)
        for flag in ("--owner", "--repo", *required):
            command.add_argument(flag, required=True)
        for flag in optional:
            command.add_argument(flag)
        return command

    commands.add_parser("repos")

    create = commands.add_parser("create")
    create.add_argument("--name", required=True)
    create.add_argument("--description")
    create.add_argument("--private", action="store_true")
    create.add_argument("--org")

    repo_command("issues")
    repo_command("issue", required=("--title",), optional=("--body",))
    repo_command("pulls")
    repo_command(
        "pr", required=("--head", "--base", "--title"), optional=("--body",)
    )
    repo_command("release", required=("--tag",), optional=("--name", "--body"))
    repo_command("branches")

    user = commands.add_parser("user")
    user.add_argument("username", nargs="?")

    clone = commands.add_parser("clone")
    clone.add_argument("owner_repo")
    clone.add_argument("dest", nargs="?")

    return parser
|
|
|
|
|
|
def main():
    """Parse the command line, run the chosen sub-command and exit.

    The process exit status is the value returned by the sub-command handler.
    """
    args = build_parser().parse_args()

    # Sub-command name (as stored in args.cmd) -> handler function.
    dispatch = dict(
        repos=cmd_repos,
        create=cmd_create,
        issues=cmd_issues,
        issue=cmd_issue,
        pulls=cmd_pulls,
        pr=cmd_pr,
        release=cmd_release,
        branches=cmd_branches,
        user=cmd_user,
        clone=cmd_clone,
    )
    handler = dispatch[args.cmd]
    sys.exit(handler(args))


if __name__ == "__main__":
    main()
|