I am attempting to utilize the "csv-parse" library in Typescript to read a csv file by creating an observable. The code provided uses fs.createReadStream to read the file. I am looking to return the observable and subscribe to it, but it seems that the process is not retrieving anything, potentially due to not properly awaiting the asynchronous fs.createReadStream. How can I resolve this issue?
import { parse } from "csv-parse";
import { Observable } from "rxjs";
import * as fs from "fs";
import path from "path";
export interface StdJsonDoc<T = string> {
[key: string]: T;
}
export function createCsvObservable(
filePath: string,
fileType: string | undefined = undefined,
fieldDelimiter: string = ",",
columnHeader: boolean = true
) {
if (fileType !== "csv") {
throw Error(`Cannot create CSV observable from non CSV file`);
}
return new Observable<StdJsonDoc>((subscriber) => {
const parser = fs.createReadStream(filePath).pipe(
parse({
delimiter: fieldDelimiter,
columns: columnHeader,
trim: true,
skip_empty_lines: true,
relax_column_count: true,
})
);
parser.on("readable", () => {
let record: StdJsonDoc;
while ((record = parser.read())) {
subscriber.next(record);
}
});
parser.on("end", () => {
subscriber.complete();
});
parser.on("error", () => {
subscriber.error();
});
});
}
async function start() {
const myObservableCsv = createCsvObservable(
path.join(__dirname, "data", "myCsvFile.csv"),
"csv"
);
myObservableCsv.subscribe({
next: (record) => {
console.log(`RECORD: ${record}`);
},
error: () => {
console.log("ERROR");
},
complete: () => {
console.log("COMPLETE");
},
});
}
start().then(() => {
console.log(`*** END PROGRAM ***`);
process.exit(0);
});