Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ See docs/process.md for more on how version tagging works.

6.0.2 (in development)
----------------------
- Added support for `epoll` (`epoll_create1`/`epoll_ctl`/`epoll_wait`/
`epoll_pwait`) on the legacy (non-WASMFS) JS filesystem, including
level- and edge-triggered modes, `EPOLLONESHOT`, `EPOLLEXCLUSIVE`,
`EPOLLRDHUP`, nesting, and blocking waits under `PROXY_TO_PTHREAD`,
`ASYNCIFY`, and `JSPI`. Also added `emscripten_epoll_set_callback`
(in the new `<emscripten/epoll.h>`, experimental), a non-blocking variant
that delivers an epoll set's readiness to a JS callback with no
`ASYNCIFY`/`JSPI`. As part of this, the (undocumented) `stream_ops.poll`
FS-backend handler signature changed from `poll(stream, timeout)` to
`poll(stream)` returning the current readiness mask; out-of-tree custom FS
backends with a `poll` handler must update. (#27207)
- New `-sNODERAWSOCKETS` setting that backs the POSIX sockets API with real TCP
(`node:net`) and UDP (`node:dgram`) sockets on Node.js, with no `ws`, proxy
process, or pthreads required. Supports incoming and outgoing TCP, UDP, IPv6,
Expand Down
414 changes: 414 additions & 0 deletions src/lib/libepoll.js

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions src/lib/libfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ FS.staticInit();`;
readMode = {{{ cDefs.S_IRUGO }}} | {{{ cDefs.S_IXUGO }}};
writeMode = {{{ cDefs.S_IWUGO }}};
mounted = null;
#if USE_CLOSURE_COMPILER
// Closure (@struct) requires these declared ahead of time. The readiness
// wait-queue is populated lazily, and only on nodes that derive real
// readiness (sockets, pipes, an epoll's own node).
/** @type {Set<?>|null} */
listeners = null;
/** @type {number} */
exclTurn = 0;
#endif
constructor(parent, name, mode, rdev) {
if (!parent) {
parent = this; // root node sets parent to itself
Expand Down Expand Up @@ -164,6 +173,48 @@ FS.staticInit();`;
get isDevice() {
return FS.isChrdev(this.mode);
}
// The per-inode readiness wait-queue. The node carries a Set of listener
// entries {cb}; producers (SOCKFS, PIPEFS) call notifyListeners on a
// readiness transition, and poll()/epoll consume it. It lives on the node
// (not the fd) so dup'd fds share one queue. Only nodes that derive real
// readiness (sockets, pipes, and an epoll's own node) ever use this -
// always-ready types (regular files, ttys) never register or notify.
addListener(cb, exclusive = false) {
var entry = {cb, exclusive};
var listeners = (this.listeners ??= new Set());
listeners.add(entry);
return {listeners, entry};
}
notifyListeners(flags) {
// Iterates the set without copying, which is safe ONLY under a
// load-bearing contract that every internal listener must honour:
// 1. A listener must not run user code synchronously (a poll waiter only
// resolves a Promise; an epoll registration only re-lists +
// re-notifies; the epoll callback only schedules a tick). User code
// runs on a later tick, never inside this loop.
// 2. A listener may delete entries only from ITS OWN waiter, never from
// a sibling node's set that may be mid-iteration. (Deleting an entry
// of the set being iterated here is fine - a Set tolerates removal of
// a not-yet-visited entry mid-iteration; mutating a *different* node's
// set is fine because that set is not being iterated.)
// Violating either gives silently skipped wakeups that are near-impossible
// to reproduce. Any new producer/listener must preserve it.
if (!this.listeners) return;
// Fire every non-exclusive listener. Among EPOLLEXCLUSIVE registrations
// (one fd watched by several epolls) wake only one, rotating round-robin
// per node, to avoid a thundering herd. (Only epoll registrations are ever
// exclusive; poll waiters and a node's own consumers are not.)
var excl;
for (var entry of this.listeners) {
if (entry.exclusive) (excl ||= []).push(entry);
else entry.cb(flags);
}
if (excl) {
var i = (this.exclTurn || 0) % excl.length;
this.exclTurn = i + 1;
excl[i].cb(flags);
}
}
},

