8.4 KB294 lines
Blame
1import type { Dirent } from "node:fs";
2import { appendFile, mkdir, readFile, readdir } from "node:fs/promises";
3import { dirname, resolve } from "node:path";
4import type { FastifyInstance } from "fastify";
5import { z } from "zod";
6
/**
 * One Ring log record, persisted as a single line of NDJSON.
 */
interface RingLogEntry {
  /** ISO-8601 timestamp taken when the entry was created by the server. */
  ts: string;
  /** Label describing where the entry came from. */
  source: string;
  /** Severity label; ingestion falls back to "info" when none is given. */
  level: string;
  /** Human-readable summary of the entry. */
  message: string;
  /** The value the entry was built from, kept as received. */
  payload: unknown;
  /** Repository owner; only set by repo-scoped ingestion. */
  owner?: string;
  /** Repository name; only set by repo-scoped ingestion. */
  repo?: string;
}
16
/**
 * Summary of one repo-scoped Ring log file, as returned by the
 * instance listing route.
 */
interface RingInstanceSummary {
  /** Name of the owner directory the log file lives in. */
  owner: string;
  /** Log file name with the ".ndjson" suffix removed. */
  repo: string;
  /** Number of non-empty lines in the log file. */
  total: number;
  /** Timestamp of the last line, or null when the file has no entries. */
  last_ts: string | null;
  /** Level of the last line, or null when the file has no entries. */
  last_level: string | null;
  /** Message of the last line, or null when the file has no entries. */
  last_message: string | null;
}
25
/**
 * Query-string schema for the log view routes.
 *
 * `limit` is coerced to a number and must be an integer between 1 and 1000;
 * it defaults to 200 when omitted.
 */
const querySchema = z.object({
  limit: z.coerce
    .number()
    .int()
    .min(1)
    .max(1000)
    .default(200),
});
29
/**
 * Schema for a single `owner` / `repo` route parameter.
 *
 * Values are 1-120 characters drawn from letters, digits, `.`, `_` and `-`.
 * The dot-only names `.` and `..` are rejected explicitly: the character
 * class alone would accept them, and because these values are joined into a
 * filesystem path, `..` as the owner would resolve to the parent of the repo
 * log root (path traversal).
 */
const repoPathSegmentSchema = z
  .string()
  .min(1)
  .max(120)
  .regex(/^[A-Za-z0-9._-]+$/)
  .refine((segment) => segment !== "." && segment !== "..", {
    message: "Must not be '.' or '..'",
  });

/**
 * Route-parameter schema for the repo-scoped Ring endpoints.
 * Both segments must be safe to use as a single path component.
 */
const repoParamsSchema = z.object({
  owner: repoPathSegmentSchema,
  repo: repoPathSegmentSchema,
});
34
/**
 * Type guard: true when `value` is a non-null object that is not an array.
 *
 * @param value - Any value.
 * @returns Whether `value` can be treated as a string-keyed record.
 */
function toRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== "object") {
    return false;
  }
  return !Array.isArray(value);
}
38
/**
 * Reduces a raw header value to a single trimmed string.
 *
 * For a repeated header (array) only the first value is considered.
 *
 * @param value - Header value as a string, a list of strings, or undefined.
 * @returns The trimmed value, or null when it is missing or blank.
 */
function coerceHeaderString(value: string | string[] | undefined): string | null {
  const candidate = Array.isArray(value) ? value[0] : value;
  if (typeof candidate !== "string") {
    return null;
  }
  const trimmed = candidate.trim();
  return trimmed === "" ? null : trimmed;
}
47
/**
 * Serializes a value to JSON without throwing on unserializable input.
 *
 * Falls back to `String(value)` when `JSON.stringify` throws (for example on
 * circular structures) or yields no string (for example for `undefined`).
 *
 * @param value - Any value.
 * @returns The JSON text, or the `String()` form as a fallback.
 */
