| 1 | import type Database from "better-sqlite3"; |
| 2 | import type { CanopyRunner } from "./canopy-runner.js"; |
| 3 | import type { PagesDeployer } from "./pages-deployer.js"; |
| 4 | |
/** A bookmark position as reported by grove-bridge. */
interface BookmarkRef {
  name: string;
  commit_id: string;
}

/**
 * Polls grove-bridge bookmarks to detect pushes.
 *
 * Each poll cycle gets the repo list directly from grove-bridge (Mononoke),
 * compares every repo's bookmark positions to the `bookmark_state` table,
 * and calls `runner.onPush()` when a bookmark's commit differs from the
 * stored one. A repo that has no stored state yet is only seeded: its
 * bookmarks are recorded without triggering anything.
 */
export class CanopyPoller {
  /** Handle of the active polling timer, or `null` while stopped. */
  private interval: ReturnType<typeof setInterval> | null = null;
  /** Guard flag: `true` while a poll cycle is in flight, so cycles never overlap. */
  private polling = false;

  constructor(
    private readonly db: Database.Database,
    private readonly bridgeUrl: string,
    private readonly runner: CanopyRunner,
    private readonly pagesDeployer: PagesDeployer | null,
    private readonly logger: { info: (...args: any[]) => void; error: (...args: any[]) => void }
  ) {}

  /**
   * Starts polling every `intervalMs` milliseconds.
   *
   * Calling `start()` while already running replaces the existing timer
   * instead of leaking it, so a single `stop()` always halts polling.
   */
  start(intervalMs: number): void {
    this.stop();
    this.interval = setInterval(() => {
      // poll() handles its own errors; `void` marks the promise as intentionally unawaited.
      void this.poll();
    }, intervalMs);
  }

