Skip to content

Commit 41ab269

Browse files
committed
fix: split s3 client into 3 separate clients
1 parent 635177e commit 41ab269

File tree

10 files changed

+120
-72
lines changed

10 files changed

+120
-72
lines changed

src/http/plugins/signals.ts

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,45 +11,39 @@ declare module 'fastify' {
1111
}
1212
}
1313

14+
const abortOnce = (ac: AbortController) => {
15+
if (!ac.signal.aborted) ac.abort()
16+
}
17+
1418
export const signals = fastifyPlugin(
1519
async function (fastify: FastifyInstance) {
16-
fastify.addHook('onRequest', async (req, res) => {
20+
fastify.addHook('onRequest', async (req, reply) => {
1721
req.signals = {
1822
body: new AbortController(),
1923
response: new AbortController(),
2024
disconnect: new AbortController(),
2125
}
2226

23-
// Client terminated the request before the body was fully sent
27+
// Body upload interrupted (fires early)
2428
req.raw.once('close', () => {
2529
if (req.raw.aborted) {
26-
req.signals.body.abort()
27-
28-
if (!req.signals.disconnect.signal.aborted) {
29-
req.signals.disconnect.abort()
30-
}
30+
abortOnce(req.signals.body)
31+
abortOnce(req.signals.disconnect)
3132
}
3233
})
3334

34-
// Client terminated the request before server finished sending the response
35-
res.raw.once('close', () => {
36-
const aborted = !res.raw.writableFinished
37-
if (aborted) {
38-
req.signals.response.abort()
39-
40-
if (!req.signals.disconnect.signal.aborted) {
41-
req.signals.disconnect.abort()
42-
}
35+
// Response interrupted (connection closed before finish)
36+
reply.raw.once('close', () => {
37+
if (!reply.raw.writableFinished) {
38+
abortOnce(req.signals.response)
39+
abortOnce(req.signals.disconnect)
4340
}
4441
})
4542
})
4643

4744
fastify.addHook('onRequestAbort', async (req) => {
48-
req.signals.body.abort()
49-
50-
if (!req.signals.disconnect.signal.aborted) {
51-
req.signals.disconnect.abort()
52-
}
45+
abortOnce(req.signals.body)
46+
abortOnce(req.signals.disconnect)
5347
})
5448
},
5549
{ name: 'request-signals' }

src/http/routes/s3/commands/create-multipart-upload.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
22
import { S3Router } from '../router'
33
import { ROUTE_OPERATIONS } from '../../operations'
4-
import { S3Backend } from '@storage/backend'
54
import { ERRORS } from '@internal/errors'
65

76
const CreateMultiPartUploadInput = {

src/storage/backend/adapter.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ export type UploadPart = {
4646
/**
4747
* A generic storage Adapter to interact with files
4848
*/
49-
export abstract class StorageBackendAdapter {
50-
client: any
51-
constructor() {
52-
this.client = null
53-
}
49+
export abstract class StorageBackendAdapter<T = unknown> {
50+
constructor() {}
51+
52+
abstract getClient(): T
5453

5554
async list(
5655
bucket: string,

src/storage/backend/file.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ export class FileBackend implements StorageBackendAdapter {
5858
this.etagAlgorithm = storageFileEtagAlgorithm
5959
}
6060

61+
getClient(): unknown {
62+
return null
63+
}
64+
6165
async list(
6266
bucket: string,
6367
options?: {

src/storage/backend/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { StorageBackendAdapter } from './adapter'
22
import { FileBackend } from './file'
33
import { S3Backend, S3ClientOptions } from './s3/adapter'
44
import { getConfig, StorageBackendType } from '../../config'
5+
import { S3Client } from '@aws-sdk/client-s3'
56

67
export * from './s3'
78
export * from './file'
@@ -14,14 +15,18 @@ type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
1415
? S3ClientOptions
1516
: undefined
1617

18+
type BackendAdapterForType<Type extends StorageBackendType> = Type extends 's3'
19+
? StorageBackendAdapter<S3Client>
20+
: StorageBackendAdapter
21+
1722
export function createStorageBackend<Type extends StorageBackendType>(
1823
type: Type,
1924
config?: ConfigForStorage<Type>
2025
) {
21-
let storageBackend: StorageBackendAdapter
26+
let storageBackend: BackendAdapterForType<Type>
2227

2328
if (type === 'file') {
24-
storageBackend = new FileBackend()
29+
storageBackend = new FileBackend() as BackendAdapterForType<Type>
2530
} else {
2631
const defaultOptions: S3ClientOptions = {
2732
region: storageS3Region,

src/storage/backend/s3/adapter.ts

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,74 @@ export interface S3ClientOptions {
4242
accessKey?: string
4343
secretKey?: string
4444
role?: string
45-
httpAgent?: InstrumentedAgent
4645
requestTimeout?: number
46+
httpAgents?: S3HttpAgents
47+
}
48+
49+
interface S3HttpAgents {
50+
api: InstrumentedAgent
51+
upload: InstrumentedAgent
52+
download: InstrumentedAgent
4753
}
4854

4955
/**
5056
* S3Backend
5157
* Interacts with a s3-compatible file system with this S3Adapter
5258
*/
53-
export class S3Backend implements StorageBackendAdapter {
54-
client: S3Client
55-
agent: InstrumentedAgent
59+
export class S3Backend implements StorageBackendAdapter<S3Client> {
60+
apiClient: S3Client
61+
uploadClient: S3Client
62+
downloadClient: S3Client
63+
64+
agents: {
65+
api: InstrumentedAgent
66+
upload: InstrumentedAgent
67+
download: InstrumentedAgent
68+
}
5669

5770
constructor(options: S3ClientOptions) {
58-
this.agent =
59-
options.httpAgent ??
60-
createAgent('s3_default', {
71+
this.agents = options.httpAgents ?? {
72+
api: createAgent('s3_api', {
6173
maxSockets: storageS3MaxSockets,
62-
})
74+
}),
75+
upload: createAgent('s3_upload', {
76+
maxSockets: storageS3MaxSockets,
77+
}),
78+
download: createAgent('s3_download', {
79+
maxSockets: storageS3MaxSockets,
80+
}),
81+
}
6382

64-
if (this.agent.httpsAgent && tracingEnabled) {
65-
this.agent.monitor()
83+
if (this.agents && tracingEnabled) {
84+
Object.values(this.agents).forEach((agent) => {
85+
agent.monitor()
86+
})
6687
}
6788

6889
// Default client for API operations
69-
this.client = this.createS3Client({
90+
this.apiClient = this.createS3Client({
7091
...options,
71-
name: 's3_default',
72-
httpAgent: this.agent,
92+
name: 's3_api',
93+
httpAgent: this.agents.api,
7394
})
95+
96+
// Dedicated client for downloads
97+
this.downloadClient = this.createS3Client({
98+
...options,
99+
name: 's3_download',
100+
httpAgent: this.agents.download,
101+
})
102+
103+
// Dedicated client for uploads
104+
this.uploadClient = this.createS3Client({
105+
...options,
106+
name: 's3_upload',
107+
httpAgent: this.agents.upload,
108+
})
109+
}
110+
111+
getClient() {
112+
return this.apiClient
74113
}
75114

76115
/**
@@ -98,7 +137,7 @@ export class S3Backend implements StorageBackendAdapter {
98137
input.IfModifiedSince = new Date(headers.ifModifiedSince)
99138
}
100139
const command = new GetObjectCommand(input)
101-
const data = await this.client.send(command, {
140+
const data = await this.downloadClient.send(command, {
102141
abortSignal: signal,
103142
})
104143

@@ -144,7 +183,7 @@ export class S3Backend implements StorageBackendAdapter {
144183
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body
145184

146185
const upload = new Upload({
147-
client: this.client,
186+
client: this.uploadClient,
148187
params: {
149188
Bucket: bucketName,
150189
Key: withOptionalVersion(key, version),
@@ -212,7 +251,7 @@ export class S3Backend implements StorageBackendAdapter {
212251
Bucket: bucket,
213252
Key: withOptionalVersion(key, version),
214253
})
215-
await this.client.send(command)
254+
await this.apiClient.send(command)
216255
}
217256

218257
/**
@@ -251,7 +290,7 @@ export class S3Backend implements StorageBackendAdapter {
251290
ContentType: metadata?.mimetype,
252291
CacheControl: metadata?.cacheControl,
253292
})
254-
const data = await this.client.send(command)
293+
const data = await this.apiClient.send(command)
255294
return {
256295
httpStatusCode: data.$metadata.httpStatusCode || 200,
257296
eTag: data.CopyObjectResult?.ETag || '',
@@ -280,7 +319,7 @@ export class S3Backend implements StorageBackendAdapter {
280319
ContinuationToken: options?.nextToken || undefined,
281320
StartAfter: options?.startAfter,
282321
})
283-
const data = await this.client.send(command)
322+
const data = await this.apiClient.send(command)
284323
const keys =
285324
data.Contents?.filter((ele) => {
286325
if (options?.beforeDate) {
@@ -327,7 +366,7 @@ export class S3Backend implements StorageBackendAdapter {
327366
Objects: s3Prefixes,
328367
},
329368
})
330-
await this.client.send(command)
369+
await this.apiClient.send(command)
331370
} catch (e) {
332371
throw StorageBackendError.fromError(e)
333372
}
@@ -349,7 +388,7 @@ export class S3Backend implements StorageBackendAdapter {
349388
Bucket: bucket,
350389
Key: withOptionalVersion(key, version),
351390
})
352-
const data = await this.client.send(command)
391+
const data = await this.apiClient.send(command)
353392
return {
354393
cacheControl: data.CacheControl || 'no-cache',
355394
mimetype: data.ContentType || 'application/octet-stream',
@@ -380,7 +419,7 @@ export class S3Backend implements StorageBackendAdapter {
380419
MaxParts: maxParts,
381420
})
382421

383-
const result = await this.client.send(command)
422+
const result = await this.apiClient.send(command)
384423

385424
return {
386425
parts: result.Parts || [],
@@ -406,7 +445,7 @@ export class S3Backend implements StorageBackendAdapter {
406445
}
407446

408447
const command = new GetObjectCommand(input)
409-
return getSignedUrl(this.client, command, { expiresIn: 600 })
448+
return getSignedUrl(this.apiClient, command, { expiresIn: 600 })
410449
}
411450

412451
async createMultiPartUpload(
@@ -428,7 +467,7 @@ export class S3Backend implements StorageBackendAdapter {
428467
},
429468
})
430469

431-
const resp = await this.client.send(createMultiPart)
470+
const resp = await this.apiClient.send(createMultiPart)
432471

433472
if (!resp.UploadId) {
434473
throw ERRORS.InvalidUploadId()
@@ -457,7 +496,7 @@ export class S3Backend implements StorageBackendAdapter {
457496
ContentLength: length,
458497
})
459498

460-
const resp = await this.client.send(paralellUploadS3, {
499+
const resp = await this.uploadClient.send(paralellUploadS3, {
461500
abortSignal: signal,
462501
})
463502

@@ -491,7 +530,7 @@ export class S3Backend implements StorageBackendAdapter {
491530
UploadId: uploadId,
492531
})
493532

494-
const partsResponse = await this.client.send(listPartsInput)
533+
const partsResponse = await this.apiClient.send(listPartsInput)
495534
parts = partsResponse.Parts || []
496535
}
497536

@@ -507,7 +546,7 @@ export class S3Backend implements StorageBackendAdapter {
507546
},
508547
})
509548

510-
const response = await this.client.send(completeUpload)
549+
const response = await this.apiClient.send(completeUpload)
511550

512551
let location = key
513552
let bucket = bucketName
@@ -534,7 +573,7 @@ export class S3Backend implements StorageBackendAdapter {
534573
Key: key,
535574
UploadId: uploadId,
536575
})
537-
await this.client.send(abortUpload)
576+
await this.apiClient.send(abortUpload)
538577
}
539578

540579
async uploadPartCopy(
@@ -556,7 +595,7 @@ export class S3Backend implements StorageBackendAdapter {
556595
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
557596
})
558597

559-
const part = await this.client.send(uploadPartCopy)
598+
const part = await this.uploadClient.send(uploadPartCopy)
560599

561600
return {
562601
eTag: part.CopyPartResult?.ETag,
@@ -565,14 +604,18 @@ export class S3Backend implements StorageBackendAdapter {
565604
}
566605

567606
async backup(backupInfo: BackupObjectInfo) {
568-
return new ObjectBackup(this.client, backupInfo).backup()
607+
return new ObjectBackup(this.apiClient, backupInfo).backup()
569608
}
570609

571610
close() {
572-
this.agent.close()
611+
Object.values(this.agents).forEach((agent) => {
612+
agent.close()
613+
})
573614
}
574615

575-
protected createS3Client(options: S3ClientOptions & { name: string }) {
616+
protected createS3Client(
617+
options: S3ClientOptions & { name: string; httpAgent: InstrumentedAgent }
618+
) {
576619
const params: S3ClientConfig = {
577620
region: options.region,
578621
runtime: 'node',

0 commit comments

Comments
 (0)