Skip to content

Commit 71acd36

Browse files
debadree25RafaelGSS
authored andcommitted
stream: implement TransformStream cleanup using "transformer.cancel"
Fixes: #49971 PR-URL: #50126 Reviewed-By: Matteo Collina <[email protected]>
1 parent 3ccd5fa commit 71acd36

File tree

72 files changed

+418
-94
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+418
-94
lines changed

‎lib/internal/webstreams/readablestream.js‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const{
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17+
Promise,
1718
PromisePrototypeThen,
1819
PromiseResolve,
1920
PromiseReject,
@@ -2444,7 +2445,7 @@ function setupReadableStreamDefaultController(
24442445
conststartResult=startAlgorithm();
24452446

24462447
PromisePrototypeThen(
2447-
PromiseResolve(startResult),
2448+
newPromise((r)=>r(startResult)),
24482449
()=>{
24492450
controller[kState].started=true;
24502451
assert(!controller[kState].pulling);
@@ -3243,7 +3244,7 @@ function setupReadableByteStreamController(
32433244
conststartResult=startAlgorithm();
32443245

32453246
PromisePrototypeThen(
3246-
PromiseResolve(startResult),
3247+
newPromise((r)=>r(startResult)),
32473248
()=>{
32483249
controller[kState].started=true;
32493250
assert(!controller[kState].pulling);

‎lib/internal/webstreams/transformstream.js‎

Lines changed: 105 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const{
66
ObjectDefineProperties,
77
ObjectSetPrototypeOf,
88
PromisePrototypeThen,
9-
PromiseResolve,
109
SymbolToStringTag,
1110
Symbol,
1211
}=primordials;
@@ -47,6 +46,7 @@ const{
4746
nonOpFlush,
4847
kType,
4948
kState,
49+
nonOpCancel,
5050
}=require('internal/webstreams/util');
5151

5252
const{
@@ -384,8 +384,7 @@ function initializeTransformStream(
384384
returntransformStreamDefaultSourcePullAlgorithm(stream);
385385
},
386386
cancel(reason){
387-
transformStreamErrorWritableAndUnblockWrite(stream,reason);
388-
returnPromiseResolve();
387+
returntransformStreamDefaultSourceCancelAlgorithm(stream,reason);
389388
},
390389
},{
391390
highWaterMark: readableHighWaterMark,
@@ -427,6 +426,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error){
427426
writableStreamDefaultControllerErrorIfNeeded(
428427
writable[kState].controller,
429428
error);
429+
transformStreamUnblockWrite(stream);
430+
}
431+
432+
functiontransformStreamUnblockWrite(stream){
430433
if(stream[kState].backpressure)
431434
transformStreamSetBackpressure(stream,false);
432435
}
@@ -443,13 +446,15 @@ function setupTransformStreamDefaultController(
443446
stream,
444447
controller,
445448
transformAlgorithm,
446-
flushAlgorithm){
449+
flushAlgorithm,
450+
cancelAlgorithm){
447451
assert(isTransformStream(stream));
448452
assert(stream[kState].controller===undefined);
449453
controller[kState]={
450454
stream,
451455
transformAlgorithm,
452456
flushAlgorithm,
457+
cancelAlgorithm,
453458
};
454459
stream[kState].controller=controller;
455460
}
@@ -460,21 +465,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
460465
constcontroller=newTransformStreamDefaultController(kSkipThrow);
461466
consttransform=transformer?.transform||defaultTransformAlgorithm;
462467
constflush=transformer?.flush||nonOpFlush;
468+
constcancel=transformer?.cancel||nonOpCancel;
463469
consttransformAlgorithm=
464470
FunctionPrototypeBind(transform,transformer);
465471
constflushAlgorithm=
466472
FunctionPrototypeBind(flush,transformer);
473+
constcancelAlgorithm=
474+
FunctionPrototypeBind(cancel,transformer);
467475

468476
setupTransformStreamDefaultController(
469477
stream,
470478
controller,
471479
transformAlgorithm,
472-
flushAlgorithm);
480+
flushAlgorithm,
481+
cancelAlgorithm);
473482
}
474483

475484
functiontransformStreamDefaultControllerClearAlgorithms(controller){
476485
controller[kState].transformAlgorithm=undefined;
477486
controller[kState].flushAlgorithm=undefined;
487+
controller[kState].cancelAlgorithm=undefined;
478488
}
479489

480490
functiontransformStreamDefaultControllerEnqueue(controller,chunk){
@@ -563,7 +573,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk){
563573
}
564574

565575
asyncfunctiontransformStreamDefaultSinkAbortAlgorithm(stream,reason){
566-
transformStreamError(stream,reason);
576+
const{
577+
controller,
578+
readable,
579+
}=stream[kState];
580+
581+
if(controller[kState].finishPromise!==undefined){
582+
returncontroller[kState].finishPromise;
583+
}
584+
585+
const{ promise, resolve, reject }=createDeferredPromise();
586+
controller[kState].finishPromise=promise;
587+
constcancelPromise=ensureIsPromise(
588+
controller[kState].cancelAlgorithm,
589+
controller,
590+
reason);
591+
transformStreamDefaultControllerClearAlgorithms(controller);
592+
593+
PromisePrototypeThen(
594+
cancelPromise,
595+
()=>{
596+
if(readable[kState].state==='errored')
597+
reject(readable[kState].storedError);
598+
else{
599+
readableStreamDefaultControllerError(readable[kState].controller,reason);
600+
resolve();
601+
}
602+
},
603+
(error)=>{
604+
readableStreamDefaultControllerError(readable[kState].controller,error);
605+
reject(error);
606+
},
607+
);
608+
609+
returncontroller[kState].finishPromise;
567610
}
568611

569612
functiontransformStreamDefaultSinkCloseAlgorithm(stream){
@@ -572,23 +615,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream){
572615
controller,
573616
}=stream[kState];
574617

618+
if(controller[kState].finishPromise!==undefined){
619+
returncontroller[kState].finishPromise;
620+
}
621+
const{ promise, resolve, reject }=createDeferredPromise();
622+
controller[kState].finishPromise=promise;
575623
constflushPromise=
576624
ensureIsPromise(
577625
controller[kState].flushAlgorithm,
578626
controller,
579627
controller);
580628
transformStreamDefaultControllerClearAlgorithms(controller);
581-
returnPromisePrototypeThen(
629+
PromisePrototypeThen(
582630
flushPromise,
583631
()=>{
584632
if(readable[kState].state==='errored')
585-
throwreadable[kState].storedError;
586-
readableStreamDefaultControllerClose(readable[kState].controller);
633+
reject(readable[kState].storedError);
634+
else{
635+
readableStreamDefaultControllerClose(readable[kState].controller);
636+
resolve();
637+
}
587638
},
588639
(error)=>{
589-
transformStreamError(stream,error);
590-
throwreadable[kState].storedError;
640+
readableStreamDefaultControllerError(readable[kState].controller,error);
641+
reject(error);
591642
});
643+
returncontroller[kState].finishPromise;
592644
}
593645

594646
functiontransformStreamDefaultSourcePullAlgorithm(stream){
@@ -598,6 +650,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream){
598650
returnstream[kState].backpressureChange.promise;
599651
}
600652

653+
functiontransformStreamDefaultSourceCancelAlgorithm(stream,reason){
654+
const{
655+
controller,
656+
writable,
657+
}=stream[kState];
658+
659+
if(controller[kState].finishPromise!==undefined){
660+
returncontroller[kState].finishPromise;
661+
}
662+
663+
const{ promise, resolve, reject }=createDeferredPromise();
664+
controller[kState].finishPromise=promise;
665+
constcancelPromise=ensureIsPromise(
666+
controller[kState].cancelAlgorithm,
667+
controller,
668+
reason);
669+
transformStreamDefaultControllerClearAlgorithms(controller);
670+
671+
PromisePrototypeThen(cancelPromise,
672+
()=>{
673+
if(writable[kState].state==='errored')
674+
reject(writable[kState].storedError);
675+
else{
676+
writableStreamDefaultControllerErrorIfNeeded(
677+
writable[kState].controller,
678+
reason);
679+
transformStreamUnblockWrite(stream);
680+
resolve();
681+
}
682+
},
683+
(error)=>{
684+
writableStreamDefaultControllerErrorIfNeeded(
685+
writable[kState].controller,
686+
error);
687+
transformStreamUnblockWrite(stream);
688+
reject(error);
689+
},
690+
);
691+
692+
returncontroller[kState].finishPromise;
693+
}
694+
601695
module.exports={
602696
TransformStream,
603697
TransformStreamDefaultController,

‎lib/internal/webstreams/writablestream.js‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const{
77
FunctionPrototypeCall,
88
ObjectDefineProperties,
99
ObjectSetPrototypeOf,
10+
Promise,
1011
PromisePrototypeThen,
1112
PromiseResolve,
1213
PromiseReject,
@@ -1295,7 +1296,7 @@ function setupWritableStreamDefaultController(
12951296
conststartResult=startAlgorithm();
12961297

12971298
PromisePrototypeThen(
1298-
PromiseResolve(startResult),
1299+
newPromise((r)=>r(startResult)),
12991300
()=>{
13001301
assert(stream[kState].state==='writable'||
13011302
stream[kState].state==='erroring');

‎test/fixtures/wpt/README.md‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Last update:
2727
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
2828
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
2929
- resources: https://github.com/web-platform-tests/wpt/tree/1e140d63ec/resources
30-
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
30+
- streams: https://github.com/web-platform-tests/wpt/tree/a8872d92b1/streams
3131
- url: https://github.com/web-platform-tests/wpt/tree/c2d7e70b52/url
3232
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
3333
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi

‎test/fixtures/wpt/streams/piping/abort.any.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
// META: script=../resources/test-utils.js
44
'use strict';

‎test/fixtures/wpt/streams/piping/close-propagation-backward.any.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
'use strict';
44

‎test/fixtures/wpt/streams/piping/close-propagation-forward.any.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<!DOCTYPE html>
2+
<scripttype="module">
3+
leta=newReadableStream();
4+
letb=self.open()
5+
letf=newb.WritableStream();
6+
a.pipeThrough(
7+
{"readable": a,"writable": f},
8+
{"signal": AbortSignal.abort()}
9+
)
10+
awaitnewPromise(setTimeout);
11+
structuredClone(undefined,{"transfer": [f]})
12+
</script>

‎test/fixtures/wpt/streams/piping/error-propagation-backward.any.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

‎test/fixtures/wpt/streams/piping/error-propagation-forward.any.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

0 commit comments

Comments
(0)