  /** Stops the polling timer. Safe to call when not running. A cycle already in flight is not cancelled. */
  stop(): void {
    if (this.interval) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /**
   * Runs one poll cycle: lists repos from grove-bridge and polls each one in turn.
   * Returns immediately if a previous cycle is still running.
   * Never rejects; failures are reported through `logger.error`.
   */
  private async poll(): Promise<void> {
    if (this.polling) return;
    this.polling = true;

    try {
      const res = await fetch(`${this.bridgeUrl}/repos`);
      if (!res.ok) {
        this.logger.error({ status: res.status }, "Failed to list repos from grove-bridge");
        return;
      }
      const data: unknown = await res.json();
      const repos = CanopyPoller.readArray(data, "repos").filter(
        (entry): entry is string => typeof entry === "string"
      );

      for (const repo of repos) {
        // Isolate failures so one broken repo does not starve the repos after it.
        try {
          await this.pollRepo(repo);
        } catch (err) {
          this.logger.error({ err, repo }, "Canopy poll failed for repo");
        }
      }
    } catch (err) {
      this.logger.error({ err }, "Canopy poll error");
    } finally {
      this.polling = false;
    }
  }

  /**
   * Compares one repo's bookmarks against `bookmark_state` and triggers the
   * runner for each bookmark whose commit changed.
   *
   * May throw on database errors outside the per-bookmark handling
   * (loading state, seeding); `poll()` catches and logs those.
   */
  private async pollRepo(repoName: string): Promise<void> {
    const bookmarks = await this.fetchBookmarks(repoName);
    if (bookmarks === null) return;

    const existingStates = this.db
      .prepare(
        `SELECT bookmark_name, commit_id FROM bookmark_state WHERE repo_name = ?`
      )
      .all(repoName) as Array<{ bookmark_name: string; commit_id: string }>;

    const stateMap = new Map<string, string>();
    for (const row of existingStates) {
      stateMap.set(row.bookmark_name, row.commit_id);
    }

    const upsert = this.db.prepare(
      `INSERT INTO bookmark_state (repo_name, bookmark_name, commit_id, updated_at)
       VALUES (?, ?, ?, datetime('now'))
       ON CONFLICT(repo_name, bookmark_name) DO UPDATE SET
         commit_id = excluded.commit_id,
         updated_at = datetime('now')`
    );

    // If no state exists yet, seed all bookmarks without triggering.
    if (existingStates.length === 0) {
      // Seed atomically: a partial seed would make the next poll treat the
      // remaining bookmarks as brand-new pushes and trigger pipelines for them.
      const seedAll = this.db.transaction((items: BookmarkRef[]) => {
        for (const item of items) {
          upsert.run(repoName, item.name, item.commit_id);
        }
      });
      seedAll(bookmarks);
      return;
    }

    for (const bookmark of bookmarks) {
      const oldCommitId = stateMap.get(bookmark.name);
      if (oldCommitId === bookmark.commit_id) continue;

      try {
        this.logger.info(
          { repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Bookmark changed, triggering pipeline"
        );
        await this.runner.onPush({
          repo: repoName,
          branch: bookmark.name,
          // A bookmark that was not tracked before has no previous commit.
          oldCommitId: oldCommitId ?? "",
          newCommitId: bookmark.commit_id,
        });
        this.deployPagesIfApplicable(repoName, bookmark.name);

        // Only mark bookmark processed after runs are queued successfully;
        // on failure the stored commit stays stale and the next poll retries.
        upsert.run(repoName, bookmark.name, bookmark.commit_id);
      } catch (err) {
        this.logger.error(
          { err, repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Failed to queue pipeline for bookmark change"
        );
      }
    }
  }

  /**
   * Fetches the bookmark list for a repo from grove-bridge.
   *
   * @returns the well-formed bookmark entries, or `null` when the request
   *   failed or returned a non-OK status (the failure is logged).
   */
  private async fetchBookmarks(repoName: string): Promise<BookmarkRef[] | null> {
    try {
      const res = await fetch(`${this.bridgeUrl}/repos/${repoName}/bookmarks`);
      if (!res.ok) {
        this.logger.error({ repo: repoName, status: res.status }, "Failed to fetch bookmarks");
        return null;
      }
      const data: unknown = await res.json();
      // Drop malformed entries rather than binding undefined values into SQL.
      return CanopyPoller.readArray(data, "bookmarks").filter(CanopyPoller.isBookmark);
    } catch (err) {
      this.logger.error({ err, repo: repoName }, "Failed to fetch bookmarks");
      return null;
    }
  }

  /**
   * Kicks off a pages deployment when a deployer is configured, the repo has
   * `pages_enabled = 1`, and `branch` is the repo's default branch ("main"
   * when none is stored). The deployment is not awaited; its failure is only
   * logged. A failing database lookup throws to the caller.
   */
  private deployPagesIfApplicable(repoName: string, branch: string): void {
    if (!this.pagesDeployer) return;

    const repo = this.db
      .prepare(
        `SELECT r.default_branch, rwo.owner_name FROM repos r
         JOIN repos_with_owner rwo ON rwo.id = r.id
         WHERE r.name = ? AND r.pages_enabled = 1`
      )
      .get(repoName) as { default_branch: string | null; owner_name: string } | undefined;
    if (!repo || branch !== (repo.default_branch ?? "main")) return;

    void this.pagesDeployer.deploy(repo.owner_name, repoName, branch).catch((err: unknown) => {
      this.logger.error(
        { err, repo: repoName, branch },
        "Pages deployment failed"
      );
    });
  }

  /** Returns `payload[key]` when it is an array; otherwise an empty array. */
  private static readArray(payload: unknown, key: string): unknown[] {
    if (typeof payload !== "object" || payload === null) return [];
    const value = (payload as Record<string, unknown>)[key];
    return Array.isArray(value) ? value : [];
  }

  /** Type guard: an object carrying string `name` and `commit_id` fields. */
  private static isBookmark(value: unknown): value is BookmarkRef {
    if (typeof value !== "object" || value === null) return false;
    const candidate = value as { name?: unknown; commit_id?: unknown };
    return typeof candidate.name === "string" && typeof candidate.commit_id === "string";
  }
}
| 139 | |