| 1 | import type { FastifyInstance } from "fastify"; |
| 2 | import { z } from "zod"; |
| 3 | import type { CanopyRunner } from "../services/canopy-runner.js"; |
| 4 | import type { CanopyEventBus, CanopyEvent } from "../services/canopy-events.js"; |
| 5 | |
| 6 | function lookupRepo(db: any, repo: string) { |
| 7 | return db |
| 8 | .prepare(`SELECT id, name, default_branch FROM repos WHERE name = ?`) |
| 9 | .get(repo) as any; |
| 10 | } |
| 11 | |
| 12 | export async function canopyGlobalRoutes(app: FastifyInstance) { |
| 13 | const db = (app as any).db; |
| 14 | const eventBus = (app as any).canopyEventBus as CanopyEventBus | undefined; |
| 15 | |
| 16 | // SSE endpoint for live updates |
| 17 | app.get<{ |
| 18 | Querystring: { scope?: string; runId?: string; owner?: string; repo?: string }; |
| 19 | }>("/canopy/events", async (request, reply) => { |
| 20 | const { scope = "global", runId, owner, repo } = request.query; |
| 21 | |
| 22 | // Resolve owner/repo to repoId for repo-scoped filtering |
| 23 | let repoId: number | null = null; |
| 24 | if (scope === "repo" && repo) { |
| 25 | const repoRow = lookupRepo(db, repo); |
| 26 | if (repoRow) repoId = repoRow.id; |
| 27 | } |
| 28 | |
| 29 | const numericRunId = runId ? parseInt(runId) : null; |
| 30 | |
| 31 | reply.raw.writeHead(200, { |
| 32 | "Content-Type": "text/event-stream", |
| 33 | "Cache-Control": "no-cache", |
| 34 | "Connection": "keep-alive", |
| 35 | "X-Accel-Buffering": "no", |
| 36 | }); |
| 37 | reply.raw.write(": connected\n\n"); |
| 38 | |
| 39 | const keepalive = setInterval(() => { |
| 40 | reply.raw.write(": keepalive\n\n"); |
| 41 | }, 15000); |
| 42 | |
| 43 | const listener = (event: CanopyEvent) => { |
| 44 | // Filter by scope |
| 45 | if (scope === "run" && event.runId !== numericRunId) return; |
| 46 | if (scope === "repo" && repoId !== null && event.repoId !== repoId) return; |
| 47 | if (scope === "repo" && event.type === "log:append") return; |
| 48 | if (scope === "global" && !["run:created", "run:completed", "run:cancelled", "run:started"].includes(event.type)) return; |
| 49 | |
| 50 | try { |
| 51 | reply.raw.write(`event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`); |
| 52 | } catch { |
| 53 | // Client disconnected |
| 54 | } |
| 55 | }; |
| 56 | |
| 57 | const unsubscribe = eventBus?.subscribe(listener); |
| 58 | |
| 59 | request.raw.on("close", () => { |
| 60 | clearInterval(keepalive); |
| 61 | unsubscribe?.(); |
| 62 | }); |
| 63 | |
| 64 | await reply.hijack(); |
| 65 | }); |
| 66 | |
| 67 | // Recent runs across all repos (optionally filtered by owner). |
| 68 | // Returns the N most recent runs per repo so active repos don't crowd out others. |
| 69 | app.get<{ |
| 70 | Querystring: { limit?: string; owner?: string; per_repo?: string }; |
| 71 | }>("/canopy/recent-runs", async (request) => { |
| 72 | const perRepo = parseInt(request.query.per_repo ?? "30") || 30; |
| 73 | const owner = request.query.owner; |
| 74 | const ownerClause = owner ? `WHERE rwo.owner_name = ?` : ""; |
| 75 | const query = ` |
| 76 | SELECT * FROM ( |
| 77 | SELECT pr.*, pr.pipeline_name, r.name AS repo_name, rwo.owner_name, |
| 78 | ROW_NUMBER() OVER (PARTITION BY pr.repo_id ORDER BY pr.created_at DESC) AS rn |
| 79 | FROM pipeline_runs pr |
| 80 | JOIN repos r ON pr.repo_id = r.id |
| 81 | JOIN repos_with_owner rwo ON r.id = rwo.id |
| 82 | ${ownerClause} |
| 83 | ) WHERE rn <= ? |
| 84 | ORDER BY created_at DESC`; |
| 85 | const params: any[] = []; |
| 86 | if (owner) params.push(owner); |
| 87 | params.push(perRepo); |
| 88 | const runs = db.prepare(query).all(...params); |
| 89 | return { runs }; |
| 90 | }); |
| 91 | } |
| 92 | |
| 93 | export async function canopyWebhookRoute(app: FastifyInstance) { |
| 94 | const runner = (app as any).canopyRunner as CanopyRunner | undefined; |
| 95 | |
| 96 | // Internal webhook — no auth (only reachable from Docker network) |
| 97 | app.post("/canopy/webhook", async (request, reply) => { |
| 98 | const { event, repo, bookmark, old_id, new_id } = request.body as any; |
| 99 | if (event !== "push" || !repo || !bookmark || !new_id) { |
| 100 | return reply.code(400).send({ error: "Invalid webhook payload" }); |
| 101 | } |
| 102 | if (!runner) { |
| 103 | return reply.code(503).send({ error: "Canopy not enabled" }); |
| 104 | } |
| 105 | try { |
| 106 | await runner.onPush({ |
| 107 | repo, |
| 108 | branch: bookmark, |
| 109 | oldCommitId: old_id ?? "", |
| 110 | newCommitId: new_id, |
| 111 | }); |
| 112 | } catch { |
| 113 | return reply.code(500).send({ error: "Failed to queue pipeline runs" }); |
| 114 | } |
| 115 | return { ok: true }; |
| 116 | }); |
| 117 | } |
| 118 | |
| 119 | export async function canopyRoutes(app: FastifyInstance) { |
| 120 | const db = (app as any).db; |
| 121 | const runner = (app as any).canopyRunner as CanopyRunner | undefined; |
| 122 | |
| 123 | const RUN_SELECT = ` |
| 124 | SELECT |
| 125 | pr.*, |
| 126 | COALESCE(p.name, pr.pipeline_name) AS pipeline_name, |
| 127 | COALESCE(p.file, pr.pipeline_file) AS pipeline_file, |
| 128 | pr.repo_id |
| 129 | FROM pipeline_runs pr |
| 130 | LEFT JOIN pipelines p ON pr.pipeline_id = p.id`; |
| 131 | |
| 132 | // List pipeline runs |
| 133 | app.get<{ |
| 134 | Params: { owner: string; repo: string }; |
| 135 | Querystring: { status?: string; limit?: string }; |
| 136 | }>("/:owner/:repo/canopy/runs", async (request, reply) => { |
| 137 | const { repo } = request.params; |
| 138 | const { status, limit } = request.query; |
| 139 | const repoRow = lookupRepo(db, repo); |
| 140 | if (!repoRow) return { runs: [] }; |
| 141 | |
| 142 | let query = `${RUN_SELECT} WHERE pr.repo_id = ?`; |
| 143 | const params: any[] = [repoRow.id]; |
| 144 | if (status) { |
| 145 | query += ` AND pr.status = ?`; |
| 146 | params.push(status); |
| 147 | } |
| 148 | query += ` ORDER BY pr.created_at DESC LIMIT ?`; |
| 149 | params.push(parseInt(limit ?? "20") || 20); |
| 150 | |
| 151 | const runs = db.prepare(query).all(...params); |
| 152 | return { runs }; |
| 153 | }); |
| 154 | |
| 155 | // Get single run with steps |
| 156 | app.get<{ |
| 157 | Params: { owner: string; repo: string; id: string }; |
| 158 | }>("/:owner/:repo/canopy/runs/:id", async (request, reply) => { |
| 159 | const { id } = request.params; |
| 160 | const run = db |
| 161 | .prepare(`${RUN_SELECT} WHERE pr.id = ?`) |
| 162 | .get(parseInt(id)); |
| 163 | if (!run) return reply.code(404).send({ error: "Run not found" }); |
| 164 | |
| 165 | const steps = db |
| 166 | .prepare( |
| 167 | `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index` |
| 168 | ) |
| 169 | .all(parseInt(id)); |
| 170 | |
| 171 | return { run, steps }; |
| 172 | }); |
| 173 | |
| 174 | // Get step logs |
| 175 | app.get<{ |
| 176 | Params: { owner: string; repo: string; id: string; idx: string }; |
| 177 | }>("/:owner/:repo/canopy/runs/:id/logs/:idx", async (request, reply) => { |
| 178 | const { id, idx } = request.params; |
| 179 | const step = db |
| 180 | .prepare( |
| 181 | `SELECT id FROM pipeline_steps WHERE run_id = ? AND step_index = ?` |
| 182 | ) |
| 183 | .get(parseInt(id), parseInt(idx)) as any; |
| 184 | if (!step) return reply.code(404).send({ error: "Step not found" }); |
| 185 | |
| 186 | const logs = db |
| 187 | .prepare( |
| 188 | `SELECT stream, content, created_at FROM step_logs WHERE step_id = ? ORDER BY id` |
| 189 | ) |
| 190 | .all(step.id); |
| 191 | |
| 192 | return { logs }; |
| 193 | }); |
| 194 | |
| 195 | // Cancel a running pipeline |
| 196 | app.post<{ |
| 197 | Params: { owner: string; repo: string; id: string }; |
| 198 | }>( |
| 199 | "/:owner/:repo/canopy/runs/:id/cancel", |
| 200 | { preHandler: [(app as any).authenticate] }, |
| 201 | async (request, reply) => { |
| 202 | if (!runner) { |
| 203 | return reply.code(503).send({ error: "Canopy not enabled" }); |
| 204 | } |
| 205 | const { id } = request.params; |
| 206 | const run = db |
| 207 | .prepare(`${RUN_SELECT} WHERE pr.id = ?`) |
| 208 | .get(parseInt(id)) as any; |
| 209 | if (!run || (run.status !== "running" && run.status !== "pending")) { |
| 210 | return reply.code(400).send({ error: "Run is not running or pending" }); |
| 211 | } |
| 212 | |
| 213 | runner.cancelRun(parseInt(id)); |
| 214 | db.prepare( |
| 215 | `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ?` |
| 216 | ).run(parseInt(id)); |
| 217 | // Also cancel any pending/running steps |
| 218 | db.prepare( |
| 219 | `UPDATE pipeline_steps SET status = 'cancelled' WHERE run_id = ? AND status IN ('pending', 'running')` |
| 220 | ).run(parseInt(id)); |
| 221 | |
| 222 | const updated = db |
| 223 | .prepare(`${RUN_SELECT} WHERE pr.id = ?`) |
| 224 | .get(parseInt(id)); |
| 225 | return { run: updated }; |
| 226 | } |
| 227 | ); |
| 228 | |
| 229 | // Manual trigger |
| 230 | app.post<{ |
| 231 | Params: { owner: string; repo: string }; |
| 232 | }>( |
| 233 | "/:owner/:repo/canopy/trigger", |
| 234 | { preHandler: [(app as any).authenticate] }, |
| 235 | async (request, reply) => { |
| 236 | if (!runner) { |
| 237 | return reply.code(503).send({ error: "Canopy not enabled" }); |
| 238 | } |
| 239 | const { owner, repo } = request.params; |
| 240 | const { ref, pipeline } = (request.body as any) ?? {}; |
| 241 | const branch = ref || "main"; |
| 242 | const bridgeUrl = |
| 243 | process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100"; |
| 244 | const resolveRes = await fetch( |
| 245 | `${bridgeUrl}/repos/${repo}/resolve/${branch}` |
| 246 | ); |
| 247 | if (!resolveRes.ok) { |
| 248 | return reply.code(400).send({ error: "Could not resolve branch" }); |
| 249 | } |
| 250 | const { commit_id } = (await resolveRes.json()) as any; |
| 251 | |
| 252 | try { |
| 253 | await runner.onPush({ |
| 254 | repo, |
| 255 | branch, |
| 256 | oldCommitId: "", |
| 257 | newCommitId: commit_id, |
| 258 | }, pipeline); |
| 259 | } catch { |
| 260 | return reply.code(500).send({ error: "Failed to queue pipeline runs" }); |
| 261 | } |
| 262 | |
| 263 | // Deploy pages if applicable |
| 264 | const pagesDeployer = (app as any).pagesDeployer; |
| 265 | if (pagesDeployer) { |
| 266 | void pagesDeployer.deploy(owner, repo, branch).catch((err: any) => { |
| 267 | app.log.error({ err, repo, branch }, "Pages deployment failed (manual trigger)"); |
| 268 | }); |
| 269 | } |
| 270 | |
| 271 | return { triggered: true, branch, commit_id }; |
| 272 | } |
| 273 | ); |
| 274 | |
| 275 | // List pipeline definitions from repo HEAD |
| 276 | app.get<{ |
| 277 | Params: { owner: string; repo: string }; |
| 278 | }>("/:owner/:repo/canopy/pipelines", async (request, reply) => { |
| 279 | const { repo } = request.params; |
| 280 | const bridgeUrl = |
| 281 | process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100"; |
| 282 | const branch = "main"; |
| 283 | |
| 284 | try { |
| 285 | const treeRes = await fetch( |
| 286 | `${bridgeUrl}/repos/${repo}/tree/${branch}/.canopy` |
| 287 | ); |
| 288 | if (!treeRes.ok) return { pipelines: [] }; |
| 289 | const tree = await treeRes.json(); |
| 290 | |
| 291 | const pipelines: Array<{ file: string; name: string }> = []; |
| 292 | for (const entry of (tree as any).entries) { |
| 293 | if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml")) |
| 294 | continue; |
| 295 | const blobRes = await fetch( |
| 296 | `${bridgeUrl}/repos/${repo}/blob/${branch}/.canopy/${entry.name}` |
| 297 | ); |
| 298 | if (!blobRes.ok) continue; |
| 299 | const blob = (await blobRes.json()) as any; |
| 300 | try { |
| 301 | const { parse: parseYaml } = await import("yaml"); |
| 302 | const config = parseYaml(blob.content); |
| 303 | if (config?.name) { |
| 304 | pipelines.push({ file: `.canopy/${entry.name}`, name: config.name }); |
| 305 | } |
| 306 | } catch {} |
| 307 | } |
| 308 | return { pipelines }; |
| 309 | } catch { |
| 310 | return { pipelines: [] }; |
| 311 | } |
| 312 | }); |
| 313 | |
| 314 | // --- Secrets CRUD --- |
| 315 | |
| 316 | const createSecretSchema = z.object({ |
| 317 | name: z |
| 318 | .string() |
| 319 | .min(1) |
| 320 | .max(100) |
| 321 | .regex(/^\w+$/), |
| 322 | value: z.string().min(1), |
| 323 | }); |
| 324 | |
| 325 | // List secret names (no values) |
| 326 | app.get<{ Params: { owner: string; repo: string } }>( |
| 327 | "/:owner/:repo/canopy/secrets", |
| 328 | { preHandler: [(app as any).authenticate] }, |
| 329 | async (request, reply) => { |
| 330 | const { owner, repo } = request.params; |
| 331 | const repoRow = lookupRepo(db, repo); |
| 332 | if (!repoRow) |
| 333 | return reply.code(404).send({ error: "Repository not found" }); |
| 334 | |
| 335 | const secrets = db |
| 336 | .prepare( |
| 337 | `SELECT name, created_at, updated_at FROM canopy_secrets WHERE repo_id = ? ORDER BY name` |
| 338 | ) |
| 339 | .all(repoRow.id); |
| 340 | return { secrets }; |
| 341 | } |
| 342 | ); |
| 343 | |
| 344 | // Create/update secret |
| 345 | app.post<{ Params: { owner: string; repo: string } }>( |
| 346 | "/:owner/:repo/canopy/secrets", |
| 347 | { preHandler: [(app as any).authenticate] }, |
| 348 | async (request, reply) => { |
| 349 | if (!runner) { |
| 350 | return reply.code(503).send({ error: "Canopy not enabled" }); |
| 351 | } |
| 352 | const parsed = createSecretSchema.safeParse(request.body); |
| 353 | if (!parsed.success) { |
| 354 | return reply.code(400).send({ error: parsed.error.flatten() }); |
| 355 | } |
| 356 | |
| 357 | const { owner, repo } = request.params; |
| 358 | const repoRow = lookupRepo(db, repo); |
| 359 | if (!repoRow) |
| 360 | return reply.code(404).send({ error: "Repository not found" }); |
| 361 | |
| 362 | const { name, value } = parsed.data; |
| 363 | const encrypted = runner.encryptSecret(value); |
| 364 | |
| 365 | db.prepare( |
| 366 | `INSERT INTO canopy_secrets (repo_id, name, encrypted_value) |
| 367 | VALUES (?, ?, ?) |
| 368 | ON CONFLICT(repo_id, name) DO UPDATE SET |
| 369 | encrypted_value = excluded.encrypted_value, |
| 370 | updated_at = datetime('now')` |
| 371 | ).run(repoRow.id, name, encrypted); |
| 372 | |
| 373 | return { ok: true }; |
| 374 | } |
| 375 | ); |
| 376 | |
| 377 | // Delete secret |
| 378 | app.delete<{ Params: { owner: string; repo: string; name: string } }>( |
| 379 | "/:owner/:repo/canopy/secrets/:name", |
| 380 | { preHandler: [(app as any).authenticate] }, |
| 381 | async (request, reply) => { |
| 382 | const { owner, repo, name } = request.params; |
| 383 | const repoRow = lookupRepo(db, repo); |
| 384 | if (!repoRow) |
| 385 | return reply.code(404).send({ error: "Repository not found" }); |
| 386 | |
| 387 | const result = db |
| 388 | .prepare( |
| 389 | `DELETE FROM canopy_secrets WHERE repo_id = ? AND name = ?` |
| 390 | ) |
| 391 | .run(repoRow.id, name); |
| 392 | |
| 393 | if (result.changes === 0) { |
| 394 | return reply.code(404).send({ error: "Secret not found" }); |
| 395 | } |
| 396 | return reply.code(204).send(); |
| 397 | } |
| 398 | ); |
| 399 | } |
| 400 | |