Transform a callback-based function into an Async Iterator version

Situation

I have a function with an asynchronous callback structure, like so:

let readFile:   (path: string, callback: (line: string, eof: boolean) => void) => void

However, I would much rather use a function that follows the AsyncIterable/AsyncGenerator pattern:

let readFileV2: (path: string) => AsyncIterable<string>

Issue

Without readFileV2, I'm stuck reading a file in this cumbersome manner:

let file = await new Promise((res, err) => {
    let file = ''
    readFile('./myfile.txt', (line, eof) => {
        if (eof) { return res(file) }
        file += line + '\n'
    })
})

.. whereas with readFileV2, I can elegantly achieve the same result like so:

let file = '';
for await (let line of readFileV2('./myfile.txt')) {
    file += line + '\n'
}

Inquiry

Is there a way for me to transform readFile into readFileV2?

Updated for clarity:

Is there a method that can be applied universally to convert a function with an async callback into an AsyncGenerator/AsyncIterable format?

If so, could this method be demonstrated using the readFile function mentioned above?

References

I have come across a couple of related questions:

  • How to convert Node.js async streaming callback into an async generator?
  • How to convert callback-based async function to async generator

However, neither of these seems to offer a definitive solution.

Answer №1

Let me start with a disclaimer: I will be addressing the following question:

Given a data-providing function fn that takes some arguments of types A followed by a callback of type

(data: T, done: boolean) => void

how can we turn it into a new function transform(fn) with the signature

(...args: A) => AsyncIterable<T>

?

This transformation may not be the right choice in every scenario, because a consumer of the AsyncIterable<T> could stall or stop iterating early. Still, the implementation below shows one way it can be done.


Here is the proposed implementation:

// Code implementation goes here

This implementation maintains a queue of data values, values, which the callback passed to fn writes to and a generator function reads from. A series of promises coordinates the hand-off of data from fn to the generator function.
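Since the original listing is not reproduced here, the following is only a minimal sketch of the queue-and-promise idea described above, not the answer's actual code. The names Callback, Entry, and wake are illustrative, and it assumes the final callback invocation (done === true) carries no data, as with eof in the question's readFile.

```typescript
// Illustrative sketch only: Callback, Entry, and wake are hypothetical names.
type Callback<T> = (data: T, done: boolean) => void;
type Entry<T> = { data: T; done: boolean };

function transform<A extends unknown[], T>(
  fn: (...args: [...A, Callback<T>]) => void
): (...args: A) => AsyncIterable<T> {
  return (...args: A) => {
    const values: Entry<T>[] = [];       // delivered by fn, not yet consumed
    let wake: (() => void) | undefined;  // resolves the consumer's pending wait

    // Start the producer immediately; every callback invocation is queued.
    fn(...args, (data: T, done: boolean) => {
      values.push({ data, done });
      wake?.();
      wake = undefined;
    });

    async function* consume(): AsyncGenerator<T> {
      while (true) {
        if (values.length === 0) {
          // Nothing queued yet: wait until the callback fires again.
          await new Promise<void>((resolve) => { wake = resolve; });
          continue;
        }
        const { data, done } = values.shift()!;
        // Assumption: the final call only signals completion and has no payload.
        if (done) return;
        yield data;
      }
    }

    return consume();
  };
}
```

Under these assumptions, transform(readFile) would have the shape of the readFileV2 asked for in the question. Note that the queue is unbounded, so a slow consumer lets it grow, and errors are not handled at all.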


To test this implementation, an initial fn needs to be provided. A sample function provideData is demonstrated below:

// Sample data-providing function goes here

The function provideData essentially delivers lines of data to a callback function at a one-second interval. This function is then passed through the transform function to create provideDataV2, an AsyncIterable data provider.
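The sample function itself is elided above, so here is a hypothetical stand-in with the same behaviour: it delivers a few lines to a callback on a timer and then signals completion. The intervalMs parameter and the three generated lines are assumptions made for illustration; pass 1000 to get the one-second interval mentioned in the text.

```typescript
// Hypothetical stand-in for the elided sample function.
function provideData(
  name: string,
  intervalMs: number, // assumption: added so the delay can be shortened when experimenting
  callback: (line: string, done: boolean) => void
): void {
  const lines = [1, 2, 3].map((n) => `${name}: line ${n}`);
  let index = 0;
  const timer = setInterval(() => {
    if (index < lines.length) {
      callback(lines[index], false);
      index += 1;
    } else {
      // Final call: signals completion and carries no data.
      clearInterval(timer);
      callback("", true);
    }
  }, intervalMs);
}
```

Because the callback is the last parameter, a function of this shape matches the fn signature described at the top of this answer.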

// Transformation of provideData to provideDataV2

The testing of this transformation is done by executing a function that iterates over the data provided by provideDataV2 and logs each line with a timestamp. The log output confirms the successful transformation.
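The test described above might look roughly like the sketch below. logWithTimestamps is a hypothetical name, and it accepts any AsyncIterable<string> rather than provideDataV2 specifically, so it can be tried with any source.

```typescript
// Hypothetical helper: logs every line from an async iterable with a timestamp
// and returns how many lines were seen.
async function logWithTimestamps(source: AsyncIterable<string>): Promise<number> {
  let count = 0;
  for await (const line of source) {
    console.log(new Date().toISOString(), line);
    count += 1;
  }
  return count;
}
```

If the transformation works, the timestamps in the log should be spaced by roughly the producer's interval.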

While this implementation serves the purpose outlined in the initial question, it may not cover all edge cases or error handling scenarios. It's advisable to explore other recommended solutions and evaluate the functionality based on specific requirements.


Answer №2

Since version 10, Node.js has shipped a native API for this, so there is no need to reimplement it:

import {createReadStream} from 'fs';
import {createInterface} from 'readline';

function readLinesFromFile(fileName: string): AsyncIterable<string> {
    const input = createReadStream(fileName);
    return createInterface({input, crlfDelay: Infinity});
}

Let's give it a try:

const lines = readLinesFromFile('./test1.js');
for await(const line of lines) {
    console.log(line);
}

Answer №3

Yes.

I used this approach with Deno.serve, an HTTP server that accepts a callback and an options object, like so:

Deno.serve(req => respondWith(req), {port: 3000})

The gist of the code is as follows:

async function* emitterGen(opts){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  Deno.serve( req => ( _resolve(req)
                     , _req = new Promise((resolve,reject) => _resolve = resolve)
                     )
            , opts
            );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen({port: 3000});

for await (let req of reqEmitter){
  respondWith(req);
}

The code provided above is a simplified version, lacking exception handling, but it should suffice to address your query.

Here is a working mock server. After an initial delay of ms, it emits a random number (0-99) as a request (req) at random intervals (0-999 ms), invoking cb (the handler) with req each time. It stops after 5 requests.

function server(cb,ms){
  let count  = 5,
      looper = function(c = count,t = ms){
                 let stoid = setTimeout( req => ( cb(req)
                                                , --c && looper(c, Math.random()*1000 >>> 0)
                                                , clearTimeout(stoid)
                                                )
                                       , t
                                       , Math.random()*100 >>> 0
                                       )
               }
  looper();
}

async function* emitterGen(ms){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  server( req => ( _resolve(req)
                 , _req = new Promise((resolve,reject) => _resolve = resolve)
                 )
        , ms
        );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen(1000);

// due to the absence of top-level await in SO snippets
(async function(){
  for await (let req of reqEmitter){
    console.log(`Received request is: ${req}`);
  }
})();

Answer №4

I wrote a small class that can create an async generator from any source:

/** Implementation of an endless async generator. Processes incoming messages until terminated. */
class Automaton<T> {

  #active = true;
  #buffer: T[] = [];
  #resolve: (() => void) | undefined;

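  async * generate(): AsyncGenerator<T> {
    // Keep running while the source is live OR buffered items remain, so that
    // items added just before end() are still delivered. #active is not reset
    // here, so an end() issued before iteration starts is respected.
    while (this.#active || this.#buffer.length) {
      if (this.#buffer.length) {
        yield this.#buffer.shift()!;
        continue;
      }

      // Buffer is empty: pause until add() or end() resolves this promise.
      await new Promise<void>((_resolve) => {
        this.#resolve = _resolve;
      });
    }
  }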

  add(data: T): void {
    this.#buffer.push(data);
    this.#resolve?.();
  }

  end(): void {
    this.#active = false;
    this.#resolve?.();
  }

}

export { Automaton };

You can utilize it in the following manner:

// Instantiate the Automaton
const automaton = new Automaton<string>();

// Asynchronous generator loop
async function processMessages() {
  for await (const message of automaton.generate()) {
    console.log(message);
  }
}

// Initialize the generator
processMessages();

// Add messages to the generator
automaton.add('hello!');
automaton.add('how are you?');
automaton.add('salutations');

// Terminate the generator
automaton.end();

For your specific scenario, consider integrating it like this:

/** Read each line of a file as an AsyncGenerator. */
function readFileV2(path: string): AsyncGenerator<string> {
  const automaton = new Automaton<string>();

  // Forward every line to the automaton and close it at end of file.
  readFile(path, (line: string, eof: boolean) => {
    if (eof) {
      automaton.end();
    } else {
      automaton.add(line);
    }
  });

  return automaton.generate();
}

// Usage
for await (const line of readFileV2('file.txt')) {
  console.log(line);
}

Functionality Overview

  1. Once the consumer starts iterating over automaton.generate(), the generator enters its loop and, if the buffer is empty, pauses immediately (on the first iteration) while awaiting a promise resolution.
  2. Utilizing automaton.add() appends an item to the buffer and then resumes by resolving the promise. Upon resuming, it transfers the buffer contents to the stream before pausing again via a new promise.
  3. The consumer of automaton.generate() receives the added items, and this process can be repeated.

Additional Remarks:

  • Even if the Automaton instance is no longer in use (potentially subject to garbage collection), its promise continues indefinitely. Therefore, remember to call automaton.end() manually once streaming is complete. Simply exiting the loop is insufficient!
  • Initially, I experimented with an alternative design sans a buffer. However, it became apparent that a buffer is necessary to accommodate the addition of multiple items within a single tick; otherwise, subsequent items beyond the first would be lost. Nonetheless, if only one item is added per tick and the stream is actively consumed, the buffer will maintain a single item at most—a minor memory consideration.
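One way to avoid forgetting the manual end() call is to wrap the generator so that cleanup runs whenever the consumer leaves the loop, including via break or an exception. This is only a sketch: withCleanup is a hypothetical helper that is not part of the class above, and it relies on for await calling return() on the iterator when the loop exits early.

```typescript
// Hypothetical helper: runs onClose when iteration finishes or is abandoned.
async function* withCleanup<T>(
  source: AsyncIterable<T>,
  onClose: () => void
): AsyncGenerator<T> {
  try {
    // Delegate to the wrapped iterable; an early exit by the consumer is
    // forwarded to it before the finally block runs.
    yield* source;
  } finally {
    onClose();
  }
}
```

With the class above, this could be used as withCleanup(automaton.generate(), () => automaton.end()), so that breaking out of the for await loop also ends the automaton.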
