4.2 KB139 lines
Blame
1import type Database from "better-sqlite3";
2import type { CanopyRunner } from "./canopy-runner.js";
3import type { PagesDeployer } from "./pages-deployer.js";
4
/**
 * Polls grove-bridge bookmarks to detect pushes.
 *
 * Gets the repo list directly from grove-bridge (Mononoke), compares each
 * repo's bookmark positions to the `bookmark_state` table, and calls
 * `runner.onPush()` when a bookmark points at a different commit than the
 * one recorded.
 */
export class CanopyPoller {
  /** Handle of the active polling timer, or `null` when stopped. */
  private interval: ReturnType<typeof setInterval> | null = null;
  /** True while a poll cycle is in flight; prevents overlapping cycles. */
  private polling = false;

  constructor(
    private db: Database.Database,
    private bridgeUrl: string,
    private runner: CanopyRunner,
    private pagesDeployer: PagesDeployer | null,
    private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void }
  ) {}

  /**
   * Starts polling every `intervalMs` milliseconds.
   *
   * Any timer left over from a previous `start()` is cleared first, so
   * calling this twice never leaves an orphaned timer that `stop()` cannot
   * cancel.
   */
  start(intervalMs: number): void {
    this.stop();
    this.interval = setInterval(() => {
      // poll() handles its own errors, so the promise is deliberately not awaited.
      void this.poll();
    }, intervalMs);
  }

  /** Stops polling. A cycle that is already in flight runs to completion. */
  stop(): void {
    if (this.interval !== null) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /**
   * Runs one poll cycle: fetches the repo list and polls each repo in turn.
   * Returns immediately if a previous cycle is still running. Never rejects;
   * failures are logged.
   */
  private async poll(): Promise<void> {
    if (this.polling) return;
    this.polling = true;

    try {
      const res = await fetch(`${this.bridgeUrl}/repos`);
      if (!res.ok) {
        this.logger.error({ status: res.status }, "Failed to fetch repo list");
        return;
      }
      const data: unknown = await res.json();
      const repos = CanopyPoller.arrayField(data, "repos").filter(
        (name): name is string => typeof name === "string"
      );

      // Repos are processed sequentially so pipelines are queued one at a time.
      for (const repo of repos) {
        await this.pollRepo(repo);
      }
    } catch (err) {
      this.logger.error({ err }, "Canopy poll error");
    } finally {
      this.polling = false;
    }
  }

  /**
   * Compares the bookmarks of one repo with `bookmark_state` and triggers
   * the runner for every bookmark whose commit changed.
   *
   * A repo with no recorded state at all is seeded silently: its current
   * bookmarks are stored without triggering any pipeline.
   */
  private async pollRepo(repoName: string): Promise<void> {
    let bookmarks: Array<{ name: string; commit_id: string }>;
    try {
      const res = await fetch(`${this.bridgeUrl}/repos/${repoName}/bookmarks`);
      if (!res.ok) {
        this.logger.error(
          { repo: repoName, status: res.status },
          "Failed to fetch bookmarks"
        );
        return;
      }
      const data: unknown = await res.json();
      // Drop malformed entries instead of passing them to the runner or the DB.
      bookmarks = CanopyPoller.arrayField(data, "bookmarks").filter(
        CanopyPoller.isBookmark
      );
    } catch (err) {
      this.logger.error({ err, repo: repoName }, "Failed to fetch bookmarks");
      return;
    }

    const existingStates = this.db
      .prepare(
        `SELECT bookmark_name, commit_id FROM bookmark_state WHERE repo_name = ?`
      )
      .all(repoName) as Array<{ bookmark_name: string; commit_id: string }>;

    const stateMap = new Map(
      existingStates.map((s) => [s.bookmark_name, s.commit_id] as const)
    );

    // If no state exists yet, seed all bookmarks without triggering.
    const isFirstSeed = existingStates.length === 0;

    const upsert = this.db.prepare(
      `INSERT INTO bookmark_state (repo_name, bookmark_name, commit_id, updated_at)
       VALUES (?, ?, ?, datetime('now'))
       ON CONFLICT(repo_name, bookmark_name) DO UPDATE SET
         commit_id = excluded.commit_id,
         updated_at = datetime('now')`
    );

    for (const bookmark of bookmarks) {
      const oldCommitId = stateMap.get(bookmark.name);

      if (oldCommitId === bookmark.commit_id) continue;

      if (isFirstSeed) {
        upsert.run(repoName, bookmark.name, bookmark.commit_id);
        continue;
      }

      try {
        this.logger.info(
          { repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Bookmark changed, triggering pipeline"
        );
        await this.runner.onPush({
          repo: repoName,
          branch: bookmark.name,
          oldCommitId: oldCommitId ?? "",
          newCommitId: bookmark.commit_id,
        });

        // Mark the bookmark processed as soon as the runs are queued. Doing
        // this before the pages step means a pages failure cannot cause the
        // same commit to be pushed to the runner again on the next poll.
        upsert.run(repoName, bookmark.name, bookmark.commit_id);
      } catch (err) {
        this.logger.error(
          { err, repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Failed to queue pipeline for bookmark change"
        );
        // Bookmark stays unrecorded, so the next poll retries it.
        continue;
      }

      this.deployPagesIfApplicable(repoName, bookmark.name);
    }
  }

  /**
   * Kicks off a pages deployment when a deployer is configured, the repo has
   * pages enabled, and `branch` is the repo's default branch (falling back to
   * "main"). The deployment is fire-and-forget; every failure is logged and
   * never propagates to the caller.
   */
  private deployPagesIfApplicable(repoName: string, branch: string): void {
    const deployer = this.pagesDeployer;
    if (!deployer) return;

    try {
      const repo = this.db
        .prepare(
          `SELECT r.default_branch, rwo.owner_name FROM repos r
           JOIN repos_with_owner rwo ON rwo.id = r.id
           WHERE r.name = ? AND r.pages_enabled = 1`
        )
        .get(repoName) as
        | { default_branch: string | null; owner_name: string }
        | undefined;

      if (!repo || branch !== (repo.default_branch ?? "main")) return;

      void deployer.deploy(repo.owner_name, repoName, branch).catch((err: unknown) => {
        this.logger.error({ err, repo: repoName, branch }, "Pages deployment failed");
      });
    } catch (err) {
      // Covers a failing lookup query and a deployer that throws synchronously.
      this.logger.error({ err, repo: repoName, branch }, "Pages deployment failed");
    }
  }

  /** Returns `payload[key]` when it is an array, otherwise an empty array. */
  private static arrayField(payload: unknown, key: string): unknown[] {
    if (typeof payload !== "object" || payload === null) return [];
    const value = (payload as Record<string, unknown>)[key];
    return Array.isArray(value) ? value : [];
  }

  /** Type guard for a bookmark entry with string `name` and `commit_id`. */
  private static isBookmark(
    value: unknown
  ): value is { name: string; commit_id: string } {
    if (typeof value !== "object" || value === null) return false;
    const candidate = value as Record<string, unknown>;
    return (
      typeof candidate["name"] === "string" &&
      typeof candidate["commit_id"] === "string"
    );
  }
}
139