Skip to content

Commit 438b7fd

Browse files
tsctxtargos
authored andcommitted
stream: fix cloned webstreams not being unref correctly
PR-URL: #51526 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]>
1 parent 4b1d25b commit 438b7fd

File tree

5 files changed

+63
-14
lines changed

5 files changed

+63
-14
lines changed

‎lib/internal/webstreams/readablestream.js‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,11 @@ class ReadableStream{
607607
consttransfer=lazyTransfer();
608608
setupReadableStreamDefaultControllerFromSource(
609609
this,
610-
newtransfer.CrossRealmTransformReadableSource(port),
610+
// The MessagePort is set to be referenced when reading.
611+
// After two MessagePorts are closed, there is a problem with
612+
// lingering promise not being properly resolved.
613+
// https://github.com/nodejs/node/issues/51486
614+
newtransfer.CrossRealmTransformReadableSource(port,true),
611615
0,()=>1);
612616
}
613617
}

‎lib/internal/webstreams/transfer.js‎

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ function InternalCloneableDOMException(){
102102
InternalCloneableDOMException[kDeserialize]=()=>{};
103103

104104
classCrossRealmTransformReadableSource{
105-
constructor(port){
105+
constructor(port,unref){
106106
this[kState]={
107107
port,
108108
controller: undefined,
109+
unref,
109110
};
110111

111112
port.onmessage=({ data })=>{
@@ -143,13 +144,19 @@ class CrossRealmTransformReadableSource{
143144
error);
144145
port.close();
145146
};
147+
148+
port.unref();
146149
}
147150

148151
start(controller){
149152
this[kState].controller=controller;
150153
}
151154

152155
asyncpull(){
156+
if(this[kState].unref){
157+
this[kState].unref=false;
158+
this[kState].port.ref();
159+
}
153160
this[kState].port.postMessage({type: 'pull'});
154161
}
155162

@@ -170,11 +177,12 @@ class CrossRealmTransformReadableSource{
170177
}
171178

172179
classCrossRealmTransformWritableSink{
173-
constructor(port){
180+
constructor(port,unref){
174181
this[kState]={
175182
port,
176183
controller: undefined,
177184
backpressurePromise: createDeferredPromise(),
185+
unref,
178186
};
179187

180188
port.onmessage=({ data })=>{
@@ -211,13 +219,18 @@ class CrossRealmTransformWritableSink{
211219
port.close();
212220
};
213221

222+
port.unref();
214223
}
215224

216225
start(controller){
217226
this[kState].controller=controller;
218227
}
219228

220229
asyncwrite(chunk){
230+
if(this[kState].unref){
231+
this[kState].unref=false;
232+
this[kState].port.ref();
233+
}
221234
if(this[kState].backpressurePromise===undefined){
222235
this[kState].backpressurePromise={
223236
promise: PromiseResolve(),
@@ -262,12 +275,12 @@ class CrossRealmTransformWritableSink{
262275
}
263276

264277
functionnewCrossRealmReadableStream(writable,port){
265-
constreadable=
266-
newReadableStream(
267-
newCrossRealmTransformReadableSource(port));
278+
// MessagePort should always be unref.
279+
// There is a problem with the process not terminating.
280+
// https://github.com/nodejs/node/issues/44985
281+
constreadable=newReadableStream(newCrossRealmTransformReadableSource(port,false));
268282

269-
constpromise=
270-
readableStreamPipeTo(readable,writable,false,false,false);
283+
constpromise=readableStreamPipeTo(readable,writable,false,false,false);
271284

272285
setPromiseHandled(promise);
273286

@@ -278,12 +291,15 @@ function newCrossRealmReadableStream(writable, port){
278291
}
279292

280293
functionnewCrossRealmWritableSink(readable,port){
281-
constwritable=
282-
newWritableStream(
283-
newCrossRealmTransformWritableSink(port));
294+
// MessagePort should always be unref.
295+
// There is a problem with the process not terminating.
296+
// https://github.com/nodejs/node/issues/44985
297+
constwritable=newWritableStream(newCrossRealmTransformWritableSink(port,false));
284298

285299
constpromise=readableStreamPipeTo(readable,writable,false,false,false);
300+
286301
setPromiseHandled(promise);
302+
287303
return{
288304
writable,
289305
promise,

‎lib/internal/webstreams/writablestream.js‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,6 @@ class WritableStream{
263263
this[kState].transfer.readable=readable;
264264
this[kState].transfer.promise=promise;
265265

266-
setPromiseHandled(this[kState].transfer.promise);
267-
268266
return{
269267
data: {port: this[kState].transfer.port2},
270268
deserializeInfo:
@@ -283,7 +281,11 @@ class WritableStream{
283281
consttransfer=lazyTransfer();
284282
setupWritableStreamDefaultControllerFromSink(
285283
this,
286-
newtransfer.CrossRealmTransformWritableSink(port),
284+
// The MessagePort is set to be referenced when reading.
285+
// After two MessagePorts are closed, there is a problem with
286+
// lingering promise not being properly resolved.
287+
// https://github.com/nodejs/node/issues/51486
288+
newtransfer.CrossRealmTransformWritableSink(port,true),
287289
1,
288290
()=>1);
289291
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
'use strict';
2+
3+
require('../common');
4+
const{ ok }=require('node:assert');
5+
6+
// This test verifies that cloned ReadableStream and WritableStream instances
7+
// do not keep the process alive. The test fails if it timesout (it should just
8+
// exit immediately)
9+
10+
constrs1=newReadableStream();
11+
constws1=newWritableStream();
12+
13+
const[rs2,ws2]=structuredClone([rs1,ws1],{transfer: [rs1,ws1]});
14+
15+
ok(rs2instanceofReadableStream);
16+
ok(ws2instanceofWritableStream);

‎test/parallel/test-whatwg-webstreams-transfer.js‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,23 @@ const theData = 'hello'
464464
tracker.verify();
465465
});
466466
467+
// We create an interval to keep the event loop alive while
468+
// we wait for the stream read to complete. The reason this is needed is because there's
469+
// otherwise nothing to keep the worker thread event loop alive long enough to actually
470+
// complete the read from the stream. Under the covers the ReadableStream uses an
471+
// unref'd MessagePort to communicate with the main thread. Because the MessagePort
472+
// is unref'd, it's existence would not keep the thread alive on its own. There was previously
473+
// a bug where this MessagePort was ref'd which would block the thread and main thread
474+
// from terminating at all unless the stream was consumed/closed.
475+
const i = setInterval(() =>{}, 1000);
476+
467477
parentPort.onmessage = tracker.calls(({data }) =>{
468478
assert(isReadableStream(data));
469479
const reader = data.getReader();
470480
reader.read().then(tracker.calls((result) =>{
471481
assert(!result.done);
472482
assert(result.value instanceof Uint8Array);
483+
clearInterval(i);
473484
}));
474485
parentPort.close();
475486
});

0 commit comments

Comments
(0)