api/src/routes/canopy.tsblame
View source
80fafdf1import type { FastifyInstance } from "fastify";
80fafdf2import { z } from "zod";
80fafdf3import type { CanopyRunner } from "../services/canopy-runner.js";
5bcd5db4import type { CanopyEventBus, CanopyEvent } from "../services/canopy-events.js";
80fafdf5
791afd46function lookupRepo(db: any, repo: string) {
80fafdf7 return db
791afd48 .prepare(`SELECT id, name, default_branch FROM repos WHERE name = ?`)
791afd49 .get(repo) as any;
80fafdf10}
80fafdf11
1da987412export async function canopyGlobalRoutes(app: FastifyInstance) {
1da987413 const db = (app as any).db;
5bcd5db14 const eventBus = (app as any).canopyEventBus as CanopyEventBus | undefined;
5bcd5db15
5bcd5db16 // SSE endpoint for live updates
5bcd5db17 app.get<{
5bcd5db18 Querystring: { scope?: string; runId?: string; owner?: string; repo?: string };
5bcd5db19 }>("/canopy/events", async (request, reply) => {
5bcd5db20 const { scope = "global", runId, owner, repo } = request.query;
5bcd5db21
5bcd5db22 // Resolve owner/repo to repoId for repo-scoped filtering
5bcd5db23 let repoId: number | null = null;
5bcd5db24 if (scope === "repo" && repo) {
5bcd5db25 const repoRow = lookupRepo(db, repo);
5bcd5db26 if (repoRow) repoId = repoRow.id;
5bcd5db27 }
5bcd5db28
5bcd5db29 const numericRunId = runId ? parseInt(runId) : null;
5bcd5db30
5bcd5db31 reply.raw.writeHead(200, {
5bcd5db32 "Content-Type": "text/event-stream",
5bcd5db33 "Cache-Control": "no-cache",
5bcd5db34 "Connection": "keep-alive",
5bcd5db35 "X-Accel-Buffering": "no",
5bcd5db36 });
5bcd5db37 reply.raw.write(": connected\n\n");
5bcd5db38
5bcd5db39 const keepalive = setInterval(() => {
5bcd5db40 reply.raw.write(": keepalive\n\n");
5bcd5db41 }, 15000);
5bcd5db42
5bcd5db43 const listener = (event: CanopyEvent) => {
5bcd5db44 // Filter by scope
5bcd5db45 if (scope === "run" && event.runId !== numericRunId) return;
5bcd5db46 if (scope === "repo" && repoId !== null && event.repoId !== repoId) return;
5bcd5db47 if (scope === "repo" && event.type === "log:append") return;
5bcd5db48 if (scope === "global" && !["run:created", "run:completed", "run:cancelled", "run:started"].includes(event.type)) return;
5bcd5db49
5bcd5db50 try {
5bcd5db51 reply.raw.write(`event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`);
5bcd5db52 } catch {
5bcd5db53 // Client disconnected
5bcd5db54 }
5bcd5db55 };
5bcd5db56
5bcd5db57 const unsubscribe = eventBus?.subscribe(listener);
5bcd5db58
5bcd5db59 request.raw.on("close", () => {
5bcd5db60 clearInterval(keepalive);
5bcd5db61 unsubscribe?.();
5bcd5db62 });
5bcd5db63
5bcd5db64 await reply.hijack();
5bcd5db65 });
1da987466
721afa667 // Recent runs across all repos (optionally filtered by owner).
721afa668 // Returns the N most recent runs per repo so active repos don't crowd out others.
1da987469 app.get<{
721afa670 Querystring: { limit?: string; owner?: string; per_repo?: string };
1da987471 }>("/canopy/recent-runs", async (request) => {
721afa672 const perRepo = parseInt(request.query.per_repo ?? "30") || 30;
fe3b50973 const owner = request.query.owner;
721afa674 const ownerClause = owner ? `WHERE rwo.owner_name = ?` : "";
721afa675 const query = `
721afa676 SELECT * FROM (
721afa677 SELECT pr.*, pr.pipeline_name, r.name AS repo_name, rwo.owner_name,
721afa678 ROW_NUMBER() OVER (PARTITION BY pr.repo_id ORDER BY pr.created_at DESC) AS rn
721afa679 FROM pipeline_runs pr
721afa680 JOIN repos r ON pr.repo_id = r.id
721afa681 JOIN repos_with_owner rwo ON r.id = rwo.id
721afa682 ${ownerClause}
721afa683 ) WHERE rn <= ?
721afa684 ORDER BY created_at DESC`;
fe3b50985 const params: any[] = [];
721afa686 if (owner) params.push(owner);
721afa687 params.push(perRepo);
fe3b50988 const runs = db.prepare(query).all(...params);
1da987489 return { runs };
1da987490 });
1da987491}
1da987492
f0bb19293export async function canopyWebhookRoute(app: FastifyInstance) {
80fafdf94 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
80fafdf95
80fafdf96 // Internal webhook — no auth (only reachable from Docker network)
f0bb19297 app.post("/canopy/webhook", async (request, reply) => {
80fafdf98 const { event, repo, bookmark, old_id, new_id } = request.body as any;
80fafdf99 if (event !== "push" || !repo || !bookmark || !new_id) {
80fafdf100 return reply.code(400).send({ error: "Invalid webhook payload" });
80fafdf101 }
80fafdf102 if (!runner) {
80fafdf103 return reply.code(503).send({ error: "Canopy not enabled" });
80fafdf104 }
c98936c105 try {
c98936c106 await runner.onPush({
c98936c107 repo,
c98936c108 branch: bookmark,
c98936c109 oldCommitId: old_id ?? "",
c98936c110 newCommitId: new_id,
c98936c111 });
c98936c112 } catch {
c98936c113 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
c98936c114 }
80fafdf115 return { ok: true };
80fafdf116 });
f0bb192117}
f0bb192118
f0bb192119export async function canopyRoutes(app: FastifyInstance) {
f0bb192120 const db = (app as any).db;
f0bb192121 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
80fafdf122
37f6938123 const RUN_SELECT = `
2d074a3124 SELECT
2d074a3125 pr.*,
2d074a3126 COALESCE(p.name, pr.pipeline_name) AS pipeline_name,
2d074a3127 COALESCE(p.file, pr.pipeline_file) AS pipeline_file,
2d074a3128 pr.repo_id
37f6938129 FROM pipeline_runs pr
2d074a3130 LEFT JOIN pipelines p ON pr.pipeline_id = p.id`;
37f6938131
80fafdf132 // List pipeline runs
80fafdf133 app.get<{
80fafdf134 Params: { owner: string; repo: string };
80fafdf135 Querystring: { status?: string; limit?: string };
f0bb192136 }>("/:owner/:repo/canopy/runs", async (request, reply) => {
791afd4137 const { repo } = request.params;
80fafdf138 const { status, limit } = request.query;
791afd4139 const repoRow = lookupRepo(db, repo);
791afd4140 if (!repoRow) return { runs: [] };
80fafdf141
2d074a3142 let query = `${RUN_SELECT} WHERE pr.repo_id = ?`;
80fafdf143 const params: any[] = [repoRow.id];
80fafdf144 if (status) {
37f6938145 query += ` AND pr.status = ?`;
80fafdf146 params.push(status);
80fafdf147 }
37f6938148 query += ` ORDER BY pr.created_at DESC LIMIT ?`;
80fafdf149 params.push(parseInt(limit ?? "20") || 20);
80fafdf150
80fafdf151 const runs = db.prepare(query).all(...params);
80fafdf152 return { runs };
80fafdf153 });
80fafdf154
80fafdf155 // Get single run with steps
80fafdf156 app.get<{
80fafdf157 Params: { owner: string; repo: string; id: string };
f0bb192158 }>("/:owner/:repo/canopy/runs/:id", async (request, reply) => {
80fafdf159 const { id } = request.params;
80fafdf160 const run = db
37f6938161 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
80fafdf162 .get(parseInt(id));
80fafdf163 if (!run) return reply.code(404).send({ error: "Run not found" });
80fafdf164
80fafdf165 const steps = db
80fafdf166 .prepare(
80fafdf167 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
80fafdf168 )
80fafdf169 .all(parseInt(id));
80fafdf170
80fafdf171 return { run, steps };
80fafdf172 });
80fafdf173
80fafdf174 // Get step logs
80fafdf175 app.get<{
80fafdf176 Params: { owner: string; repo: string; id: string; idx: string };
f0bb192177 }>("/:owner/:repo/canopy/runs/:id/logs/:idx", async (request, reply) => {
80fafdf178 const { id, idx } = request.params;
80fafdf179 const step = db
80fafdf180 .prepare(
80fafdf181 `SELECT id FROM pipeline_steps WHERE run_id = ? AND step_index = ?`
80fafdf182 )
80fafdf183 .get(parseInt(id), parseInt(idx)) as any;
80fafdf184 if (!step) return reply.code(404).send({ error: "Step not found" });
80fafdf185
80fafdf186 const logs = db
80fafdf187 .prepare(
80fafdf188 `SELECT stream, content, created_at FROM step_logs WHERE step_id = ? ORDER BY id`
80fafdf189 )
80fafdf190 .all(step.id);
80fafdf191
80fafdf192 return { logs };
80fafdf193 });
80fafdf194
80fafdf195 // Cancel a running pipeline
80fafdf196 app.post<{
80fafdf197 Params: { owner: string; repo: string; id: string };
80fafdf198 }>(
f0bb192199 "/:owner/:repo/canopy/runs/:id/cancel",
80fafdf200 { preHandler: [(app as any).authenticate] },
80fafdf201 async (request, reply) => {
80fafdf202 if (!runner) {
80fafdf203 return reply.code(503).send({ error: "Canopy not enabled" });
80fafdf204 }
80fafdf205 const { id } = request.params;
80fafdf206 const run = db
37f6938207 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
80fafdf208 .get(parseInt(id)) as any;
57c315f209 if (!run || (run.status !== "running" && run.status !== "pending")) {
57c315f210 return reply.code(400).send({ error: "Run is not running or pending" });
80fafdf211 }
80fafdf212
80fafdf213 runner.cancelRun(parseInt(id));
80fafdf214 db.prepare(
80fafdf215 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ?`
80fafdf216 ).run(parseInt(id));
57c315f217 // Also cancel any pending/running steps
57c315f218 db.prepare(
57c315f219 `UPDATE pipeline_steps SET status = 'cancelled' WHERE run_id = ? AND status IN ('pending', 'running')`
57c315f220 ).run(parseInt(id));
80fafdf221
80fafdf222 const updated = db
37f6938223 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
80fafdf224 .get(parseInt(id));
80fafdf225 return { run: updated };
80fafdf226 }
80fafdf227 );
80fafdf228
80fafdf229 // Manual trigger
80fafdf230 app.post<{
80fafdf231 Params: { owner: string; repo: string };
80fafdf232 }>(
f0bb192233 "/:owner/:repo/canopy/trigger",
80fafdf234 { preHandler: [(app as any).authenticate] },
80fafdf235 async (request, reply) => {
80fafdf236 if (!runner) {
80fafdf237 return reply.code(503).send({ error: "Canopy not enabled" });
80fafdf238 }
b5baf6d239 const { owner, repo } = request.params;
191af2a240 const { ref, pipeline } = (request.body as any) ?? {};
791afd4241 const branch = ref || "main";
80fafdf242 const bridgeUrl =
80fafdf243 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
80fafdf244 const resolveRes = await fetch(
80fafdf245 `${bridgeUrl}/repos/${repo}/resolve/${branch}`
80fafdf246 );
80fafdf247 if (!resolveRes.ok) {
80fafdf248 return reply.code(400).send({ error: "Could not resolve branch" });
80fafdf249 }
80fafdf250 const { commit_id } = (await resolveRes.json()) as any;
80fafdf251
c98936c252 try {
c98936c253 await runner.onPush({
c98936c254 repo,
c98936c255 branch,
c98936c256 oldCommitId: "",
c98936c257 newCommitId: commit_id,
191af2a258 }, pipeline);
c98936c259 } catch {
c98936c260 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
c98936c261 }
4b55905262
4b55905263 // Deploy pages if applicable
4b55905264 const pagesDeployer = (app as any).pagesDeployer;
4b55905265 if (pagesDeployer) {
b5baf6d266 void pagesDeployer.deploy(owner, repo, branch).catch((err: any) => {
4b55905267 app.log.error({ err, repo, branch }, "Pages deployment failed (manual trigger)");
4b55905268 });
4b55905269 }
4b55905270
80fafdf271 return { triggered: true, branch, commit_id };
80fafdf272 }
80fafdf273 );
80fafdf274
80fafdf275 // List pipeline definitions from repo HEAD
80fafdf276 app.get<{
80fafdf277 Params: { owner: string; repo: string };
f0bb192278 }>("/:owner/:repo/canopy/pipelines", async (request, reply) => {
791afd4279 const { repo } = request.params;
80fafdf280 const bridgeUrl =
80fafdf281 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
791afd4282 const branch = "main";
80fafdf283
80fafdf284 try {
80fafdf285 const treeRes = await fetch(
791afd4286 `${bridgeUrl}/repos/${repo}/tree/${branch}/.canopy`
80fafdf287 );
80fafdf288 if (!treeRes.ok) return { pipelines: [] };
80fafdf289 const tree = await treeRes.json();
80fafdf290
80fafdf291 const pipelines: Array<{ file: string; name: string }> = [];
80fafdf292 for (const entry of (tree as any).entries) {
80fafdf293 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
80fafdf294 continue;
80fafdf295 const blobRes = await fetch(
791afd4296 `${bridgeUrl}/repos/${repo}/blob/${branch}/.canopy/${entry.name}`
80fafdf297 );
80fafdf298 if (!blobRes.ok) continue;
80fafdf299 const blob = (await blobRes.json()) as any;
80fafdf300 try {
80fafdf301 const { parse: parseYaml } = await import("yaml");
80fafdf302 const config = parseYaml(blob.content);
80fafdf303 if (config?.name) {
80fafdf304 pipelines.push({ file: `.canopy/${entry.name}`, name: config.name });
80fafdf305 }
80fafdf306 } catch {}
80fafdf307 }
80fafdf308 return { pipelines };
80fafdf309 } catch {
80fafdf310 return { pipelines: [] };
80fafdf311 }
80fafdf312 });
80fafdf313
80fafdf314 // --- Secrets CRUD ---
80fafdf315
80fafdf316 const createSecretSchema = z.object({
80fafdf317 name: z
80fafdf318 .string()
80fafdf319 .min(1)
80fafdf320 .max(100)
80fafdf321 .regex(/^\w+$/),
80fafdf322 value: z.string().min(1),
80fafdf323 });
80fafdf324
80fafdf325 // List secret names (no values)
80fafdf326 app.get<{ Params: { owner: string; repo: string } }>(
f0bb192327 "/:owner/:repo/canopy/secrets",
80fafdf328 { preHandler: [(app as any).authenticate] },
80fafdf329 async (request, reply) => {
80fafdf330 const { owner, repo } = request.params;
791afd4331 const repoRow = lookupRepo(db, repo);
80fafdf332 if (!repoRow)
80fafdf333 return reply.code(404).send({ error: "Repository not found" });
80fafdf334
80fafdf335 const secrets = db
80fafdf336 .prepare(
80fafdf337 `SELECT name, created_at, updated_at FROM canopy_secrets WHERE repo_id = ? ORDER BY name`
80fafdf338 )
80fafdf339 .all(repoRow.id);
80fafdf340 return { secrets };
80fafdf341 }
80fafdf342 );
80fafdf343
80fafdf344 // Create/update secret
80fafdf345 app.post<{ Params: { owner: string; repo: string } }>(
f0bb192346 "/:owner/:repo/canopy/secrets",
80fafdf347 { preHandler: [(app as any).authenticate] },
80fafdf348 async (request, reply) => {
80fafdf349 if (!runner) {
80fafdf350 return reply.code(503).send({ error: "Canopy not enabled" });
80fafdf351 }
80fafdf352 const parsed = createSecretSchema.safeParse(request.body);
80fafdf353 if (!parsed.success) {
80fafdf354 return reply.code(400).send({ error: parsed.error.flatten() });
80fafdf355 }
80fafdf356
80fafdf357 const { owner, repo } = request.params;
791afd4358 const repoRow = lookupRepo(db, repo);
80fafdf359 if (!repoRow)
80fafdf360 return reply.code(404).send({ error: "Repository not found" });
80fafdf361
80fafdf362 const { name, value } = parsed.data;
80fafdf363 const encrypted = runner.encryptSecret(value);
80fafdf364
80fafdf365 db.prepare(
80fafdf366 `INSERT INTO canopy_secrets (repo_id, name, encrypted_value)
80fafdf367 VALUES (?, ?, ?)
80fafdf368 ON CONFLICT(repo_id, name) DO UPDATE SET
80fafdf369 encrypted_value = excluded.encrypted_value,
80fafdf370 updated_at = datetime('now')`
80fafdf371 ).run(repoRow.id, name, encrypted);
80fafdf372
80fafdf373 return { ok: true };
80fafdf374 }
80fafdf375 );
80fafdf376
80fafdf377 // Delete secret
80fafdf378 app.delete<{ Params: { owner: string; repo: string; name: string } }>(
f0bb192379 "/:owner/:repo/canopy/secrets/:name",
80fafdf380 { preHandler: [(app as any).authenticate] },
80fafdf381 async (request, reply) => {
80fafdf382 const { owner, repo, name } = request.params;
791afd4383 const repoRow = lookupRepo(db, repo);
80fafdf384 if (!repoRow)
80fafdf385 return reply.code(404).send({ error: "Repository not found" });
80fafdf386
80fafdf387 const result = db
80fafdf388 .prepare(
80fafdf389 `DELETE FROM canopy_secrets WHERE repo_id = ? AND name = ?`
80fafdf390 )
80fafdf391 .run(repoRow.id, name);
80fafdf392
80fafdf393 if (result.changes === 0) {
80fafdf394 return reply.code(404).send({ error: "Secret not found" });
80fafdf395 }
80fafdf396 return reply.code(204).send();
80fafdf397 }
80fafdf398 );
80fafdf399}