function safeStringify(value: unknown): string {
  let json: string | undefined;
  try {
    json = JSON.stringify(value);
  } catch {
    json = undefined;
  }
  if (typeof json === "string") {
    return json;
  }
  return String(value);
}
56
/**
 * Turns an arbitrary request body into a Ring log entry.
 *
 * - Strings become the message as-is.
 * - Numbers, booleans and null are stringified into the message.
 * - Arrays get the message `Array(<length>)`.
 * - Plain objects may supply non-blank string `source`, `level` and
 *   `message` fields; `source` and `level` are trimmed, `message` is kept
 *   untrimmed, and a missing message falls back to the JSON text of the body.
 * - Anything else is recorded with the message "Unsupported payload".
 *
 * The original body is always stored in `payload`, and `ts` is the current
 * time in ISO-8601 form.
 *
 * @param body - The parsed request body.
 * @param fallbackSource - Source label used when the body does not name one.
 * @returns The normalized entry.
 */
function normalizePayload(
  body: unknown,
  fallbackSource: string
): RingLogEntry {
  const ts = new Date().toISOString();

  // Entry for bodies that cannot carry their own source or level.
  const withDefaults = (message: string): RingLogEntry => ({
    ts,
    source: fallbackSource,
    level: "info",
    message,
    payload: body,
  });

  if (typeof body === "string") {
    return withDefaults(body);
  }
  if (body === null || typeof body === "number" || typeof body === "boolean") {
    return withDefaults(String(body));
  }
  if (Array.isArray(body)) {
    return withDefaults(`Array(${body.length})`);
  }
  if (!toRecord(body)) {
    return withDefaults("Unsupported payload");
  }

  // Trimmed text of a field, or null when it is not a non-blank string.
  const trimmedOrNull = (field: unknown): string | null => {
    if (typeof field !== "string") return null;
    const trimmed = field.trim();
    return trimmed === "" ? null : trimmed;
  };

  const source = trimmedOrNull(body.source) ?? fallbackSource;
  const level = trimmedOrNull(body.level) ?? "info";
  // The message is only checked for blankness; its own whitespace is kept.
  const hasMessage = typeof body.message === "string" && body.message.trim() !== "";
  const message = hasMessage ? (body.message as string) : safeStringify(body);

  return { ts, source, level, message, payload: body };
}
123
/**
 * Reads a log file as UTF-8 text on a best-effort basis.
 *
 * @param path - File to read.
 * @returns The file content, or an empty string when the read fails for any
 *   reason (for example when the file does not exist yet).
 */
async function readLogFile(path: string): Promise<string> {
  return readFile(path, "utf8").catch(() => "");
}
131
/**
 * Runtime check that a parsed JSON value has the shape of a Ring log entry:
 * a non-array object whose `ts`, `source`, `level` and `message` are strings
 * and whose optional `owner` / `repo` are strings when present.
 */
function isRingLogEntry(value: unknown): value is RingLogEntry {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return false;
  }
  const record = value as Record<string, unknown>;
  const optionalString = (field: unknown): boolean =>
    field === undefined || typeof field === "string";
  return (
    typeof record.ts === "string" &&
    typeof record.source === "string" &&
    typeof record.level === "string" &&
    typeof record.message === "string" &&
    optionalString(record.owner) &&
    optionalString(record.repo)
  );
}

/**
 * Parses one line of a Ring NDJSON log.
 *
 * A line holding a well-formed entry is returned as parsed. Any other line —
 * text that is not JSON, or JSON that is not an entry object (a bare number,
 * `null`, an array, an object missing the core string fields) — is wrapped
 * in a synthetic "info" entry from source "ring" that carries the raw line
 * as both message and payload, stamped with the current time.
 *
 * @param line - A single non-empty line from a log file.
 * @returns An entry whose core fields are always strings.
 */
function parseLogLine(line: string): RingLogEntry {
  try {
    // JSON.parse accepts any JSON value, so the shape must be verified
    // before the result is treated as an entry.
    const parsed: unknown = JSON.parse(line);
    if (isRingLogEntry(parsed)) {
      return parsed;
    }
  } catch {
    // Not JSON at all: handled by the raw-line fallback below.
  }
  return {
    ts: new Date().toISOString(),
    source: "ring",
    level: "info",
    message: line,
    payload: line,
  };
}
145
/**
 * Loads the most recent entries of a log file, newest first.
 *
 * @param path - NDJSON log file; an unreadable file counts as empty.
 * @param limit - Maximum number of entries to return.
 * @returns `entries` holds up to `limit` entries from the end of the file in
 *   reverse file order; `total` is the number of non-empty lines in the file.
 */
