diff --git a/README.md b/README.md index e6f5d35..ce0a790 100644 --- a/README.md +++ b/README.md @@ -56,3 +56,6 @@ - 新增备用接口机制:当 mShots 返回无效图片(如生成中 GIF)或失败时,自动降级尝试使用 `thum.io` 获取截图,确保高可用性。 - 性能优化:将关键路径上的同步文件 I/O (readFileSync/writeFileSync) 替换为异步操作 (fs.promises),防止高并发下 Event Loop 阻塞导致服务无响应。 - 用户体验优化:当在浏览器中直接访问 API (Accept: text/html) 时,返回一个带有加载动画的 HTML 页面,解决等待过程中的白屏问题。 + - 并发健壮性提升: + - 实现 **请求合并 (Request Coalescing)**:当多个客户端同时请求同一个未缓存的 URL 时,复用同一个回源请求,避免瞬间高并发流量击穿上游 (Thundering Herd)。 + - 实现 **原子化缓存写入 (Atomic Write)**:使用“写临时文件 + 重命名”策略,确保缓存文件在写入过程中不会被读取到不完整的数据,彻底解决并发读写导致的文件损坏问题。 diff --git a/server.js b/server.js index 8dd02d8..9a27097 100644 --- a/server.js +++ b/server.js @@ -15,6 +15,9 @@ const CACHE_DIR = path.join(process.cwd(), 'cache'); fs.mkdirSync(CACHE_DIR, { recursive: true }); +// 请求合并 Map (Deduplication) +const pendingRequests = new Map(); + /** * 计算 SHA1 哈希 * @param {string} input @@ -172,7 +175,93 @@ async function fetchFallbackWithRetry(targetUrl) { } /** - * 核心处理逻辑:检查缓存 -> 回源 -> (失败则) 备用接口 -> 写入缓存 -> 返回响应 + * 执行回源、备用请求并写入缓存 + * @param {string} upstreamUrl + * @param {string} targetUrl + * @param {string} key + * @returns {Promise<{data: Buffer, contentType: string, status: number}>} + */ +async function fetchAndCache(upstreamUrl, targetUrl, key) { + const { data: dataPath, meta: metaPath } = getCachePaths(key); + + // 1. 回源请求 + let finalResp; + let isFallback = false; + + try { + const resp = await fetchUpstreamWithRetry(upstreamUrl); + finalResp = resp; + + // 2. 检查响应是否有效 + if (!isValidImageResponse(resp.status, resp.headers, resp.data)) { + console.log(`[upstream-invalid] url=${upstreamUrl} status=${resp.status} len=${resp.data.byteLength}, trying fallback...`); + const fallbackResp = await fetchFallbackWithRetry(targetUrl); + + if (fallbackResp && isValidImageResponse(fallbackResp.status, fallbackResp.headers, fallbackResp.data)) { + console.log(`[fallback-success] url=${targetUrl}`); + finalResp = fallbackResp; + isFallback = true; + } else { + console.log(`[fallback-failed] url=${targetUrl}, returning original response`); + } + } + } catch (err) { + console.error(`[upstream-failed] url=${upstreamUrl} err=${err.message}`); + // 如果回源彻底失败,抛出错误,以便上层处理(如兜底读取旧缓存) + throw err; + } + + // 3. 仅缓存有效图片 + if (isValidImageResponse(finalResp.status, finalResp.headers, finalResp.data)) { + const contentType = finalResp.headers['content-type'] || 'image/jpeg'; + const meta = { + url: isFallback ? `fallback:${targetUrl}` : upstreamUrl, + contentType, + size: finalResp.data.byteLength, + createdAt: new Date().toISOString(), + source: isFallback ? 'thum.io' : 'mshots' + }; + + // 原子化写入:先写临时文件,再重命名 + const tempSuffix = `.${Date.now()}-${Math.random().toString(36).slice(2)}.tmp`; + const tempDataPath = dataPath + tempSuffix; + const tempMetaPath = metaPath + tempSuffix; + + try { + await fsPromises.writeFile(tempDataPath, finalResp.data); + await fsPromises.writeFile(tempMetaPath, JSON.stringify(meta)); + + // 重命名 (原子操作) + await fsPromises.rename(tempDataPath, dataPath); + await fsPromises.rename(tempMetaPath, metaPath); + } catch (e) { + console.error(`[cache-write-error] ${e.message}`); + // 尝试清理临时文件 + try { await fsPromises.unlink(tempDataPath); } catch (_) {} + try { await fsPromises.unlink(tempMetaPath); } catch (_) {} + } + + return { + status: 200, + headers: finalResp.headers, + data: finalResp.data, + contentType, + isFallback + }; + } + + // 无效响应,直接返回 + return { + status: finalResp.status, + headers: finalResp.headers, + data: finalResp.data, + contentType: finalResp.headers['content-type'], + isFallback + }; +} + +/** + * 核心处理逻辑:检查缓存 -> (合并请求) -> 回源 -> (失败则) 备用接口 -> 写入缓存 -> 返回响应 * @param {object} res * @param {string} upstreamUrl * @param {string} targetUrl @@ -193,7 +282,6 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) { meta = JSON.parse(metaRaw); } catch (e) { console.warn(`[cache-warn] meta corrupted for ${upstreamUrl}`); - // meta 损坏,视为无缓存,继续回源 } if (meta) { @@ -212,7 +300,6 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) { res.set('X-Source', 'mshots-cache'); } - // 流式返回 const stream = fs.createReadStream(dataPath); stream.pipe(res); return; @@ -221,81 +308,40 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) { } } } catch (err) { - // 文件不存在或其他错误,忽略,继续回源 + // 缓存未命中 } + // 2. 回源请求 (带请求合并/去重) try { - // 2. 回源请求 - const resp = await fetchUpstreamWithRetry(upstreamUrl); + let resultPromise; - // 3. 检查响应是否有效 - let finalResp = resp; - let isFallback = false; - - if (!isValidImageResponse(resp.status, resp.headers, resp.data)) { - console.log(`[upstream-invalid] url=${upstreamUrl} status=${resp.status} len=${resp.data.byteLength}, trying fallback...`); - const fallbackResp = await fetchFallbackWithRetry(targetUrl); - - if (fallbackResp && isValidImageResponse(fallbackResp.status, fallbackResp.headers, fallbackResp.data)) { - console.log(`[fallback-success] url=${targetUrl}`); - finalResp = fallbackResp; - isFallback = true; - } else { - console.log(`[fallback-failed] url=${targetUrl}, returning original response`); - } - } - - // 4. 仅缓存有效图片 (无论是回源还是备用) - if (isValidImageResponse(finalResp.status, finalResp.headers, finalResp.data)) { - const contentType = finalResp.headers['content-type'] || 'image/jpeg'; - const meta = { - url: isFallback ? `fallback:${targetUrl}` : upstreamUrl, - contentType, - size: finalResp.data.byteLength, - createdAt: new Date().toISOString(), - source: isFallback ? 'thum.io' : 'mshots' - }; - try { - await fsPromises.writeFile(dataPath, finalResp.data); - await fsPromises.writeFile(metaPath, JSON.stringify(meta)); - } catch (e) { - // 写盘失败也继续返回 - } - res.type(contentType); - res.set('Cache-Control', 'public, max-age=315360000, immutable'); - if (isFallback) { - res.set('X-Source', 'fallback-thum.io'); - } - return res.send(finalResp.data); - } - - // 5. 非图片或非200:兜底策略 - // 若本地已有有效缓存(可能是旧的但有效),优先返回缓存 - try { - await fsPromises.access(dataPath); - await fsPromises.access(metaPath); - - const metaRaw = await fsPromises.readFile(metaPath, 'utf8'); - const meta = JSON.parse(metaRaw); - if (meta.contentType && meta.contentType.toLowerCase().startsWith('image/')) { - res.set('Cache-Control', 'public, max-age=315360000, immutable'); - res.type(meta.contentType); - const stream = fs.createReadStream(dataPath); - stream.on('error', () => { - if (!res.headersSent) res.status(500).send('Cache read error'); + if (pendingRequests.has(key)) { + console.log(`[coalesce-hit] joining pending request for ${upstreamUrl}`); + resultPromise = pendingRequests.get(key); + } else { + resultPromise = fetchAndCache(upstreamUrl, targetUrl, key); + pendingRequests.set(key, resultPromise); + + // 无论成功失败,结束后移除 map + resultPromise.finally(() => { + pendingRequests.delete(key); }); - return stream.pipe(res); - } - } catch (_) {} + } - // 否则透传上游响应 - res.status(resp.status); - if (resp.headers['content-type']) res.type(resp.headers['content-type']); - return res.send(resp.data); + const result = await resultPromise; + + // 返回结果 + res.status(result.status); + if (result.contentType) res.type(result.contentType); + res.set('Cache-Control', 'public, max-age=315360000, immutable'); + if (result.isFallback) { + res.set('X-Source', 'fallback-thum.io'); + } + return res.send(result.data); } catch (err) { - // 5. 回源彻底失败 - // 若本地有缓存可兜底 + // 3. 回源彻底失败 + // 若本地有缓存可兜底 (即使过期) try { await fsPromises.access(dataPath); await fsPromises.access(metaPath); @@ -303,17 +349,19 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) { const metaRaw = await fsPromises.readFile(metaPath, 'utf8'); const meta = JSON.parse(metaRaw); if (meta.contentType && meta.contentType.toLowerCase().startsWith('image/')) { + console.log(`[fallback-cache] using stale cache for ${upstreamUrl}`); res.set('Cache-Control', 'public, max-age=315360000, immutable'); res.type(meta.contentType); const stream = fs.createReadStream(dataPath); - stream.on('error', () => { - if (!res.headersSent) res.status(500).send('Cache read error'); - }); - return stream.pipe(res); + stream.pipe(res); + return; } } catch (_) {} - console.error(`[upstream-failed] url=${upstreamUrl} err=${err.message}`); - return res.status(502).type('text/plain').send('Upstream error'); + + console.error(`[upstream-failed-final] url=${upstreamUrl} err=${err.message}`); + if (!res.headersSent) { + return res.status(502).type('text/plain').send('Upstream error'); + } } }