1
0

feat(server): 实现请求合并和原子化缓存写入提升并发健壮性

- 新增请求合并机制,避免相同URL的高并发回源请求
- 采用原子化写入策略(临时文件+重命名)防止并发读写导致文件损坏
- 重构核心处理逻辑,将缓存操作提取为独立函数
- 优化错误处理和缓存兜底逻辑
This commit is contained in:
2026-01-21 21:36:06 +08:00
parent 13ca7372a8
commit e9bfa3c61f
2 changed files with 127 additions and 76 deletions

View File

@@ -56,3 +56,6 @@
- 新增备用接口机制:当 mShots 返回无效图片(如生成中 GIF或失败时自动降级尝试使用 `thum.io` 获取截图,确保高可用性。 - 新增备用接口机制:当 mShots 返回无效图片(如生成中 GIF或失败时自动降级尝试使用 `thum.io` 获取截图,确保高可用性。
- 性能优化:将关键路径上的同步文件 I/O (readFileSync/writeFileSync) 替换为异步操作 (fs.promises),防止高并发下 Event Loop 阻塞导致服务无响应。 - 性能优化:将关键路径上的同步文件 I/O (readFileSync/writeFileSync) 替换为异步操作 (fs.promises),防止高并发下 Event Loop 阻塞导致服务无响应。
- 用户体验优化:当在浏览器中直接访问 API (Accept: text/html) 时,返回一个带有加载动画的 HTML 页面,解决等待过程中的白屏问题。 - 用户体验优化:当在浏览器中直接访问 API (Accept: text/html) 时,返回一个带有加载动画的 HTML 页面,解决等待过程中的白屏问题。
- 并发健壮性提升:
- 实现 **请求合并 (Request Coalescing)**:当多个客户端同时请求同一个未缓存的 URL 时,复用同一个回源请求,避免瞬间高并发流量击穿上游 (Thundering Herd)。
- 实现 **原子化缓存写入 (Atomic Write)**:使用“写临时文件 + 重命名”策略,确保缓存文件在写入过程中不会被读取到不完整的数据,彻底解决并发读写导致的文件损坏问题。

190
server.js
View File

@@ -15,6 +15,9 @@ const CACHE_DIR = path.join(process.cwd(), 'cache');
fs.mkdirSync(CACHE_DIR, { recursive: true }); fs.mkdirSync(CACHE_DIR, { recursive: true });
// 请求合并 Map (Deduplication)
const pendingRequests = new Map();
/** /**
* 计算 SHA1 哈希 * 计算 SHA1 哈希
* @param {string} input * @param {string} input
@@ -172,7 +175,93 @@ async function fetchFallbackWithRetry(targetUrl) {
} }
/** /**
* 核心处理逻辑:检查缓存 -> 回源 -> (失败则) 备用接口 -> 写入缓存 -> 返回响应 * 执行回源、备用请求并写入缓存
* @param {string} upstreamUrl
* @param {string} targetUrl
* @param {string} key
* @returns {Promise<{data: Buffer, contentType: string, status: number}>}
*/
async function fetchAndCache(upstreamUrl, targetUrl, key) {
const { data: dataPath, meta: metaPath } = getCachePaths(key);
// 1. 回源请求
let finalResp;
let isFallback = false;
try {
const resp = await fetchUpstreamWithRetry(upstreamUrl);
finalResp = resp;
// 2. 检查响应是否有效
if (!isValidImageResponse(resp.status, resp.headers, resp.data)) {
console.log(`[upstream-invalid] url=${upstreamUrl} status=${resp.status} len=${resp.data.byteLength}, trying fallback...`);
const fallbackResp = await fetchFallbackWithRetry(targetUrl);
if (fallbackResp && isValidImageResponse(fallbackResp.status, fallbackResp.headers, fallbackResp.data)) {
console.log(`[fallback-success] url=${targetUrl}`);
finalResp = fallbackResp;
isFallback = true;
} else {
console.log(`[fallback-failed] url=${targetUrl}, returning original response`);
}
}
} catch (err) {
console.error(`[upstream-failed] url=${upstreamUrl} err=${err.message}`);
// 如果回源彻底失败,抛出错误,以便上层处理(如兜底读取旧缓存)
throw err;
}
// 3. 仅缓存有效图片
if (isValidImageResponse(finalResp.status, finalResp.headers, finalResp.data)) {
const contentType = finalResp.headers['content-type'] || 'image/jpeg';
const meta = {
url: isFallback ? `fallback:${targetUrl}` : upstreamUrl,
contentType,
size: finalResp.data.byteLength,
createdAt: new Date().toISOString(),
source: isFallback ? 'thum.io' : 'mshots'
};
// 原子化写入:先写临时文件,再重命名
const tempSuffix = `.${Date.now()}-${Math.random().toString(36).slice(2)}.tmp`;
const tempDataPath = dataPath + tempSuffix;
const tempMetaPath = metaPath + tempSuffix;
try {
await fsPromises.writeFile(tempDataPath, finalResp.data);
await fsPromises.writeFile(tempMetaPath, JSON.stringify(meta));
// 重命名 (原子操作)
await fsPromises.rename(tempDataPath, dataPath);
await fsPromises.rename(tempMetaPath, metaPath);
} catch (e) {
console.error(`[cache-write-error] ${e.message}`);
// 尝试清理临时文件
try { await fsPromises.unlink(tempDataPath); } catch (_) {}
try { await fsPromises.unlink(tempMetaPath); } catch (_) {}
}
return {
status: 200,
headers: finalResp.headers,
data: finalResp.data,
contentType,
isFallback
};
}
// 无效响应,直接返回
return {
status: finalResp.status,
headers: finalResp.headers,
data: finalResp.data,
contentType: finalResp.headers['content-type'],
isFallback
};
}
/**
* 核心处理逻辑:检查缓存 -> (合并请求) -> 回源 -> (失败则) 备用接口 -> 写入缓存 -> 返回响应
* @param {object} res * @param {object} res
* @param {string} upstreamUrl * @param {string} upstreamUrl
* @param {string} targetUrl * @param {string} targetUrl
@@ -193,7 +282,6 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) {
meta = JSON.parse(metaRaw); meta = JSON.parse(metaRaw);
} catch (e) { } catch (e) {
console.warn(`[cache-warn] meta corrupted for ${upstreamUrl}`); console.warn(`[cache-warn] meta corrupted for ${upstreamUrl}`);
// meta 损坏,视为无缓存,继续回源
} }
if (meta) { if (meta) {
@@ -212,7 +300,6 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) {
res.set('X-Source', 'mshots-cache'); res.set('X-Source', 'mshots-cache');
} }
// 流式返回
const stream = fs.createReadStream(dataPath); const stream = fs.createReadStream(dataPath);
stream.pipe(res); stream.pipe(res);
return; return;
@@ -221,81 +308,40 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) {
} }
} }
} catch (err) { } catch (err) {
// 文件不存在或其他错误,忽略,继续回源 // 缓存未命中
} }
// 2. 回源请求 (带请求合并/去重)
try { try {
// 2. 回源请求 let resultPromise;
const resp = await fetchUpstreamWithRetry(upstreamUrl);
// 3. 检查响应是否有效 if (pendingRequests.has(key)) {
let finalResp = resp; console.log(`[coalesce-hit] joining pending request for ${upstreamUrl}`);
let isFallback = false; resultPromise = pendingRequests.get(key);
if (!isValidImageResponse(resp.status, resp.headers, resp.data)) {
console.log(`[upstream-invalid] url=${upstreamUrl} status=${resp.status} len=${resp.data.byteLength}, trying fallback...`);
const fallbackResp = await fetchFallbackWithRetry(targetUrl);
if (fallbackResp && isValidImageResponse(fallbackResp.status, fallbackResp.headers, fallbackResp.data)) {
console.log(`[fallback-success] url=${targetUrl}`);
finalResp = fallbackResp;
isFallback = true;
} else { } else {
console.log(`[fallback-failed] url=${targetUrl}, returning original response`); resultPromise = fetchAndCache(upstreamUrl, targetUrl, key);
} pendingRequests.set(key, resultPromise);
// 无论成功失败,结束后移除 map
resultPromise.finally(() => {
pendingRequests.delete(key);
});
} }
// 4. 仅缓存有效图片 (无论是回源还是备用) const result = await resultPromise;
if (isValidImageResponse(finalResp.status, finalResp.headers, finalResp.data)) {
const contentType = finalResp.headers['content-type'] || 'image/jpeg'; // 返回结果
const meta = { res.status(result.status);
url: isFallback ? `fallback:${targetUrl}` : upstreamUrl, if (result.contentType) res.type(result.contentType);
contentType,
size: finalResp.data.byteLength,
createdAt: new Date().toISOString(),
source: isFallback ? 'thum.io' : 'mshots'
};
try {
await fsPromises.writeFile(dataPath, finalResp.data);
await fsPromises.writeFile(metaPath, JSON.stringify(meta));
} catch (e) {
// 写盘失败也继续返回
}
res.type(contentType);
res.set('Cache-Control', 'public, max-age=315360000, immutable'); res.set('Cache-Control', 'public, max-age=315360000, immutable');
if (isFallback) { if (result.isFallback) {
res.set('X-Source', 'fallback-thum.io'); res.set('X-Source', 'fallback-thum.io');
} }
return res.send(finalResp.data); return res.send(result.data);
}
// 5. 非图片或非200兜底策略
// 若本地已有有效缓存(可能是旧的但有效),优先返回缓存
try {
await fsPromises.access(dataPath);
await fsPromises.access(metaPath);
const metaRaw = await fsPromises.readFile(metaPath, 'utf8');
const meta = JSON.parse(metaRaw);
if (meta.contentType && meta.contentType.toLowerCase().startsWith('image/')) {
res.set('Cache-Control', 'public, max-age=315360000, immutable');
res.type(meta.contentType);
const stream = fs.createReadStream(dataPath);
stream.on('error', () => {
if (!res.headersSent) res.status(500).send('Cache read error');
});
return stream.pipe(res);
}
} catch (_) {}
// 否则透传上游响应
res.status(resp.status);
if (resp.headers['content-type']) res.type(resp.headers['content-type']);
return res.send(resp.data);
} catch (err) { } catch (err) {
// 5. 回源彻底失败 // 3. 回源彻底失败
// 若本地有缓存可兜底 // 若本地有缓存可兜底 (即使过期)
try { try {
await fsPromises.access(dataPath); await fsPromises.access(dataPath);
await fsPromises.access(metaPath); await fsPromises.access(metaPath);
@@ -303,18 +349,20 @@ async function handleProxyRequest(res, upstreamUrl, targetUrl) {
const metaRaw = await fsPromises.readFile(metaPath, 'utf8'); const metaRaw = await fsPromises.readFile(metaPath, 'utf8');
const meta = JSON.parse(metaRaw); const meta = JSON.parse(metaRaw);
if (meta.contentType && meta.contentType.toLowerCase().startsWith('image/')) { if (meta.contentType && meta.contentType.toLowerCase().startsWith('image/')) {
console.log(`[fallback-cache] using stale cache for ${upstreamUrl}`);
res.set('Cache-Control', 'public, max-age=315360000, immutable'); res.set('Cache-Control', 'public, max-age=315360000, immutable');
res.type(meta.contentType); res.type(meta.contentType);
const stream = fs.createReadStream(dataPath); const stream = fs.createReadStream(dataPath);
stream.on('error', () => { stream.pipe(res);
if (!res.headersSent) res.status(500).send('Cache read error'); return;
});
return stream.pipe(res);
} }
} catch (_) {} } catch (_) {}
console.error(`[upstream-failed] url=${upstreamUrl} err=${err.message}`);
console.error(`[upstream-failed-final] url=${upstreamUrl} err=${err.message}`);
if (!res.headersSent) {
return res.status(502).type('text/plain').send('Upstream error'); return res.status(502).type('text/plain').send('Upstream error');
} }
}
} }
// 反代 mShots路径 /mshots/v1/... // 反代 mShots路径 /mshots/v1/...