async function readEntries(path: string, limit: number): Promise<{ entries: RingLogEntry[]; total: number }> {
  const raw = await readLogFile(path);
  const nonEmptyLines = raw.split("\n").filter((line) => line !== "");
  const total = nonEmptyLines.length;

  const tail = nonEmptyLines.slice(Math.max(0, total - limit));
  // Parse in file order, but place each entry so the newest ends up first.
  const entries = new Array<RingLogEntry>(tail.length);
  tail.forEach((line, index) => {
    entries[tail.length - 1 - index] = parseLogLine(line);
  });

  return { entries, total };
}
156
/**
 * Comparator that orders instance summaries by most recent activity.
 *
 * Summaries with a later `last_ts` come first; ties are broken by the larger
 * `total`. A missing or unparsable timestamp is treated as the epoch (0), so
 * such summaries sort after any with a valid timestamp and the comparator
 * never returns NaN.
 *
 * @param a - First summary.
 * @param b - Second summary.
 * @returns Negative when `a` sorts first, positive when `b` does, 0 on a tie.
 */
function sortInstancesDescByNewest(a: RingInstanceSummary, b: RingInstanceSummary): number {
  const toMillis = (ts: string | null): number => {
    if (!ts) return 0;
    const millis = new Date(ts).getTime();
    // An invalid date yields NaN, which would poison the subtraction below
    // and bypass the tie-break.
    return Number.isNaN(millis) ? 0 : millis;
  };

  const aTs = toMillis(a.last_ts);
  const bTs = toMillis(b.last_ts);
  if (aTs !== bTs) return bTs - aTs;
  return b.total - a.total;
}
163
/**
 * Registers the Ring log routes on a Fastify instance.
 *
 * Routes:
 * - `POST /ring/logs` — append one entry to the global log.
 * - `GET /ring/logs?limit=N` — newest-first tail of the global log.
 * - `POST /repos/:owner/:repo/ring/logs` — append one entry to a repo's log.
 * - `GET /repos/:owner/:repo/ring/logs?limit=N` — newest-first tail of a
 *   repo's log.
 * - `GET /ring/instances` — summary of every repo log file found on disk.
 *
 * Storage locations are taken from the environment when set:
 * `RING_DATA_DIR` (default `./data/ring`), `RING_LOG_PATH` (default
 * `<data dir>/logs.ndjson`) and `RING_REPO_LOG_ROOT` (default
 * `<data dir>/repos`). Repo logs live at `<repo root>/<owner>/<repo>.ndjson`.
 *
 * @param app - Fastify instance to register the routes on.
 */
