#!/usr/bin/env python3
"""Gitea API Client for OpenClaw (REST, no tea CLI required)."""

import argparse
import json
import os
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Seconds to wait for a single HTTP request before giving up; without a
# timeout an unresponsive server would hang the CLI indefinitely.
REQUEST_TIMEOUT = 30

# Hard upper bound on the number of pages fetched by api_get_all_pages, as a
# last-resort guard against a server that never returns an empty page.
MAX_PAGES = 1000


def candidate_config_paths():
    """Return the ordered, de-duplicated list of config files to probe.

    Walks from the script directory up to the filesystem root looking for
    ``.clawdbot/credentials/gitea/config.json``, then appends the fixed
    OpenClaw workspace location and the per-user location.
    """
    relative = (".clawdbot", "credentials", "gitea", "config.json")
    paths = []
    current = SCRIPT_DIR
    while True:
        paths.append(os.path.join(current, *relative))
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            break
        current = parent
    paths.extend([
        "/home/node/.openclaw/workspace/.clawdbot/credentials/gitea/config.json",
        os.path.expanduser("~/.clawdbot/credentials/gitea/config.json"),
    ])
    # Deduplicate while preserving order
    return list(dict.fromkeys(paths))


CONFIG_PATHS = candidate_config_paths()


def get_config():
    """Load the Gitea configuration and return it as a dict.

    The first readable JSON object found in ``CONFIG_PATHS`` is used as the
    base; ``GITEA_URL`` and ``GITEA_TOKEN`` environment variables override
    it. The returned ``url`` never ends with ``/`` regardless of its source.

    Exits the process with status 1 when no URL or no token is configured.
    """
    config = {}
    for path in CONFIG_PATHS:
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            continue  # candidate simply does not exist
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            print(f"⚠️ Ignoring unreadable config {path}: {exc}", file=sys.stderr)
            continue
        if not isinstance(loaded, dict):
            print(f"⚠️ Ignoring config {path}: expected a JSON object", file=sys.stderr)
            continue
        config = loaded
        break

    env_url = os.getenv("GITEA_URL")
    if env_url:
        config["url"] = env_url
    env_token = os.getenv("GITEA_TOKEN")
    if env_token:
        config["token"] = env_token

    # Normalise the URL whatever its source, so that building
    # f"{url}/api/v1..." never produces a double slash.
    if isinstance(config.get("url"), str):
        config["url"] = config["url"].strip().rstrip("/")

    if not config.get("url"):
        print("❌ Gitea URL not configured.")
        print("Set GITEA_URL or create ~/.clawdbot/credentials/gitea/config.json")
        sys.exit(1)
    if not config.get("token"):
        print("❌ Gitea token not configured.")
        print("Set GITEA_TOKEN or add token to ~/.clawdbot/credentials/gitea/config.json")
        sys.exit(1)
    return config


def _quote(segment):
    """Percent-encode a single URL path segment (including any ``/``)."""
    return urllib.parse.quote(str(segment), safe="")


def _repo_endpoint(owner, repo, resource):
    """Build ``/repos/{owner}/{repo}/{resource}`` with escaped segments."""
    return f"/repos/{_quote(owner)}/{_quote(repo)}/{resource}"


def _http_error_message(error):
    """Extract the most useful message from an ``HTTPError`` response."""
    try:
        body = error.read().decode("utf-8", errors="replace") if error.fp else ""
    except OSError:
        body = ""
    if not body:
        # Fall back to e.g. "HTTP Error 404: Not Found" instead of "".
        return str(error)
    try:
        parsed = json.loads(body)
    except ValueError:
        return body
    if isinstance(parsed, dict) and parsed.get("message"):
        return str(parsed["message"])
    return body


