Skip to content

Commit d0a6f35

Browse files
tsctxrichardlau
authored andcommitted
stream: fix cloned webstreams not being unref correctly
PR-URL: #51526 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]>
1 parent 35675aa commit d0a6f35

File tree

5 files changed

+63
-14
lines changed

5 files changed

+63
-14
lines changed

‎lib/internal/webstreams/readablestream.js‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,11 @@ class ReadableStream{
620620
consttransfer=lazyTransfer();
621621
setupReadableStreamDefaultControllerFromSource(
622622
this,
623-
newtransfer.CrossRealmTransformReadableSource(port),
623+
// The MessagePort is set to be referenced when reading.
624+
// After two MessagePorts are closed, there is a problem with
625+
// lingering promise not being properly resolved.
626+
// https://github.com/nodejs/node/issues/51486
627+
newtransfer.CrossRealmTransformReadableSource(port,true),
624628
0,()=>1);
625629
}
626630
}

‎lib/internal/webstreams/transfer.js‎

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,11 @@ function InternalCloneableDOMException(){
104104
InternalCloneableDOMException[kDeserialize]=()=>{};
105105

106106
classCrossRealmTransformReadableSource{
107-
constructor(port){
107+
constructor(port,unref){
108108
this[kState]={
109109
port,
110110
controller: undefined,
111+
unref,
111112
};
112113

113114
port.onmessage=({ data })=>{
@@ -145,13 +146,19 @@ class CrossRealmTransformReadableSource{
145146
error);
146147
port.close();
147148
};
149+
150+
port.unref();
148151
}
149152

150153
start(controller){
151154
this[kState].controller=controller;
152155
}
153156

154157
asyncpull(){
158+
if(this[kState].unref){
159+
this[kState].unref=false;
160+
this[kState].port.ref();
161+
}
155162
this[kState].port.postMessage({type: 'pull'});
156163
}
157164

@@ -172,11 +179,12 @@ class CrossRealmTransformReadableSource{
172179
}
173180

174181
classCrossRealmTransformWritableSink{
175-
constructor(port){
182+
constructor(port,unref){
176183
this[kState]={
177184
port,
178185
controller: undefined,
179186
backpressurePromise: createDeferredPromise(),
187+
unref,
180188
};
181189

182190
port.onmessage=({ data })=>{
@@ -213,13 +221,18 @@ class CrossRealmTransformWritableSink{
213221
port.close();
214222
};
215223

224+
port.unref();
216225
}
217226

218227
start(controller){
219228
this[kState].controller=controller;
220229
}
221230

222231
asyncwrite(chunk){
232+
if(this[kState].unref){
233+
this[kState].unref=false;
234+
this[kState].port.ref();
235+
}
223236
if(this[kState].backpressurePromise===undefined){
224237
this[kState].backpressurePromise={
225238
promise: PromiseResolve(),
@@ -264,12 +277,12 @@ class CrossRealmTransformWritableSink{
264277
}
265278

266279
functionnewCrossRealmReadableStream(writable,port){
267-
constreadable=
268-
newReadableStream(
269-
newCrossRealmTransformReadableSource(port));
280+
// MessagePort should always be unref.
281+
// There is a problem with the process not terminating.
282+
// https://github.com/nodejs/node/issues/44985
283+
constreadable=newReadableStream(newCrossRealmTransformReadableSource(port,false));
270284

271-
constpromise=
272-
readableStreamPipeTo(readable,writable,false,false,false);
285+
constpromise=readableStreamPipeTo(readable,writable,false,false,false);
273286

274287
setPromiseHandled(promise);
275288

@@ -280,12 +293,15 @@ function newCrossRealmReadableStream(writable, port){
280293
}
281294

282295
functionnewCrossRealmWritableSink(readable,port){
283-
constwritable=
284-
newWritableStream(
285-
newCrossRealmTransformWritableSink(port));
296+
// MessagePort should always be unref.
297+
// There is a problem with the process not terminating.
298+
// https://github.com/nodejs/node/issues/44985
299+
constwritable=newWritableStream(newCrossRealmTransformWritableSink(port,false));
286300

287301
constpromise=readableStreamPipeTo(readable,writable,false,false,false);
302+
288303
setPromiseHandled(promise);
304+
289305
return{
290306
writable,
291307
promise,

‎lib/internal/webstreams/writablestream.js‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,6 @@ class WritableStream{
294294
this[kState].transfer.readable=readable;
295295
this[kState].transfer.promise=promise;
296296

297-
setPromiseHandled(this[kState].transfer.promise);
298-
299297
return{
300298
data: {port: this[kState].transfer.port2},
301299
deserializeInfo:
@@ -314,7 +312,11 @@ class WritableStream{
314312
consttransfer=lazyTransfer();
315313
setupWritableStreamDefaultControllerFromSink(
316314
this,
317-
newtransfer.CrossRealmTransformWritableSink(port),
315+
// The MessagePort is set to be referenced when reading.
316+
// After two MessagePorts are closed, there is a problem with
317+
// lingering promise not being properly resolved.
318+
// https://github.com/nodejs/node/issues/51486
319+
newtransfer.CrossRealmTransformWritableSink(port,true),
318320
1,
319321
()=>1);
320322
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
'use strict';
2+
3+
require('../common');
4+
const{ ok }=require('node:assert');
5+
6+
// This test verifies that cloned ReadableStream and WritableStream instances
7+
// do not keep the process alive. The test fails if it timesout (it should just
8+
// exit immediately)
9+
10+
constrs1=newReadableStream();
11+
constws1=newWritableStream();
12+
13+
const[rs2,ws2]=structuredClone([rs1,ws1],{transfer: [rs1,ws1]});
14+
15+
ok(rs2instanceofReadableStream);
16+
ok(ws2instanceofWritableStream);

‎test/parallel/test-whatwg-webstreams-transfer.js‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,12 +454,23 @@ const theData = 'hello'
454454
tracker.verify();
455455
});
456456
457+
// We create an interval to keep the event loop alive while
458+
// we wait for the stream read to complete. The reason this is needed is because there's
459+
// otherwise nothing to keep the worker thread event loop alive long enough to actually
460+
// complete the read from the stream. Under the covers the ReadableStream uses an
461+
// unref'd MessagePort to communicate with the main thread. Because the MessagePort
462+
// is unref'd, it's existence would not keep the thread alive on its own. There was previously
463+
// a bug where this MessagePort was ref'd which would block the thread and main thread
464+
// from terminating at all unless the stream was consumed/closed.
465+
const i = setInterval(() =>{}, 1000);
466+
457467
parentPort.onmessage = tracker.calls(({data }) =>{
458468
assert(isReadableStream(data));
459469
const reader = data.getReader();
460470
reader.read().then(tracker.calls((result) =>{
461471
assert(!result.done);
462472
assert(result.value instanceof Uint8Array);
473+
clearInterval(i);
463474
}));
464475
parentPort.close();
465476
});

0 commit comments

Comments
(0)