//
Expand Down Expand Up @@ -1189,6 +1240,11 @@ FS.staticInit();`;
throw new FS.ErrnoError({{{ cDefs.EBADF }}});
}
if (stream.getdents) stream.getdents = null; // free readdir state
// The fd is going away: wake anything waiting on it (poll/epoll) with
// POLLNVAL so a blocking wait unblocks and an epoll registration is evicted
// on its next derive. Only sockets/pipes/epoll ever carry a wait-queue, so
// for every other stream (incl. nodeless noderawfs stdio) this is a no-op.
stream.node?.notifyListeners({{{ cDefs.POLLNVAL }}});
try {
if (stream.stream_ops.close) {
stream.stream_ops.close(stream);
Expand Down
33 changes: 5 additions & 28 deletions src/lib/libpipefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,6 @@ addToLibrary({
// able to read from the read end after write end is closed.
refcnt : 2,
timestamp: new Date(),
#if PTHREADS || ASYNCIFY
readableHandlers: [],
registerReadableHandler: (callback) => {
callback.registerCleanupFunc(() => {
const i = pipe.readableHandlers.indexOf(callback);
if (i !== -1) pipe.readableHandlers.splice(i, 1);
});
pipe.readableHandlers.push(callback);
},
notifyReadableHandlers: () => {
while (pipe.readableHandlers.length > 0) {
const cb = pipe.readableHandlers.shift();
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
}
pipe.readableHandlers = [];
}
#endif
};

pipe.buckets.push({
Expand All @@ -53,6 +36,8 @@ addToLibrary({

rNode.pipe = pipe;
wNode.pipe = pipe;
// The read end's node carries the poll wait-queue; writes wake it.
pipe.readNode = rNode;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we never need to notify the write node? I guess we just always accept new writes and buffer them?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently not, because there isn't any capacity limit for writable in the current implementation. This could be added in future.


var readableStream = FS.createStream({
path: rName,
Expand Down Expand Up @@ -97,7 +82,7 @@ addToLibrary({
blocks: 0,
};
},
poll(stream, timeout, notifyCallback) {
poll(stream) {
var pipe = stream.node.pipe;

if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
Expand All @@ -108,10 +93,6 @@ addToLibrary({
return ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
}
}

#if PTHREADS || ASYNCIFY
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
#endif
return 0;
},
dup(stream) {
Expand Down Expand Up @@ -233,9 +214,7 @@ addToLibrary({
if (freeBytesInCurrBuffer >= dataLen) {
currBucket.buffer.set(data, currBucket.offset);
currBucket.offset += dataLen;
#if PTHREADS || ASYNCIFY
pipe.notifyReadableHandlers();
#endif
pipe.readNode.notifyListeners({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
return dataLen;
} else if (freeBytesInCurrBuffer > 0) {
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
Expand Down Expand Up @@ -267,9 +246,7 @@ addToLibrary({
newBucket.buffer.set(data);
}

#if PTHREADS || ASYNCIFY
pipe.notifyReadableHandlers();
#endif
pipe.readNode.notifyListeners({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
return dataLen;
},
close(stream) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/libsigs.js
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ sigs = {
emscripten_destroy_web_audio_node__sig: 'vi',
emscripten_destroy_worker__sig: 'vi',
emscripten_enter_soft_fullscreen__sig: 'ipp',
emscripten_epoll_set_callback__sig: 'iiipp',
emscripten_err__sig: 'vp',
emscripten_errn__sig: 'vpp',
emscripten_exit_fullscreen__sig: 'i',
Expand Down
17 changes: 16 additions & 1 deletion src/lib/libsockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ addToLibrary({
},
emit(event, param) {
SOCKFS.callbacks[event]?.(param);
// Bridge socket readiness into the inode wait-queue (poll/epoll). The
// 'error' event carries [fd, ...]; the rest carry the fd directly.
var fd = event === 'error' ? param[0] : param;
var flags = {
'message': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}},
'open': {{{ cDefs.POLLOUT }}},
'connection': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}},
'close': {{{ cDefs.POLLIN }}} | {{{ cDefs.POLLHUP }}},
'error': {{{ cDefs.POLLERR }}},
}[event];
// 'listen' has no readiness mapping; skip it.
if (flags) FS.getStream(fd)?.node.notifyListeners(flags);
},
mount(mount) {
#if expectToReceiveOnModule('websocket')
Expand Down Expand Up @@ -417,7 +429,8 @@ addToLibrary({
if (sock.connecting) {
mask |= {{{ cDefs.POLLOUT }}};
} else {
mask |= {{{ cDefs.POLLHUP }}};
// A closed peer is both a full hangup and a read-side hangup.
mask |= {{{ cDefs.POLLHUP }}} | {{{ cDefs.POLLRDHUP }}};
}
}

Expand Down Expand Up @@ -555,6 +568,8 @@ addToLibrary({
// push to queue for accept to pick up
sock.pending.push(newsock);
SOCKFS.emit('connection', newsock.stream.fd);
// A queued client makes the listening socket readable (POLLIN).
sock.stream.node.notifyListeners({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
} else {
// create a peer on the listen socket so calling sendto
// with the listen socket and an address will resolve
Expand Down
7 changes: 6 additions & 1 deletion src/lib/libsockfs_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ var NodeSockFSLibrary = {
} else if (sock.connection && sock.state === {{{ SOCK_STATE_CONNECTED }}} && !sock.writeBlocked) {
mask |= {{{ cDefs.POLLOUT }}};
}
if (sock.readClosed) mask |= {{{ cDefs.POLLHUP }}};
// A peer FIN / read-side hangup (recv will see EOF) is POLLRDHUP; only a
// fully closed connection is a POLLHUP.
if (sock.readClosed) mask |= {{{ cDefs.POLLRDHUP }}};
if (sock.state === {{{ SOCK_STATE_CLOSED }}}) mask |= {{{ cDefs.POLLHUP }}};
return mask;
},
ioctl(sock, request, arg) {
Expand Down Expand Up @@ -507,6 +510,8 @@ var NodeSockFSLibrary = {
try { conn.resume(); } catch (e) {} // paused by pauseOnConnect
sock.pending.push(newsock);
SOCKFS.emit('connection', newsock.stream.fd);
// A queued client makes the listening socket readable (POLLIN).
sock.stream.node.notifyListeners({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
});
server.on('error', (e) => {
sock.error = nodeSockHelpers.nodeErrToErrno(e);
Expand Down
Loading