Pushing a log output stream from a NestJS server

Original link: https://innei.ren/posts/programming/shell-output-via-sse


I recently worked on two features: hot-updating the admin panel, and installing and upgrading dependencies. Both are long-running tasks, so showing their progress on the front end requires real-time log output, similar to the build logs of GitHub Actions or Vercel. Real-time output is usually done over WebSocket. The project already uses Socket.IO, but a Socket.IO connection is heavyweight by design, so SSE over HTTP/2 seemed worth a try.

SSE interface design

SSE stands for Server-Sent Events, a form of server push. The client establishes a long-lived connection to the server, over which the server can send data to the browser multiple times, which makes it a good fit for log output.

NestJS provides the @Sse decorator, which is used in the same way as the decorators for ordinary HTTP requests. An SSE request is essentially an HTTP GET request whose response has the Content-Type text/event-stream.
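To make the text/event-stream format concrete, here is a minimal sketch of how a single event is laid out on the wire. NestJS does this serialization itself; the `SseMessage` type and `formatSseMessage` function are hypothetical names used only for illustration, and the sketch covers just the `event`, `id`, `retry` and `data` fields.

```typescript
// Hypothetical shape of one server-sent event (illustrative, not a NestJS type).
interface SseMessage {
  data: string
  event?: string // optional event name, received via addEventListener(name)
  id?: string // optional event id
  retry?: number // reconnection delay hint for the browser, in milliseconds
}

// Serialize one event into the text/event-stream wire format.
function formatSseMessage(msg: SseMessage): string {
  const lines: string[] = []
  if (msg.event !== undefined) lines.push(`event: ${msg.event}`)
  if (msg.id !== undefined) lines.push(`id: ${msg.id}`)
  if (msg.retry !== undefined) lines.push(`retry: ${msg.retry}`)
  // A multi-line payload needs one "data:" line per line of text.
  for (const line of msg.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`)
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n'
}

// formatSseMessage({ data: 'hello' }) returns 'data: hello\n\n'
```

Because each event is plain text terminated by a blank line, the stream can be inspected in the browser's network panel or with a command-line HTTP client.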

0710142313.png

To stream data from a NestJS SSE endpoint, the handler returns an rxjs Observable, which can emit data continuously. A simple example:

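    // Assumes: import { Sse } from '@nestjs/common' and import { Observable } from 'rxjs'
    @Sse('/hello')
    async helloSse() {
      const ob$ = new Observable((subscriber) => {
        subscriber.next('hello')
        subscriber.next('bye!')
        // Signal that no more data will be sent.
        subscriber.complete()
      })
      return ob$
    }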

The browser uses an EventSource to connect.

    const ev = new EventSource('http://localhost:2333/debug/hello')
    ev.onmessage = (e) => {
      log.innerText += e.data + '\n'
    }
    ev.onerror = (e) => {
      if (!e) {
        ev.close()
      }
    }

0710143736.png

However, once the stream has finished successfully, the browser sends the request again. It keeps reconnecting until the connection is closed manually. For now, the following hack works around this.

    const ev = new EventSource('http://localhost:2333/debug/hello')
    let opened = false
    ev.onopen = (e) => {
      opened = true
      console.log('opened')
    }
    ev.onmessage = (e) => {
      log.innerText += e.data + '\n'
    }
    ev.onerror = (e) => {
      if (!e) {
        ev.close()
      }
      // The stream had opened and the browser is about to reconnect: stop here.
      if (e.target.readyState === EventSource.CONNECTING && opened) {
        return ev.close()
      }
      if (e.target.readyState === EventSource.CLOSED) {
        log.innerText += 'closed'
        ev.close()
      }
    }

This works because an error event is fired once the transmission completes.
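The decision made inside the onerror handler of the hack can be isolated into a small pure function, which makes the rule easier to read and to test. This is only a sketch: `SSE_READY_STATE` and `decideOnError` are hypothetical names, and the numeric values are those defined for EventSource.CONNECTING, EventSource.OPEN and EventSource.CLOSED.

```typescript
// Numeric values of EventSource.CONNECTING / OPEN / CLOSED.
const SSE_READY_STATE = { CONNECTING: 0, OPEN: 1, CLOSED: 2 } as const

type ErrorAction = 'close' | 'ignore'

// Hypothetical helper mirroring the readyState checks in the hack above.
function decideOnError(readyState: number, openedBefore: boolean): ErrorAction {
  // The stream had opened successfully and the browser is now trying to
  // reconnect: treat this as "the server finished" and stop.
  if (readyState === SSE_READY_STATE.CONNECTING && openedBefore) return 'close'
  // The connection is already closed; calling close() again is harmless.
  if (readyState === SSE_READY_STATE.CLOSED) return 'close'
  // Otherwise (for example a retry of a connection that never opened),
  // leave the EventSource alone.
  return 'ignore'
}
```

In the browser it might be called as `if (decideOnError(ev.readyState, opened) === 'close') ev.close()` inside `ev.onerror`.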

Using zx and node-pty to realize the transmission of shell output stream

zx is a library open sourced by Google that makes writing scripts easier. I use it not only for scripts but also extensively inside projects. However, zx has dropped support for CommonJS, so I recommend the zx-cjs library, which modifies the official build process to publish a CommonJS-compatible build of zx and keeps it in sync with upstream releases. After installing it, don't forget to change the package's dependency entry to:

 "zx": "npm:zx-cjs@latest"

After that, the package is imported the same way as the original zx.

node-pty is a pseudoterminal library for Node.js open sourced by Microsoft, which makes it easy to implement a terminal emulator.

When running a shell command with zx, call .pipe to pipe the output stream into another Writable, whose write method forwards each chunk to the rxjs Observable. A suggested implementation:

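    // Assumes: import { Observable } from 'rxjs', import Stream from 'stream', import { $ } from 'zx'
    const ob$ = new Observable((subscriber) => {
      ;(async () => {
        const writable = new Stream.Writable({
          autoDestroy: false,
          write(chunk, _encoding, callback) {
            subscriber.next(chunk.toString())
            // Signal that the chunk was handled so the stream keeps accepting data.
            callback()
          },
        })
        await $`ls -lh`.pipe(writable)
        subscriber.complete()
        writable.destroy()
      })()
    })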

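Note that the Observable constructor does not currently support an async function as its callback; passing one causes an error when .complete() is called. This is why the async work is wrapped in an immediately invoked async function instead.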
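One thing the immediately invoked async function does not cover is failure: if the awaited command rejects, the subscriber is never notified. The sketch below shows one possible way to forward both outcomes. `LogSubscriber` is a hypothetical stand-in for the parts of the rxjs Subscriber used here (so the sketch runs without rxjs), and `runAsyncProducer` is an illustrative helper, not part of rxjs or zx.

```typescript
// Minimal stand-in for the rxjs Subscriber methods used in this sketch.
interface LogSubscriber {
  next(value: string): void
  error(err: unknown): void
  complete(): void
}

// Hypothetical helper: run an async producer and always finish the
// subscriber, with error() on rejection and complete() on success.
async function runAsyncProducer(
  subscriber: LogSubscriber,
  producer: (emit: (chunk: string) => void) => Promise<void>,
): Promise<void> {
  try {
    await producer((chunk) => subscriber.next(chunk))
  } catch (err) {
    // Forward the failure instead of leaving an unhandled rejection.
    subscriber.error(err)
    return
  }
  subscriber.complete()
}

// Possible usage inside the Observable callback (kept synchronous):
//   new Observable<string>((subscriber) => {
//     void runAsyncProducer(subscriber, async (emit) => {
//       emit('step 1\n')
//     })
//   })
```

The Observable callback itself stays synchronous; only the helper it starts is async.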

This achieves a simple log output.

zx handles simple output like this well, but commands such as zip may cause problems. In that case, node-pty can be used to capture the output instead. Below is a wrapper.

    // Assumes: import { spawn } from 'node-pty' and import { Subscriber } from 'rxjs'
    runShellCommandPipeOutput(
      command: string,
      args: any[],
      subscriber: Subscriber<string>,
    ) {
      return new Promise((resolve) => {
        // Echo the command line first, like a terminal prompt.
        subscriber.next(`$ ${command} ${args.join(' ')}\n`)
        const pty = spawn(command, args, {})
        pty.onData((data) => {
          subscriber.next(data.toString())
        })
        pty.onExit(() => {
          resolve(null)
        })
      })
    }

node-pty wraps spawn, so it is less flexible than zx, which accepts a long command string directly. The command has to be split into a program and its arguments. A call looks like this.

    await this.runShellCommandPipeOutput(
      'unzip',
      ['-o', 'admin-release.zip', '-d', folder],
      subscriber,
    )
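If commands arrive as a single string, splitting them into a program and arguments can be sketched as follows. `splitCommand` is a hypothetical helper and deliberately naive: it understands whitespace and simple single or double quotes only, with no escapes, variables or shell operators, so it does not replace a real shell parser.

```typescript
interface ParsedCommand {
  program: string
  args: string[]
}

// Hypothetical helper: split "unzip -o file.zip" into program and arguments.
function splitCommand(commandLine: string): ParsedCommand {
  const tokens: string[] = []
  // Match a double-quoted run, a single-quoted run, or a run of non-space characters.
  const pattern = /"([^"]*)"|'([^']*)'|(\S+)/g
  let match: RegExpExecArray | null
  while ((match = pattern.exec(commandLine)) !== null) {
    // Exactly one of the three groups is defined for each match.
    tokens.push(match[1] ?? match[2] ?? match[3] ?? '')
  }
  const [program, ...args] = tokens
  if (program === undefined) {
    throw new Error('empty command line')
  }
  return { program, args }
}

// splitCommand('unzip -o admin-release.zip -d ./admin')
// returns { program: 'unzip', args: ['-o', 'admin-release.zip', '-d', './admin'] }
```

The result could then be passed on as `runShellCommandPipeOutput(parsed.program, parsed.args, subscriber)`.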

In this way, simple log output is achieved with a Writable and an rxjs Observable. The result looks like this.

0710151822.gif

Shortcomings

Since SSE is just a long-lived GET request, its design offers little control over disconnecting and reconnecting. If a disconnected EventSource is not closed promptly, it keeps resending the request, which wastes resources. Connection control is also awkward, so SSE is still rarely used in practice, and the more stable WebSocket is usually chosen instead.

See also

update.controller.ts#L24