export async function ringRoutes(app: FastifyInstance): Promise<void> {
  const LOG_FILE_SUFFIX = ".ndjson";
  // Used when the `limit` query parameter fails validation.
  const DEFAULT_LIMIT = 200;

  // Allow plain text ingestion as well as JSON.
  app.addContentTypeParser(
    "text/plain",
    { parseAs: "string" },
    (_request, body, done) => done(null, body)
  );

  const ringDataDir = resolve(process.env.RING_DATA_DIR ?? "./data/ring");
  const globalLogPath = resolve(
    process.env.RING_LOG_PATH ?? `${ringDataDir}/logs.ndjson`
  );
  const repoLogsRoot = resolve(
    process.env.RING_REPO_LOG_ROOT ?? `${ringDataDir}/repos`
  );

  /** Location of the log file for one repo. */
  function repoLogPath(owner: string, repo: string): string {
    return resolve(repoLogsRoot, owner, `${repo}${LOG_FILE_SUFFIX}`);
  }

  /** Appends one already-serialized line, creating parent directories first. */
  async function appendLine(path: string, line: string): Promise<void> {
    await mkdir(dirname(path), { recursive: true });
    await appendFile(path, line, "utf8");
  }

  /** Validated `limit` from a query string, or the default when invalid. */
  function resolveLimit(query: unknown): number {
    const parsed = querySchema.safeParse(query);
    return parsed.success ? parsed.data.limit : DEFAULT_LIMIT;
  }

  /** Builds the listing summary for a single repo log file. */
  async function summarizeRepoLog(
    owner: string,
    repo: string,
    path: string
  ): Promise<RingInstanceSummary> {
    const content = await readLogFile(path);
    const lines = content.split("\n").filter(Boolean);
    const total = lines.length;
    const last = total > 0 ? parseLogLine(lines[total - 1]) : null;
    return {
      owner,
      repo,
      total,
      last_ts: last?.ts ?? null,
      last_level: last?.level ?? null,
      last_message: last?.message ?? null,
    };
  }

  // Backward-compatible global ingest.
  app.post("/ring/logs", async (request, reply) => {
    const source =
      coerceHeaderString(request.headers["x-ring-source"]) ?? "ingest";
    const entry = normalizePayload(request.body, source);
    const serialized = `${JSON.stringify(entry)}\n`;

    try {
      await appendLine(globalLogPath, serialized);
      return { ok: true, entry };
    } catch (err) {
      // Logged under `err` so the logger's error serializer keeps the
      // message and stack; any other key loses them.
      request.log.error({ err }, "Failed to append Ring log");
      return reply.code(500).send({ error: "Failed to write log entry" });
    }
  });

  // Backward-compatible global log view.
  app.get<{ Querystring: { limit?: string } }>("/ring/logs", async (request) => {
    return await readEntries(globalLogPath, resolveLimit(request.query));
  });

  // Repo-scoped ingest.
  app.post<{ Params: { owner: string; repo: string } }>(
    "/repos/:owner/:repo/ring/logs",
    async (request, reply) => {
      const paramsParsed = repoParamsSchema.safeParse(request.params);
      if (!paramsParsed.success) {
        return reply.code(400).send({ error: "Invalid owner/repo" });
      }
      const { owner, repo } = paramsParsed.data;
      const source =
        coerceHeaderString(request.headers["x-ring-source"]) ?? `${owner}/${repo}`;
      const baseEntry = normalizePayload(request.body, source);
      const entry: RingLogEntry = { ...baseEntry, owner, repo };
      const serialized = `${JSON.stringify(entry)}\n`;

      try {
        await appendLine(repoLogPath(owner, repo), serialized);
        return { ok: true, entry };
      } catch (err) {
        // See the global ingest route: `err` is the key the logger serializes.
        request.log.error({ err }, "Failed to append repo Ring log");
        return reply.code(500).send({ error: "Failed to write log entry" });
      }
    }
  );

  // Repo-scoped log view.
  app.get<{
    Params: { owner: string; repo: string };
    Querystring: { limit?: string };
  }>("/repos/:owner/:repo/ring/logs", async (request, reply) => {
    const paramsParsed = repoParamsSchema.safeParse(request.params);
    if (!paramsParsed.success) {
      return reply.code(400).send({ error: "Invalid owner/repo" });
    }
    const { owner, repo } = paramsParsed.data;

    return await readEntries(repoLogPath(owner, repo), resolveLimit(request.query));
  });

  // List all repo Ring instances that have been set up (log file exists).
  app.get("/ring/instances", async () => {
    const instances: RingInstanceSummary[] = [];
    let ownerDirs: Dirent[] = [];
    try {
      ownerDirs = await readdir(repoLogsRoot, { withFileTypes: true });
    } catch {
      // No repo log root yet: nothing has been ingested.
      return { instances };
    }

    for (const ownerDir of ownerDirs) {
      if (!ownerDir.isDirectory()) continue;
      const owner = ownerDir.name;
      const ownerPath = resolve(repoLogsRoot, owner);
      let files: Dirent[] = [];
      try {
        files = await readdir(ownerPath, { withFileTypes: true });
      } catch {
        // Skip owner directories that cannot be listed.
        continue;
      }
      for (const file of files) {
        if (!file.isFile() || !file.name.endsWith(LOG_FILE_SUFFIX)) continue;
        const repo = file.name.slice(0, -LOG_FILE_SUFFIX.length);
        instances.push(
          await summarizeRepoLog(owner, repo, resolve(ownerPath, file.name))
        );
      }
    }

    instances.sort(sortInstancesDescByNewest);
    return { instances };
  });
}
294