def api_request(endpoint, method="GET", payload=None):
    """Perform one authenticated request against the Gitea REST API.

    Args:
        endpoint: Path below ``/api/v1``, starting with ``/``.
        method: HTTP method name.
        payload: Optional JSON-serialisable request body.

    Returns:
        A ``(data, status, error)`` tuple. On success ``data`` is the decoded
        JSON body (``{}`` for an empty body) and ``error`` is ``None``. On
        failure ``data`` is ``None``, ``status`` is the HTTP status if one
        was received (else ``None``) and ``error`` is a message string.
        This function never raises for network or protocol errors.
    """
    config = get_config()
    url = f"{config['url']}/api/v1{endpoint}"
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={
            "Authorization": f"token {config['token']}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
            status = resp.status
            body = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        return None, e.code, _http_error_message(e)
    except Exception as e:  # boundary: callers expect a tuple, never a raise
        return None, None, str(e)

    if not body.strip():
        return {}, status, None
    try:
        return json.loads(body), status, None
    except ValueError as e:
        # Keep the HTTP status so the caller can still report it.
        return None, status, f"Invalid JSON in response: {e}"


def api_get_all_pages(endpoint, page_size=100):
    """Fetch every page of a list endpoint and return the combined items.

    Pages are requested until the server returns an empty page. The page
    length is deliberately NOT compared with ``page_size``: the server may
    cap ``limit`` below the requested value, which would make a full page
    look like a short (last) one and silently truncate the result.

    Returns:
        ``(items, 200, None)`` on success; ``(None, status, error)`` if any
        request fails; ``(data, status, error)`` unchanged if the endpoint
        answers with something that is not a JSON list.
    """
    sep = "&" if "?" in endpoint else "?"
    items = []
    previous = None
    for page in range(1, MAX_PAGES + 1):
        paged_endpoint = f"{endpoint}{sep}limit={page_size}&page={page}"
        data, status, err = api_request(paged_endpoint)
        if data is None:
            return None, status, err
        if not isinstance(data, list):
            return data, status, err
        # An identical consecutive page means the endpoint ignores ``page``;
        # stop instead of looping over the same data.
        if not data or data == previous:
            break
        items.extend(data)
        previous = data
    return items, 200, None


def print_api_error(action, status, err):
    """Print a uniform failure line for a failed API action."""
    print(f"❌ Failed to {action}. status={status} error={err}")


def _fetch_list(endpoint, action):
    """Return all items of a list endpoint, or ``None`` after reporting.

    ``action`` is the human-readable verb phrase used in the error line.
    """
    data, status, err = api_get_all_pages(endpoint)
    if data is None:
        print_api_error(action, status, err)
        return None
    if not data:
        return []
    if not isinstance(data, list):
        print_api_error(action, status, "unexpected response format")
        return None
    return data


def cmd_repos(_):
    """List the repositories visible to the authenticated user."""
    repos = _fetch_list("/user/repos", "list repos")
    if repos is None:
        return 1
    if not repos:
        print("No repositories found.")
        return 0

    print(f"📦 Repositories ({len(repos)}):\n")
    for i, repo in enumerate(repos, 1):
        private = "🔒" if repo.get("private") else "🌍"
        stars = repo.get("stars_count", 0)
        desc = (repo.get("description") or "").strip()
        # ``or ""`` guards against an explicit JSON null.
        updated = (repo.get("updated_at") or "")[:19]
        print(f"{i}. {private} {repo.get('full_name', '?')}")
        if desc:
            print(f" {desc}")
        print(f" ⭐ {stars} Updated: {updated}")
    return 0


def cmd_create(args):
    """Create a repository for the current user or for ``--org``."""
    if args.org:
        endpoint = f"/orgs/{_quote(args.org)}/repos"
    else:
        endpoint = "/user/repos"
    payload = {
        "name": args.name,
        "description": args.description or "",
        "private": args.private,
        "auto_init": True,
    }
    repo, status, err = api_request(endpoint, method="POST", payload=payload)
    if repo is None:
        print_api_error("create repo", status, err)
        return 1

    base = get_config()["url"]
    print(f"✅ Repository created: {repo.get('full_name')}")
    print(f"URL: {base}/{repo.get('full_name')}")
    return 0


def cmd_issues(args):
    """List the issues of a repository."""
    issues = _fetch_list(_repo_endpoint(args.owner, args.repo, "issues"), "list issues")
    if issues is None:
        return 1
    if not issues:
        print("No issues found.")
        return 0
    for issue in issues:
        print(f"#{issue['number']} [{issue.get('state')}] {issue.get('title')}")
    return 0


def cmd_issue(args):
    """Create an issue in a repository."""
    payload = {"title": args.title, "body": args.body or ""}
    issue, status, err = api_request(
        _repo_endpoint(args.owner, args.repo, "issues"), method="POST", payload=payload
    )
    if issue is None:
        print_api_error("create issue", status, err)
        return 1
    base = get_config()["url"]
    print(f"✅ Issue created: #{issue['number']} - {issue['title']}")
    print(f"URL: {base}/{args.owner}/{args.repo}/issues/{issue['number']}")
    return 0


def cmd_pulls(args):
    """List the pull requests of a repository."""
    pulls = _fetch_list(
        _repo_endpoint(args.owner, args.repo, "pulls"), "list pull requests"
    )
    if pulls is None:
        return 1
    if not pulls:
        print("No pull requests found.")
        return 0
    for pr in pulls:
        print(f"#{pr['number']} [{pr.get('state')}] {pr.get('title')}")
    return 0


def cmd_pr(args):
    """Create a pull request from ``--head`` into ``--base``."""
    payload = {
        "title": args.title,
        "head": args.head,
        "base": args.base,
        "body": args.body or "",
    }
    pr, status, err = api_request(
        _repo_endpoint(args.owner, args.repo, "pulls"), method="POST", payload=payload
    )
    if pr is None:
        print_api_error("create pull request", status, err)
        return 1
    base = get_config()["url"]
    print(f"✅ Pull request created: #{pr['number']} - {pr['title']}")
    print(f"URL: {base}/{args.owner}/{args.repo}/pulls/{pr['number']}")
    return 0


def cmd_release(args):
    """Create a release for ``--tag`` (the name defaults to the tag)."""
    payload = {
        "tag_name": args.tag,
        "name": args.name or args.tag,
        "body": args.body or "",
    }
    rel, status, err = api_request(
        _repo_endpoint(args.owner, args.repo, "releases"), method="POST", payload=payload
    )
    if rel is None:
        print_api_error("create release", status, err)
        return 1
    base = get_config()["url"]
    print(f"✅ Release created: {rel.get('tag_name')}")
    print(f"URL: {base}/{args.owner}/{args.repo}/releases/tag/{rel.get('tag_name')}")
    return 0


def cmd_branches(args):
    """List the branches of a repository with their short commit id."""
    branches = _fetch_list(
        _repo_endpoint(args.owner, args.repo, "branches"), "list branches"
    )
    if branches is None:
        return 1
    if not branches:
        print("No branches found.")
        return 0
    for b in branches:
        # Both ``commit`` and ``id`` may be missing or null.
        sha = ((b.get("commit") or {}).get("id") or "")[:8]
        print(f"{b.get('name')} ({sha})")
    return 0


def cmd_user(args):
    """Show a user's profile (the authenticated user when none is given)."""
    endpoint = f"/users/{_quote(args.username)}" if args.username else "/user"
    user, status, err = api_request(endpoint)
    if user is None:
        print_api_error("get user", status, err)
        return 1
    print(f"👤 {user.get('login')}")
    print(f"Email: {user.get('email') or 'N/A'}")
    print(f"Public repos: {user.get('public_repos_count', 0)}")
    return 0


def cmd_clone(args):
    """Clone ``owner/repo`` from the configured server using ``git``.

    Returns git's exit status, or 127 when the ``git`` executable is missing.
    """
    base = get_config()["url"]
    target = args.dest or "."
    owner_repo = args.owner_repo.strip("/")
    if owner_repo.endswith(".git"):
        owner_repo = owner_repo[: -len(".git")]  # avoid "repo.git.git"
    url = f"{base}/{owner_repo}.git"
    print(f"Cloning {url} -> {target}")
    try:
        # "--" stops option parsing so a destination starting with "-" is
        # treated as a path, not as a git option.
        return subprocess.call(["git", "clone", "--", url, target])
    except FileNotFoundError:
        print("❌ git executable not found on PATH.")
        return 127


def _add_repo_args(parser):
    """Add the required ``--owner`` and ``--repo`` options to a subparser."""
    parser.add_argument("--owner", required=True)
    parser.add_argument("--repo", required=True)


def build_parser():
    """Build the argparse parser with one subcommand per ``cmd_*`` handler."""
    p = argparse.ArgumentParser(description="Gitea REST CLI")
    sub = p.add_subparsers(dest="cmd", required=True)

    sub.add_parser("repos")

    c = sub.add_parser("create")
    c.add_argument("--name", required=True)
    c.add_argument("--description")
    c.add_argument("--private", action="store_true")
    c.add_argument("--org")

    _add_repo_args(sub.add_parser("issues"))

    ic = sub.add_parser("issue")
    _add_repo_args(ic)
    ic.add_argument("--title", required=True)
    ic.add_argument("--body")

    _add_repo_args(sub.add_parser("pulls"))

    pr = sub.add_parser("pr")
    _add_repo_args(pr)
    pr.add_argument("--head", required=True)
    pr.add_argument("--base", required=True)
    pr.add_argument("--title", required=True)
    pr.add_argument("--body")

    r = sub.add_parser("release")
    _add_repo_args(r)
    r.add_argument("--tag", required=True)
    r.add_argument("--name")
    r.add_argument("--body")

    _add_repo_args(sub.add_parser("branches"))

    u = sub.add_parser("user")
    u.add_argument("username", nargs="?")

    cl = sub.add_parser("clone")
    cl.add_argument("owner_repo")
    cl.add_argument("dest", nargs="?")

    return p


def main():
    """Parse the command line, dispatch, and exit with the handler's status."""
    parser = build_parser()
    args = parser.parse_args()
    handlers = {
        "repos": cmd_repos,
        "create": cmd_create,
        "issues": cmd_issues,
        "issue": cmd_issue,
        "pulls": cmd_pulls,
        "pr": cmd_pr,
        "release": cmd_release,
        "branches": cmd_branches,
        "user": cmd_user,
        "clone": cmd_clone,
    }
    raise SystemExit(handlers[args.cmd](args))


if __name__ == "__main__":
    main()