
The author selected Society of Women Engineers to receive a donation as part of the Write for DOnations program.
Learn how to parse, transform, and store CSV data in Node.js using the csv (node-csv) package and SQLite.
CSV (Comma-Separated Values) files are plain text files that store tabular data in a structured format, and a delimiter (typically a comma) separates values within each row. CSV files are widely used for data exchange because they’re simple, human-readable, and supported by spreadsheet applications, databases, and programming languages.
Node.js provides several highly efficient modules for working with CSV files, including node-csv, fast-csv, and papaparse. The node-csv package is, in particular, a collection of stream-based modules that handle parsing, stringification, transformation, and generation of CSV data efficiently, making it suitable for processing large files without consuming excessive memory.
In this tutorial, you will learn how to:
- Read CSV files as arrays and objects with csv-parse.
- Write CSV output with csv-stringify, including custom headers and column ordering.
- Transform and filter rows on the fly with stream-transform.
- Convert between JSON and CSV formats.
- Process large files with streaming pipelines, and validate or quarantine malformed rows.
- Import CSV data into SQLite for querying and reporting.
Remember these core patterns and best practices when designing CSV ingestion and export workflows in Node.js:
- The csv package (also known as node-csv) provides four modular tools (csv-parse, csv-stringify, stream-transform, and csv-generate) for stream-based reading, writing, and manipulation of CSV data in Node.js.
- csv-parse lets you convert raw CSV text into arrays or objects using options like columns, from_line, delimiter, and trim for precise parsing control.
- csv-stringify serializes arrays or objects into CSV format, with full support for custom headers, delimiters, and streaming writes for scalable exports.
- Use async iterators (for await ... of) or the pipe() method to process streaming data incrementally while maintaining non-blocking I/O.
- Apply error-handling options (skip_records_with_error, relax_column_count, and field validation) to ensure data integrity across imports and exports.
- Combine csv-parse and csv-stringify to integrate Node.js applications with APIs, analytics tools, or data exchange systems.

Before you start, make sure your environment and background knowledge match these prerequisites:
Node.js installed on your local or server environment (v20 LTS or later recommended). Follow our How to Install Node.js and Create a Local Development Environment to install Node.js.
SQLite installed on your local or server environment, which you can install by following step 1 in How To Install and Use SQLite on Ubuntu 20.04. Knowledge of how to use SQLite is helpful and can be gained from steps 2-7 of the installation guide.
Familiarity with writing a Node.js program. See How To Write and Run Your First Program in Node.js.
Familiarity with Node.js streams. See How To Work with Files Using Streams in Node.js.
Understanding of async/await syntax. See How To Write Asynchronous Code in Node.js.
Choose the right CSV handling strategy for your workload, then configure the csv package to match your delimiters, headers, and encoding.
Pick a library (csv-parse, fast-csv, or papaparse) that fits your runtime (Node-only vs. browser) and configuration needs.
CSV files use a delimiter character to separate values within each row. While commas are most common, other delimiters include semicolons (;), tabs (\t), and pipes (|). The first row often contains column headers, though this isn’t required.
When working with CSV files in Node.js, you have two main approaches:
- Load the entire file into memory (for example, with fs.readFileSync()) and parse it in one pass. This is simple, but only practical for small files.
- Stream the file and parse rows incrementally as chunks arrive. This keeps memory usage bounded and scales to very large files.
The node-csv package uses streams by default, which makes it ideal for production applications that handle variable file sizes. The package consists of four modules:
csv-parse, csv-stringify, stream-transform, and csv-generate cover ingestion, export, inline rewriting, and synthetic dataset creation. The table below shows when to pick each tool and how it compares with other Node.js CSV approaches:
| Tool / Approach | Defaults (delimiter, headers, newlines) | Ideal workloads | Key differentiators |
|---|---|---|---|
| fs.readFileSync + manual parsing | Raw text only; no CSV semantics; no header or quoting support; preserves native newlines | Tiny one-off scripts and quick experiments | Simple to write but brittle; does not handle quoting, BOM, or multiline fields safely. |
| csv-parse from csv (node-csv) | Comma delimiter by default; configurable columns; normalizes \r\n / \n; optional bom: true | Small to very large files; streaming ETL jobs | Rich configuration, first-class streaming, and tight integration with stringify/transform. |
| fast-csv | Comma delimiter; optional header row; stream-based | High-throughput ingestion/export pipelines | Performance-focused API with good TypeScript support and streaming-first design. |
| papaparse (with Node wrapper) | Comma delimiter; browser-first; header detection via options | Browser + Node hybrid workflows | Works both in the browser and Node; good for shared CSV logic across front-end/back-end. |
Some CSV exports include a UTF-8 byte order mark (BOM) at the start of the file. When using csv-parse, you can enable bom: true to automatically strip the BOM:
fs.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
bom: true,
skip_empty_lines: true,
})
);
Line endings can also vary (\n on Unix-like systems, \r\n on Windows). Node streams pass these through transparently, and csv-parse normalizes them internally. If you manually split lines using string operations, normalize them first:
const raw = fs.readFileSync("./migration_data.csv", "utf8");
const normalized = raw.replace(/\r\n?/g, "\n");
const lines = normalized.split("\n");
Set up a dedicated project directory, install the csv and sqlite3 packages, and download a sample CSV file to work with.
Create a project directory and initialize it as an npm project:
- mkdir csv_demo
- cd csv_demo
- npm init -y
The -y option tells npm to use the default settings for your package.json file, so you don’t have to answer prompts during initialization. You can always update these values later to better match your project’s needs.
Install the required dependencies:
- npm install csv sqlite3
This command installs:
- csv: The main package containing csv-parse, csv-stringify, stream-transform, and csv-generate
- sqlite3: For database operations in later examples

Note: The csv package is the official name on npm, but it’s commonly referred to as node-csv in documentation and discussions. Both names refer to the same package.
Download a sample CSV dataset for testing. This example uses migration data from Stats NZ:
- wget https://www.stats.govt.nz/assets/Uploads/International-migration/International-migration-September-2021-Infoshare-tables/Download-data/international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv
If wget is not installed on your system, run curl -O https://www.stats.govt.nz/assets/Uploads/International-migration/International-migration-September-2021-Infoshare-tables/Download-data/international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv instead.
Rename the file for easier reference:
- mv international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv migration_data.csv
The new CSV filename, migration_data.csv, is shorter and easier to work with. Using an editor such as nano or vim, open the file to inspect its structure:
- nano migration_data.csv
The file structure looks like this:
year_month,month_of_release,passenger_type,direction,sex,age,estimate,standard_error,status
2001-01,2020-09,Long-term migrant,Arrivals,Female,0-4 years,344,0,Final
2001-01,2020-09,Long-term migrant,Arrivals,Male,0-4 years,341,0,Final
...
The first line contains the column names, and all subsequent lines have the data corresponding to each column. A comma separates each piece of data. This character is known as a delimiter because it delineates the fields. You are not limited to using commas. Other popular delimiters include colons (:), semicolons (;), and tabs (\t). You need to know which delimiter is used in the file since most modules require it to parse the files.
After reviewing the file and identifying the delimiter, exit your migration_data.csv file using CTRL+X.
Import only the csv modules you need (typically csv-parse for reading, csv-stringify for writing, and stream-transform for inline edits) so the dependency footprint stays small.
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
For ES modules (Node.js 14+ with "type": "module" in package.json):
import { parse } from "csv-parse";
import { stringify } from "csv-stringify";
import { transform } from "stream-transform";
Note: The examples in this guide use CommonJS (require). If you’re working with ES modules, be sure to update the import statements using the import keyword as shown above.
Use csv-parse with Node.js streams to read CSV files as arrays or objects without loading the entire file into memory, which is especially important for large files.
Create a file named readCSV.js:
const fs = require("fs");
const { parse } = require("csv-parse");
async function readCSV() {
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(parse({ delimiter: ",", from_line: 2 }));
try {
// Use async iterator pattern to process rows incrementally
// This ensures proper backpressure handling and prevents memory build-up
for await (const row of parser) {
console.log(row);
}
console.log("Finished reading CSV file");
} catch (error) {
console.error("Error:", error.message);
throw error;
}
}
readCSV().catch((error) => {
console.error("Failed to read CSV:", error.message);
process.exit(1);
});
This code works as follows:
fs.createReadStream("./migration_data.csv") creates a readable stream from the CSV file. A readable stream breaks the file into smaller chunks so Node.js doesn’t have to load the entire file into memory at once, and it only lets you read from the stream, not write to it..pipe(parse({ delimiter: ",", from_line: 2 })) forwards each chunk from the readable stream into csv-parse. The parse() function implements a transform stream (both readable and writable): it accepts raw CSV text such as 2001-01,2020-09,Long-term migrant,Arrivals,Female,0-4 years,344 and emits the parsed row as an array of fields.{ delimiter: ",", from_line: 2 } configures the parser. delimiter defines which character separates fields in a row, and from_line: 2 tells the parser to skip the header row so you only process data records.for await...of loop uses the async iterator pattern to process rows incrementally. This ensures proper backpressure handling, preventing memory build-up on large files. The parser automatically pauses when the consumer is busy, and resumes when ready, maintaining optimal memory usage.Next, Run the script using:
- node readCSV.js
The output shows each row as an array:
Output
[
'2001-01',
'2020-09',
'Long-term migrant',
'Arrivals',
'Female',
'0-4 years',
'344',
'0',
'Final'
]
...
Finished reading CSV file
All the rows in the CSV file are parsed into arrays by the csv-parse stream. Because the data handler runs each time a chunk is received from the stream, you see the output scroll past gradually instead of being printed all at once.
In this step, you read data from a CSV file and transformed each row into an array using a basic stream pipeline. Next, you’ll see how to read the same CSV data as JavaScript objects using the column names.
To parse CSV rows as objects with column names as keys, use the columns option:
const fs = require("fs");
const { parse } = require("csv-parse");
// Use async iterator pattern for proper backpressure handling
async function readCSVAsObjects() {
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
// Avoid skip_records_with_error - validate and log errors instead
relax_column_count: true,
})
);
try {
for await (const row of parser) {
// Process each row as it arrives, preventing memory accumulation
console.log(row);
}
console.log("Finished reading CSV file");
} catch (error) {
console.error("Error:", error.message);
throw error;
}
}
readCSVAsObjects().catch((error) => {
console.error("Failed to read CSV:", error.message);
process.exit(1);
});
With columns: true, the parser uses the first row as column names and returns objects like:
{
year_month: '2001-01',
month_of_release: '2020-09',
passenger_type: 'Long-term migrant',
direction: 'Arrivals',
sex: 'Female',
age: '0-4 years',
estimate: '344',
standard_error: '0',
status: 'Final'
}
The skip_empty_lines: true option ignores blank lines, and trim: true removes whitespace from field values.
For better control flow, you can wrap the whole read in an async function that validates each row as it streams and returns summary statistics:
const fs = require("fs");
const { parse } = require("csv-parse");
// Validate and log errors with contextual information
function validateRecord(record, rowNumber) {
const errors = [];
if (!record.year_month || !/^\d{4}-\d{2}$/.test(record.year_month)) {
errors.push(`Invalid year_month format at row ${rowNumber}`);
}
const estimate = parseInt(record.estimate, 10);
if (isNaN(estimate) || estimate < 0) {
errors.push(`Invalid estimate value at row ${rowNumber}`);
}
return { isValid: errors.length === 0, errors };
}
async function readCSVFile(filePath) {
let rowNumber = 0;
let validCount = 0;
let invalidCount = 0;
const parser = fs
.createReadStream(filePath)
.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true,
})
);
// Process records individually to avoid memory accumulation on large files
for await (const record of parser) {
rowNumber++;
const validation = validateRecord(record, rowNumber);
if (validation.isValid) {
validCount++;
// Process each valid record as it arrives
console.log(`Processed record ${rowNumber}: ${record.year_month}`);
} else {
invalidCount++;
// Log validation errors with contextual information for debugging
console.warn(`Row ${rowNumber} validation failed:`, validation.errors);
}
}
console.log(`Total records: ${rowNumber}, Valid: ${validCount}, Invalid: ${invalidCount}`);
return { total: rowNumber, valid: validCount, invalid: invalidCount };
}
readCSVFile("./migration_data.csv")
.then((stats) => {
console.log(`Processing complete: ${stats.valid} valid records`);
})
.catch((error) => {
console.error("Error reading CSV:", error.message);
process.exit(1);
});
This approach processes records individually to avoid memory accumulation. Validation errors are logged with row numbers for easier debugging.
Use csv-stringify with writable streams to serialize arrays or objects into CSV files, optionally adding headers and custom column ordering. When dealing with file output, csv-stringify can be combined with writable streams, such as those created with Node’s fs.createWriteStream(), to handle large datasets efficiently by writing data incrementally, row by row, instead of loading everything in memory. This approach is ideal for scalable applications and supports handling headers as well.
Create a file named writeCSV.js:
const fs = require("fs");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const columns = [
"year_month",
"month_of_release",
"passenger_type",
"direction",
"sex",
"age",
"estimate",
];
const data = [
{
year_month: "2001-01",
month_of_release: "2020-09",
passenger_type: "Long-term migrant",
direction: "Arrivals",
sex: "Female",
age: "0-4 years",
estimate: "344",
},
{
year_month: "2001-01",
month_of_release: "2020-09",
passenger_type: "Long-term migrant",
direction: "Arrivals",
sex: "Male",
age: "0-4 years",
estimate: "341",
},
];
async function writeCSV() {
const stringifier = stringify({ header: true, columns: columns });
const writableStream = fs.createWriteStream("output.csv");
// Register event listeners BEFORE initiating writes to prevent missing events
writableStream.on("finish", () => {
console.log("Finished writing CSV file");
});
writableStream.on("error", (error) => {
console.error("Write stream error:", error.message);
});
// Use pipeline for proper backpressure handling and error propagation
await pipeline(
async function* () {
// Generate data as a readable stream
for (const row of data) {
yield row;
}
},
stringifier,
writableStream
);
}
writeCSV().catch((error) => {
console.error("Failed to write CSV:", error.message);
process.exit(1);
});
This code:
- Defines a columns array that controls the order and names of the fields in the CSV header.
- Creates a stringify stream with header: true and columns, so the header row is written automatically before any data rows.
- Creates a writable stream for output.csv and registers its finish and error listeners before any writes happen.
- Streams the data array through the stringifier via pipeline(), which converts each object into a CSV row and writes it to the file.
- Lets pipeline() end the stringifier once the generator finishes, signaling that no more rows are coming so the writable stream can flush remaining data and emit the finish event.

You may want to export data directly from a database to a CSV file for reporting, backup, or sharing with other systems. This can be accomplished efficiently by streaming the data from the database and writing it to a CSV file as each row is read.
The following example demonstrates how to connect to a SQLite database, query a table, and write the results to a CSV file using the csv-stringify module. By streaming both the database reads and CSV writes, you avoid loading all data into memory at once, making this approach suitable for large datasets:
const fs = require("fs");
const { stringify } = require("csv-stringify");
const sqlite3 = require("sqlite3").verbose();
const db = new sqlite3.Database("./population.db");
const filename = "saved_from_db.csv";
const writableStream = fs.createWriteStream(filename);
const columns = [
"year_month",
"month_of_release",
"passenger_type",
"direction",
"sex",
"age",
"estimate",
"standard_error",
"status",
];
const stringifier = stringify({ header: true, columns: columns });
stringifier.pipe(writableStream);
// Register event listeners BEFORE initiating writes to prevent missing events
writableStream.on("finish", () => {
console.log("Finished writing data to CSV file");
db.close();
});
writableStream.on("error", (error) => {
console.error("Write stream error:", error.message);
db.close();
});
stringifier.on("error", (error) => {
console.error("Stringifier error:", error.message);
db.close();
});
// Use async/await pattern for better error handling and control flow
async function exportToCSV() {
return new Promise((resolve, reject) => {
let rowCount = 0;
db.each(
`SELECT * FROM migration`,
(error, row) => {
if (error) {
console.error("Database query error:", error.message);
return;
}
rowCount++;
// Write each row as it's retrieved, maintaining streaming behavior
stringifier.write(row);
// Note: node-sqlite3 does not expose a way to pause db.each mid-query,
// so backpressure cannot be applied here; the stringifier and file stream
// buffer internally if the query outpaces the writer.
},
(error, count) => {
if (error) {
console.error("Database error:", error.message);
reject(error);
return;
}
console.log(`Processed ${count} rows`);
// End the stringifier after all rows are written
stringifier.end();
resolve(count);
}
);
});
}
exportToCSV().catch((error) => {
console.error("Export failed:", error.message);
db.close();
process.exit(1);
});
The db.each() method retrieves rows one at a time, writing each to the CSV stream. This approach is memory-efficient for large result sets.
Process large CSV files with streaming pipelines so you can filter, transform, and export millions of rows without exhausting memory.
- Use fs.createReadStream() and csv-parse to stream input in bounded chunks.
- Connect the parse, optional transform, and stringify stages with stream.pipeline() to wire up backpressure and error handling.
- Tune buffer sizes (highWaterMark) and consider worker threads for CPU-heavy transformations.

The stream.pipeline() API (or pipeline from stream/promises) connects multiple streams, propagates errors correctly, and respects backpressure end-to-end. This is useful when you want a single async function that processes a large file from start to finish:
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");
// Validate and log errors with contextual information
function validateAndTransform(row, rowNumber) {
const estimate = parseInt(row.estimate, 10);
// Validate estimate field
if (isNaN(estimate) || estimate < 0) {
console.warn(`Row ${rowNumber}: Invalid estimate value "${row.estimate}"`);
return null; // Filter out invalid rows
}
// Filter: drop low-importance rows
if (estimate < 100) {
return null;
}
return {
year_month: row.year_month,
direction: row.direction,
sex: row.sex,
estimate,
};
}
async function runPipeline() {
let rowNumber = 0;
// Use stream.pipeline for proper backpressure handling and error propagation
// This ensures all streams are properly cleaned up on error
await pipeline(
fs.createReadStream("./migration_data.csv", {
highWaterMark: 64 * 1024, // 64 KB chunks - tune based on your memory constraints
}),
parse({
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true, // Allow inconsistent columns, validate manually
}),
transform((row) => {
rowNumber++;
// Validate and transform each row, logging errors for debugging
return validateAndTransform(row, rowNumber);
}),
stringify({ header: true }),
fs.createWriteStream("./pipeline_output.csv")
);
console.log(`Pipeline complete. Processed ${rowNumber} rows.`);
}
runPipeline().catch((error) => {
console.error("Pipeline failed:", error.message);
process.exit(1);
});
In this pattern:
- highWaterMark controls how much data each stream buffers at a time. Smaller values reduce peak memory usage, while slightly larger values can improve throughput.
- If any stage fails, pipeline() destroys every stream in the chain and rejects, so errors surface in a single catch block rather than leaving half-open streams.

For CPU-heavy operations (such as complex parsing, encryption, or compression), consider offloading per-row work to a pool of worker threads using the worker_threads module or a worker pool library. Each worker can process a subset of the data while the main thread manages streaming and I/O, as in the sketch below.
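Here is a minimal sketch of that idea. It assumes a single worker script saved as worker.js and a batch size of 500 rows; the SHA-256 hashing step is only a stand-in for whatever CPU-heavy transformation your pipeline actually needs.

// worker.js - runs CPU-heavy per-row work off the main thread
const { parentPort } = require("worker_threads");
const crypto = require("crypto");

parentPort.on("message", (rows) => {
  // Stand-in CPU-bound task: hash one field of each row
  const processed = rows.map((row) => ({
    ...row,
    estimate_hash: crypto
      .createHash("sha256")
      .update(String(row.estimate))
      .digest("hex"),
  }));
  parentPort.postMessage(processed);
});

// main.js - streams the CSV and sends batches to the worker
const fs = require("fs");
const { parse } = require("csv-parse");
const { Worker } = require("worker_threads");

async function processWithWorker() {
  const worker = new Worker("./worker.js");
  worker.on("error", (error) => {
    console.error("Worker error:", error.message);
  });

  // Send one batch and wait for the processed rows to come back
  const runBatch = (rows) =>
    new Promise((resolve) => {
      worker.once("message", resolve);
      worker.postMessage(rows);
    });

  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(parse({ columns: true, skip_empty_lines: true }));

  let batch = [];
  for await (const row of parser) {
    batch.push(row);
    if (batch.length === 500) {
      // Awaiting here pauses the parser, so memory stays bounded
      const processed = await runBatch(batch);
      console.log(`Processed batch of ${processed.length} rows`);
      batch = [];
    }
  }
  if (batch.length > 0) {
    await runBatch(batch);
  }
  await worker.terminate();
}

processWithWorker().catch((error) => {
  console.error("Worker pipeline failed:", error.message);
  process.exit(1);
});

For real workloads you would typically keep a small pool of workers and distribute batches among them, but the single-worker version keeps the batching and backpressure mechanics easy to see.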
For files with millions of rows, you can also filter or process data on the fly within your pipeline. For example, to only include rows where estimate > 100:
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");
const inputFile = "./migration_data.csv";
const outputFile = "./filtered_output.csv";
// Validate and filter rows with proper error logging
function validateAndFilter(row, rowNumber) {
const estimate = parseInt(row.estimate, 10);
// Validate estimate field and log errors
if (isNaN(estimate)) {
console.warn(`Row ${rowNumber}: Invalid estimate "${row.estimate}" - skipping`);
return null;
}
// Filter: only process rows with estimate > 100
if (estimate > 100) {
return {
year_month: row.year_month,
direction: row.direction,
sex: row.sex,
estimate: row.estimate,
};
}
return null; // Filter out rows that don't meet criteria
}
async function processLargeCSV() {
let rowNumber = 0;
// Use stream.pipeline for proper backpressure handling and error propagation
await pipeline(
fs.createReadStream(inputFile),
parse({
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true,
}),
transform((row) => {
rowNumber++;
return validateAndFilter(row, rowNumber);
}),
stringify({
header: true,
columns: ["year_month", "direction", "sex", "estimate"],
}),
fs.createWriteStream(outputFile)
);
console.log(`Finished processing large CSV file. Processed ${rowNumber} rows.`);
}
processLargeCSV().catch((error) => {
console.error("Error processing CSV:", error.message);
process.exit(1);
});
This example reads a large file, filters rows based on a condition, and writes filtered results to a new file, all without loading the entire dataset into memory.
With the stream-transform module from node-csv, you can manipulate CSV data on the fly during reading or writing. As each row flows through the stream pipeline, you can change, filter, or enrich rows before they reach their next destination. This enables memory-efficient processing and powerful, scalable transformations for large CSV datasets without fully loading files.
const fs = require("fs");
const { parse } = require("csv-parse");
const { transform } = require("stream-transform");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
// Validate and transform with error logging
const transformer = transform((row, callback) => {
const estimate = parseInt(row.estimate, 10);
// Validate estimate field
if (isNaN(estimate) || estimate < 0) {
console.warn(`Invalid estimate value "${row.estimate}" - using 0`);
}
// Transform: convert estimate to number and add a calculated field
const transformedRow = {
...row,
estimate: estimate || 0,
estimate_doubled: (estimate || 0) * 2,
};
callback(null, transformedRow);
});
// Use stream.pipeline for proper backpressure handling and error propagation
async function transformCSV() {
await pipeline(
fs.createReadStream("./migration_data.csv"),
parse({
columns: true,
skip_empty_lines: true,
relax_column_count: true,
}),
transformer,
stringify({ header: true }),
fs.createWriteStream("transformed_output.csv")
);
console.log("Transformation complete");
}
transformCSV().catch((error) => {
console.error("Transformation failed:", error.message);
process.exit(1);
});
The transform function receives each row and can modify it before passing it to the next stage in the pipeline.
Convert between JSON and CSV formats so your Node.js applications can import spreadsheet-style data and export structured results for analytics or sharing.
Whether you need to export API results or import spreadsheet data, converting between JSON and CSV is a common workflow in Node.js. Let’s see how to do it both ways.
For large datasets, avoid loading entire JSON arrays or CSV files into memory. Instead, stream records through the conversion pipeline.
The following example uses a JSON array stored in input.json and converts it to CSV using a streaming parser. It relies on the JSONStream package, which you can install with npm install JSONStream:
const fs = require("fs");
const JSONStream = require("JSONStream");
const { stringify } = require("csv-stringify");
const stringifier = stringify({
header: true,
columns: ["name", "age", "city"],
});
fs.createReadStream("input.json")
.pipe(JSONStream.parse("*")) // Stream each element from a top-level array
.pipe(stringifier)
.pipe(fs.createWriteStream("output.csv"))
.on("finish", () => {
console.log("Streaming JSON to CSV conversion complete");
});
You can also stream CSV to JSON using csv-parse and a JSON stringifier:
const fs = require("fs");
const { parse } = require("csv-parse");
const JSONStream = require("JSONStream");
fs.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
})
)
.pipe(JSONStream.stringify("[\n", ",\n", "\n]"))
.pipe(fs.createWriteStream("output_stream.json"))
.on("finish", () => {
console.log("Streaming CSV to JSON conversion complete");
});
When designing streaming conversions:
- Use JSONStream or Node’s Readable.from() to write records incrementally.
- Add zlib.createGzip() at the end of the pipeline when disk or network I/O is a bottleneck (a compressed-export sketch follows the next example).

Convert an array of JSON objects to CSV:
const fs = require("fs");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const { Readable } = require("stream");
const jsonData = [
{ name: "Alice", age: 30, city: "New York" },
{ name: "Bob", age: 25, city: "San Francisco" },
{ name: "Charlie", age: 35, city: "Chicago" },
];
async function jsonToCSV() {
const stringifier = stringify({
header: true,
columns: ["name", "age", "city"],
});
const writableStream = fs.createWriteStream("output.csv");
// Register event listeners BEFORE initiating writes to prevent missing events
writableStream.on("finish", () => {
console.log("JSON to CSV conversion complete");
});
writableStream.on("error", (error) => {
console.error("Write stream error:", error.message);
});
// Use pipeline with a readable stream for proper backpressure handling
await pipeline(
Readable.from(jsonData), // Convert array to readable stream
stringifier,
writableStream
);
}
jsonToCSV().catch((error) => {
console.error("JSON to CSV conversion failed:", error.message);
process.exit(1);
});
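As mentioned above, compression is just one more pipeline stage. The following sketch reuses the same kind of jsonData array and writes a gzipped CSV to a hypothetical output.csv.gz file:

const fs = require("fs");
const zlib = require("zlib");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const { Readable } = require("stream");

const jsonData = [
  { name: "Alice", age: 30, city: "New York" },
  { name: "Bob", age: 25, city: "San Francisco" },
];

async function jsonToCompressedCSV() {
  await pipeline(
    Readable.from(jsonData),
    stringify({ header: true, columns: ["name", "age", "city"] }),
    zlib.createGzip(), // compress the CSV bytes before they reach disk
    fs.createWriteStream("output.csv.gz")
  );
  console.log("Compressed CSV written to output.csv.gz");
}

jsonToCompressedCSV().catch((error) => {
  console.error("Compressed export failed:", error.message);
  process.exit(1);
});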
Convert CSV data to JSON format using streaming to avoid memory accumulation:
const fs = require("fs");
const { parse } = require("csv-parse");
const { pipeline } = require("stream/promises");
const { Transform } = require("stream");
// Transform stream that converts CSV records to JSON lines (NDJSON format)
// This avoids accumulating all records in memory
class CSVToJSONTransform extends Transform {
constructor(options = {}) {
super({ objectMode: true, ...options });
this.isFirst = true;
}
_transform(chunk, encoding, callback) {
try {
// Output as JSON lines (one JSON object per line)
// For array format, you'd need to buffer and write brackets, but that defeats streaming
const jsonLine = JSON.stringify(chunk) + "\n";
callback(null, jsonLine);
} catch (error) {
callback(error);
}
}
}
// Alternative: Stream as JSON array (requires buffering first/last items)
class CSVToJSONArrayTransform extends Transform {
constructor(options = {}) {
super({ objectMode: true, ...options });
this.isFirst = true;
}
_transform(chunk, encoding, callback) {
try {
if (this.isFirst) {
this.push("[\n");
this.isFirst = false;
} else {
this.push(",\n");
}
const json = JSON.stringify(chunk, null, 2);
this.push(json.replace(/^/gm, " "));
callback(null);
} catch (error) {
callback(error);
}
}
_flush(callback) {
if (!this.isFirst) {
this.push("\n]");
} else {
this.push("[]");
}
callback();
}
}
async function csvToJSON() {
let recordCount = 0;
// Use stream.pipeline for proper backpressure handling
await pipeline(
fs.createReadStream("./migration_data.csv"),
parse({
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true,
}),
new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
recordCount++;
// Validate record if needed
if (!chunk.year_month) {
console.warn(`Record ${recordCount}: Missing year_month field`);
}
callback(null, chunk);
},
}),
new CSVToJSONArrayTransform(), // Use array format
fs.createWriteStream("output.json")
);
console.log(`Converted ${recordCount} records to JSON`);
}
csvToJSON().catch((error) => {
console.error("CSV to JSON conversion failed:", error.message);
process.exit(1);
});
Note: For very large CSV files, prefer JSON Lines (NDJSON) format where each record is a separate JSON object on its own line. This allows true streaming without buffering. The array format shown above requires minimal buffering for brackets but is still more memory-efficient than loading all records.
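If you opt for JSON Lines, the CSVToJSONTransform class defined above drops straight into the same pipeline. A minimal sketch, added alongside the script above and writing to a hypothetical output.ndjson file:

async function csvToNDJSON() {
  await pipeline(
    fs.createReadStream("./migration_data.csv"),
    parse({ columns: true, skip_empty_lines: true, trim: true }),
    new CSVToJSONTransform(), // emits one JSON object per line (NDJSON)
    fs.createWriteStream("output.ndjson")
  );
  console.log("NDJSON export complete");
}

csvToNDJSON().catch((error) => {
  console.error("CSV to NDJSON conversion failed:", error.message);
  process.exit(1);
});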
Make your CSV pipelines resilient by validating records, tagging failures, and handling malformed rows without crashing the entire job.
CSV data can be messy. Missing fields, bad values, or inconsistent rows can break your scripts. This section shows how to detect, skip, or fix errors on the fly so your application stays stable and your data clean.
In production, it’s usually better to tag and quarantine bad records than to drop them silently. You can route invalid rows to a separate CSV file for later inspection or reprocessing:
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const { Transform } = require("stream");
// Validate records and tag errors with contextual information
function validateRecord(record, rowNumber) {
const errors = [];
const estimate = parseInt(record.estimate, 10);
if (!record.year_month || !/^[0-9]{4}-[0-9]{2}$/.test(record.year_month)) {
errors.push(`Invalid year_month format at row ${rowNumber}`);
}
if (Number.isNaN(estimate) || estimate < 0) {
errors.push(`Invalid estimate value at row ${rowNumber}`);
}
const validDirections = ["Arrivals", "Departures"];
if (!validDirections.includes(record.direction)) {
errors.push(`Invalid direction at row ${rowNumber}`);
}
return { errors, estimate, isValid: errors.length === 0 };
}
// Transform stream that routes valid and invalid records to separate outputs
class QuarantineTransform extends Transform {
constructor(validStringifier, quarantineStringifier, options = {}) {
super({ objectMode: true, ...options });
this.validStringifier = validStringifier;
this.quarantineStringifier = quarantineStringifier;
this.rowNumber = 0;
this.validCount = 0;
this.invalidCount = 0;
}
_transform(record, encoding, callback) {
this.rowNumber++;
const validation = validateRecord(record, this.rowNumber);
if (validation.isValid) {
this.validCount++;
// Write valid records with converted estimate
this.validStringifier.write({ ...record, estimate: validation.estimate });
} else {
this.invalidCount++;
// Log validation errors with contextual information for debugging
console.warn(`Row ${this.rowNumber} quarantined:`, validation.errors.join("; "));
// Quarantine invalid records with error reasons
this.quarantineStringifier.write({
...record,
error_reasons: validation.errors.join("; "),
});
}
callback(null); // Continue processing
}
_flush(callback) {
this.validStringifier.end();
this.quarantineStringifier.end();
console.log(`Processing complete: ${this.validCount} valid, ${this.invalidCount} quarantined`);
callback();
}
}
async function quarantineInvalidRows() {
const validStringifier = stringify({
header: true,
columns: [
"year_month",
"month_of_release",
"passenger_type",
"direction",
"sex",
"age",
"estimate",
],
});
const quarantineStringifier = stringify({
header: true,
columns: [
"year_month",
"month_of_release",
"passenger_type",
"direction",
"sex",
"age",
"estimate",
"error_reasons",
],
});
const validStream = fs.createWriteStream("valid_rows.csv");
const quarantineStream = fs.createWriteStream("quarantine_rows.csv");
// Register event listeners BEFORE initiating writes
validStream.on("finish", () => {
console.log("Valid rows file written");
});
quarantineStream.on("finish", () => {
console.log("Quarantine file written");
});
validStringifier.pipe(validStream);
quarantineStringifier.pipe(quarantineStream);
// Use stream.pipeline for proper backpressure handling and error propagation
await pipeline(
fs.createReadStream("./migration_data.csv"),
parse({
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true, // Allow inconsistent columns, validate manually
// Avoid skip_records_with_error - tag and quarantine instead
}),
new QuarantineTransform(validStringifier, quarantineStringifier)
);
}
quarantineInvalidRows().catch((error) => {
console.error("Quarantine processing failed:", error.message);
process.exit(1);
});
Later, you can build a small retry job that reads quarantine_rows.csv, fixes known issues, and attempts to reinsert the cleaned records into your database.
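A minimal sketch of such a retry job, assuming the quarantine_rows.csv produced above and only simple automatic fixes (defaulting a bad estimate to 0 and dropping rows whose year_month cannot be repaired), might write a cleaned repaired_rows.csv ready for re-import:

const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");

async function retryQuarantinedRows() {
  await pipeline(
    fs.createReadStream("./quarantine_rows.csv"),
    parse({ columns: true, skip_empty_lines: true, trim: true }),
    transform((row) => {
      // Drop rows that cannot be repaired automatically
      if (!/^\d{4}-\d{2}$/.test(row.year_month || "")) {
        return null;
      }
      // Example fix: default a missing or negative estimate to 0
      const estimate = parseInt(row.estimate, 10);
      const { error_reasons, ...cleaned } = row; // strip the quarantine metadata
      return { ...cleaned, estimate: isNaN(estimate) || estimate < 0 ? 0 : estimate };
    }),
    stringify({
      header: true,
      columns: [
        "year_month",
        "month_of_release",
        "passenger_type",
        "direction",
        "sex",
        "age",
        "estimate",
      ],
    }),
    fs.createWriteStream("repaired_rows.csv")
  );
  console.log("Repaired rows written to repaired_rows.csv");
}

retryQuarantinedRows().catch((error) => {
  console.error("Retry job failed:", error.message);
  process.exit(1);
});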
The csv-parse module also supports cast and cast_date options to convert values as they are parsed:
const fs = require("fs");
const { parse } = require("csv-parse");

// Use async iterator pattern for proper backpressure handling
async function parseWithCast() {
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
skip_empty_lines: true,
relax_column_count: true,
cast: (value, context) => {
// Validate and cast estimate field
if (context.column === "estimate") {
const num = parseInt(value, 10);
if (isNaN(num)) {
console.warn(`Invalid estimate value "${value}" at column ${context.column}`);
return null;
}
return num;
}
return value;
},
cast_date: true, // Parse ISO-like date strings into Date objects where applicable
})
);
for await (const record of parser) {
// record.estimate is a number or null (if invalid), and date-like fields are Date objects
// Process each record as it arrives
}
}
parseWithCast().catch((error) => {
console.error("Parsing with cast failed:", error.message);
process.exit(1);
});
In real-world services, integrate structured logging so you can trace problems across runs. For example, using a logging framework like pino:
const pino = require("pino");
const logger = pino();
fs.createReadStream("./migration_data.csv")
.pipe(parse({ columns: true, skip_empty_lines: true }))
.on("data", (record) => {
const { errors } = validateRecord(record);
if (errors.length > 0) {
logger.warn(
{ record, errors },
"Invalid CSV record encountered"
);
}
})
.on("error", (error) => {
logger.error({ err: error }, "CSV parsing failed");
});
Logging contextual information (file name, row number, error reasons) makes it much easier to debug and remediate data quality issues in production.
Handle parsing errors gracefully:
const fs = require("fs");
const { parse } = require("csv-parse");
// Validate records and log errors with contextual information
function validateRecord(record, rowNumber) {
const errors = [];
if (!record.year_month) {
errors.push(`Missing year_month at row ${rowNumber}`);
} else if (!/^\d{4}-\d{2}$/.test(record.year_month)) {
errors.push(`Invalid year_month format at row ${rowNumber}`);
}
if (!record.estimate) {
errors.push(`Missing estimate at row ${rowNumber}`);
} else {
const estimate = parseInt(record.estimate, 10);
if (isNaN(estimate) || estimate < 0) {
errors.push(`Invalid estimate value at row ${rowNumber}`);
}
}
return { isValid: errors.length === 0, errors };
}
async function handleErrors() {
let rowNumber = 0;
let validCount = 0;
let invalidCount = 0;
// Use async iterator pattern for proper backpressure handling
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
skip_empty_lines: true,
relax_column_count: true, // Allow inconsistent column counts
// Avoid skip_records_with_error - validate and log errors instead
})
);
try {
for await (const record of parser) {
rowNumber++;
const validation = validateRecord(record, rowNumber);
if (validation.isValid) {
validCount++;
// Process valid record
console.log(`Valid record ${rowNumber}:`, record.year_month);
} else {
invalidCount++;
// Log validation errors with contextual information for debugging
console.warn(`Row ${rowNumber} validation failed:`, validation.errors.join("; "));
}
}
console.log(`Processed ${validCount} valid records`);
console.log(`Encountered ${invalidCount} invalid records`);
} catch (error) {
console.error("Parse error:", error.message);
throw error;
}
}
handleErrors().catch((error) => {
console.error("Error handling failed:", error.message);
process.exit(1);
});
Validate and convert data types during parsing:
const fs = require("fs");
const { parse } = require("csv-parse");
// Validate and convert data types with detailed error logging
function validateRecord(record, rowNumber) {
const errors = [];
// Validate year_month format (YYYY-MM)
if (!record.year_month || !/^\d{4}-\d{2}$/.test(record.year_month)) {
errors.push(`Invalid year_month format at row ${rowNumber}`);
}
// Validate estimate is a number
const estimate = parseInt(record.estimate, 10);
if (isNaN(estimate) || estimate < 0) {
errors.push(`Invalid estimate value at row ${rowNumber}`);
}
// Validate direction is one of expected values
const validDirections = ["Arrivals", "Departures"];
if (!validDirections.includes(record.direction)) {
errors.push(`Invalid direction value at row ${rowNumber}`);
}
return {
isValid: errors.length === 0,
errors: errors,
record: {
...record,
estimate: estimate, // Convert to number
},
};
}
async function validateData() {
let rowNumber = 0;
let validCount = 0;
let invalidCount = 0;
// Use async iterator pattern for proper backpressure handling
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(
parse({
columns: true,
skip_empty_lines: true,
relax_column_count: true,
})
);
try {
for await (const record of parser) {
rowNumber++;
const validation = validateRecord(record, rowNumber);
if (validation.isValid) {
validCount++;
// Process valid record with converted types
console.log(`Valid record ${rowNumber}:`, validation.record);
} else {
invalidCount++;
// Log validation errors with contextual information for debugging
console.warn(`Row ${rowNumber} validation failed:`, validation.errors.join("; "));
}
}
console.log(`Validation complete: ${validCount} valid, ${invalidCount} invalid`);
} catch (error) {
console.error("Validation error:", error.message);
throw error;
}
}
validateData().catch((error) => {
console.error("Data validation failed:", error.message);
process.exit(1);
});
Stream CSV rows into SQLite so you can query, aggregate, and join the imported data using SQL.
Create a database connection file db.js:
const fs = require("fs");
const sqlite3 = require("sqlite3").verbose();
const filepath = "./population.db";
function connectToDatabase() {
if (fs.existsSync(filepath)) {
return new sqlite3.Database(filepath);
} else {
const db = new sqlite3.Database(filepath, (error) => {
if (error) {
return console.error(error.message);
}
createTable(db);
console.log("Connected to the database successfully");
});
return db;
}
}
function createTable(db) {
db.exec(`
CREATE TABLE migration
(
year_month VARCHAR(10),
month_of_release VARCHAR(10),
passenger_type VARCHAR(50),
direction VARCHAR(20),
sex VARCHAR(10),
age VARCHAR(50),
estimate INT,
standard_error INT,
status VARCHAR(20)
)
`);
}
module.exports = connectToDatabase();
Create insertData.js to read CSV and insert into the database using transactions and prepared statements:
const fs = require("fs");
const { parse } = require("csv-parse");
const db = require("./db");
const { promisify } = require("util");
// Wrap db.run in a promise for async/await
const dbRun = promisify(db.run.bind(db));
// Validate and prepare row data
function prepareRowData(row) {
const [
yearMonth,
monthOfRelease,
passengerType,
direction,
sex,
age,
estimateRaw,
standardErrorRaw,
status,
] = row;
// Validate and convert numeric fields
const estimate = parseInt(estimateRaw, 10);
if (isNaN(estimate) || estimate < 0) {
throw new Error(`Invalid estimate value: ${estimateRaw}`);
}
const standardError =
standardErrorRaw === "" ? null : parseInt(standardErrorRaw, 10);
return [
yearMonth,
monthOfRelease,
passengerType,
direction,
sex,
age,
estimate,
standardError,
status,
];
}
async function insertCSVData() {
// Wrap database operations in a transaction for atomicity
return new Promise((resolve, reject) => {
db.serialize(async () => {
try {
// Begin transaction explicitly
await dbRun("BEGIN TRANSACTION");
// Prepare statement once for better performance
// The database can reuse the query plan instead of re-parsing SQL for every row
const stmt = db.prepare(
`INSERT INTO migration VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
const stmtRun = promisify(stmt.run.bind(stmt));
const stmtFinalize = promisify(stmt.finalize.bind(stmt));
let insertCount = 0;
let rowNumber = 0;
// Use async iterator pattern to coordinate database inserts
// This ensures proper sequencing and prevents race conditions
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(parse({ delimiter: ",", from_line: 2, trim: true }));
try {
for await (const row of parser) {
rowNumber++;
try {
const rowData = prepareRowData(row);
// Await each insert to ensure proper sequencing
// This prevents race conditions and ensures inserts complete before moving on
await stmtRun(rowData);
insertCount++;
if (insertCount % 1000 === 0) {
console.log(`Inserted ${insertCount} rows`);
}
} catch (error) {
// Log validation/insert errors with contextual information
console.error(`Row ${rowNumber} insert failed:`, error.message);
// Continue processing other rows instead of crashing
}
}
// Finalize prepared statement
await stmtFinalize();
// Commit transaction - all inserts succeed atomically
await dbRun("COMMIT");
console.log(`Finished inserting ${insertCount} rows`);
resolve(insertCount);
} catch (error) {
// Rollback transaction on error to maintain data integrity
await dbRun("ROLLBACK");
reject(error);
}
} catch (error) {
reject(error);
} finally {
db.close();
}
});
});
}
insertCSVData()
.then((count) => {
console.log(`Successfully inserted ${count} rows`);
process.exit(0);
})
.catch((error) => {
console.error("Insert failed:", error.message);
process.exit(1);
});
Run the script:
- node insertData.js
The output shows progress as rows are inserted:
Output
Connected to the database successfully
Inserted 1000 rows
Inserted 2000 rows
...
Inserted 44300 rows
Finished inserting 44310 rows
Tip: For large imports, wrapping inserts inside a database transaction (db.serialize() with an explicit BEGIN / COMMIT) and using prepared statements, as this script does, reduces disk I/O and speeds up bulk loading.
For imports that run nightly or handle millions of rows, parameterize and batch your inserts so the database can reuse the query plan and flush to disk less frequently. Use async/await to coordinate inserts and prevent race conditions:
const fs = require("fs");
const { parse } = require("csv-parse");
const db = require("./db");
const { promisify } = require("util");
// Wrap database methods in promises for async/await
const dbRun = promisify(db.run.bind(db));
// Validate and prepare row data with error logging
function prepareRowData(row, rowNumber) {
const [
yearMonth,
monthOfRelease,
passengerType,
direction,
sex,
age,
estimateRaw,
standardErrorRaw,
status,
] = row;
// Validate and convert numeric fields
const estimate = parseInt(estimateRaw, 10);
if (isNaN(estimate) || estimate < 0) {
throw new Error(`Row ${rowNumber}: Invalid estimate value "${estimateRaw}"`);
}
const standardError =
standardErrorRaw === "" ? null : parseInt(standardErrorRaw, 10);
return [
yearMonth,
monthOfRelease,
passengerType,
direction,
sex,
age,
estimate,
standardError,
status,
];
}
async function insertWithTransactions() {
return new Promise((resolve, reject) => {
db.serialize(async () => {
try {
// Begin transaction explicitly for atomicity
await dbRun("BEGIN TRANSACTION");
// Prepare statement once - database caches the execution plan
// This is much faster than re-parsing SQL for every row
const stmt = db.prepare(
`INSERT INTO migration VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
const stmtRun = promisify(stmt.run.bind(stmt));
const stmtFinalize = promisify(stmt.finalize.bind(stmt));
let insertCount = 0;
let rowNumber = 0;
// Use async iterator pattern to coordinate database inserts
// This ensures proper sequencing and prevents race conditions
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(parse({ delimiter: ",", from_line: 2, trim: true }));
try {
for await (const row of parser) {
rowNumber++;
try {
const rowData = prepareRowData(row, rowNumber);
// Await each insert to ensure proper sequencing
// This prevents race conditions and ensures inserts complete before moving on
await stmtRun(rowData);
insertCount++;
if (insertCount % 1000 === 0) {
console.log(`Inserted ${insertCount} rows`);
}
} catch (error) {
// Log validation/insert errors with contextual information
console.error(`Row ${rowNumber} failed:`, error.message);
// Continue processing other rows instead of crashing
}
}
// Finalize prepared statement
await stmtFinalize();
// Commit transaction - all inserts succeed atomically
await dbRun("COMMIT");
console.log(`Transaction committed: ${insertCount} rows inserted`);
resolve(insertCount);
} catch (error) {
// Rollback transaction on error to maintain data integrity
console.error("Stream processing error:", error.message);
await dbRun("ROLLBACK");
reject(error);
}
} catch (error) {
console.error("Transaction setup error:", error.message);
reject(error);
} finally {
db.close();
}
});
});
}
insertWithTransactions()
.then((count) => {
console.log(`Successfully inserted ${count} rows`);
process.exit(0);
})
.catch((error) => {
console.error("Insert with transactions failed:", error.message);
process.exit(1);
});
Transactions ensure each batch succeeds atomically, while prepare/run lets SQLite cache the execution plan instead of re-parsing SQL for every row. Using async/await with the iterator pattern ensures inserts are properly sequenced and prevents race conditions that can occur when inserting rows directly in .on('data') handlers without awaiting completion.
Keep recurring CSV workflows healthy by codifying tests, static checks, automation, and observability.
- Keep golden sample files in tests/fixtures/ and write Jest or Mocha tests that invoke your parsing functions. Snapshot the serialized CSV output so column orders and quoting stay consistent (a minimal test sketch follows this list).
- Validate incoming files with csvlint or a lightweight Node script that verifies header names, delimiter consistency, and row counts before promoting new datasets.
- Run node readCSV.js and node insertData.js in CI against temporary SQLite databases so malformed CSVs fail fast before reaching production.
- Emit structured logs with pino or winston. Alert when error spikes or throughput drops indicate a downstream schema change.
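A minimal sketch of such a regression test, assuming a small fixture saved at tests/fixtures/sample.csv and Jest as the test runner:

// tests/parse.test.js - guards against parsing regressions
const fs = require("fs");
const path = require("path");
const { parse } = require("csv-parse/sync");

test("parses the sample fixture into objects with the expected columns", () => {
  const csv = fs.readFileSync(path.join(__dirname, "fixtures", "sample.csv"), "utf8");
  const records = parse(csv, { columns: true, skip_empty_lines: true, trim: true });

  expect(records.length).toBeGreaterThan(0);
  expect(Object.keys(records[0])).toEqual([
    "year_month",
    "month_of_release",
    "passenger_type",
    "direction",
    "sex",
    "age",
    "estimate",
    "standard_error",
    "status",
  ]);
});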
Before and during parsing, use quick sanity checks and targeted configuration to avoid common CSV ingestion failures:
- Inspect the file with head, tail, and wc to understand its size, structure, and encoding.
- Strip a leading byte order mark with bom: true in csv-parse.
- Run csvlint where available to catch structural issues early.

Problem: Parser returns single-column arrays or incorrect field separation.
Prevention: Run head -5 migration_data.csv and check how fields are separated, or use a validator such as csvlint before wiring up the parser.
Solution: Identify the delimiter by inspecting the file:
const fs = require("fs");
const firstLine = fs.readFileSync("./migration_data.csv", "utf8").split("\n")[0];
console.log("First line:", firstLine);
// Check for common delimiters
if (firstLine.includes(";")) {
console.log("Delimiter appears to be semicolon");
} else if (firstLine.includes("\t")) {
console.log("Delimiter appears to be tab");
} else {
console.log("Delimiter appears to be comma");
}
Then configure the parser accordingly:
parse({ delimiter: ";" }) // or "\t" for tabs
Problem: Fields containing commas, quotes, or newlines break parsing.
Prevention: Ask data producers to always quote text fields that may contain commas, quotes, or newlines, and validate a sample file with head and wc -l to confirm that each row has a consistent number of fields.
Solution: The csv-parse module handles quoted fields automatically. Ensure your CSV uses proper quoting:
name,description,price
"Widget, Premium","A widget with ""special"" features",29.99
The parser handles this correctly with default settings. If issues persist, check the quote and escape options:
parse({
quote: '"',
escape: '"',
relax_quotes: true,
})
Problem: Application runs out of memory when processing large CSV files.
Prevention: Use wc -l and file size checks to avoid loading unexpectedly large CSV files into memory with readFileSync; default to streaming for anything beyond small test datasets.
Solution: Always use streams instead of loading the entire file:
// Don't do this for large files: it loads the entire file into memory
// (this uses the synchronous API from "csv-parse/sync")
const data = fs.readFileSync("large_file.csv", "utf8");
const records = parse(data, { columns: true });
// Do this instead
fs.createReadStream("large_file.csv")
.pipe(parse({ columns: true }))
.on("data", (record) => {
// Process each record immediately
});
Problem: Headers are not interpreted correctly (for example, the header row is parsed as data or the first data row is treated as headers).
Prevention: Inspect the first few lines with head -3 to confirm whether the file includes a header row and how the column names are formatted before enabling columns: true.
Solution: Explicitly specify column names or skip the header row:
// Option 1: Skip first line and define columns manually
parse({ from_line: 2, columns: ["col1", "col2", "col3"] })
// Option 2: Use first line as headers
parse({ columns: true })
// Option 3: Provide column mapping
parse({
columns: (header) => header.map((col) => col.toLowerCase().trim())
})
Problem: Database inserts or API calls in data event handlers cause race conditions.
Prevention: Design your pipeline around async iterators or stream.pipeline() from the start, and avoid doing unbounded parallel database inserts inside simple data event handlers.
Solution: Use proper async handling with backpressure control:
const fs = require("fs");
const { parse } = require("csv-parse");
async function processWithAsyncOps() {
const parser = fs
.createReadStream("./migration_data.csv")
.pipe(parse({ columns: true }));
for await (const record of parser) {
// Await async operations properly (for example, a database insert or API call)
await someAsyncOperation(record);
}
console.log("Finished processing all records");
}
processWithAsyncOps().catch((error) => {
console.error("Error while processing CSV:", error.message);
});
If you’re using a callback-based database client such as sqlite3, wrap its methods in Promises before using await inside the loop so that each insert or API call is properly sequenced.
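For example, a minimal sketch that wraps sqlite3's db.run in a promise, assuming the db.js connection and migration table created earlier in this tutorial:

const { promisify } = require("util");
const db = require("./db");

// Promise-wrapped insert so each row can be awaited inside the loop above
const dbRun = promisify(db.run.bind(db));

async function insertRecord(record) {
  await dbRun(
    `INSERT INTO migration (year_month, direction, sex, estimate) VALUES (?, ?, ?, ?)`,
    [record.year_month, record.direction, record.sex, parseInt(record.estimate, 10)]
  );
}

Inside the for await loop, you would then call await insertRecord(record) in place of someAsyncOperation(record).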
The node-csv package (installed as csv) is widely used and well-maintained. It provides stream-based processing, making it suitable for large files. Alternatives include fast-csv (faster for some use cases) and papaparse (good browser compatibility). Choose based on your specific needs: stream processing, performance, or browser support.
Use Node.js streams with csv-parse:
fs.createReadStream("large_file.csv")
.pipe(parse({ columns: true }))
.on("data", (row) => {
// Process each row as it arrives
});
This processes data in chunks without loading the entire file into memory. For even better control, use async iterators:
for await (const record of parser) {
// Process record
}
csv-parse is part of the node-csv package and uses streams by default. csv-parser is a separate, lighter package with similar functionality. Both are valid choices; csv-parse offers more configuration options and is part of a larger ecosystem (csv-stringify, stream-transform).
Use csv-stringify to convert JSON objects to CSV:
const { stringify } = require("csv-stringify");
const stringifier = stringify({ header: true, columns: ["col1", "col2"] });
stringifier.pipe(fs.createWriteStream("output.csv"));
jsonArray.forEach((obj) => {
stringifier.write(obj);
});
stringifier.end();
The csv-parse and csv-stringify modules automatically handle quoted fields containing special characters. Ensure your CSV uses proper quoting:
name,description
"Smith, John","Product with ""quotes"" and, commas"
The parser handles this with default settings. If you’re generating CSV, the stringifier automatically quotes fields when needed.
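For instance, a quick sketch with the synchronous csv-stringify API (the sample record is purely illustrative) shows the automatic quoting:

const { stringify } = require("csv-stringify/sync");

const output = stringify(
  [{ name: "Smith, John", description: 'Product with "quotes" and, commas' }],
  { header: true }
);
console.log(output);
// name,description
// "Smith, John","Product with ""quotes"" and, commas"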
Yes. Use async iterators with the parser stream:
const parser = fs.createReadStream("file.csv").pipe(parse({ columns: true }));
for await (const record of parser) {
await processRecord(record); // Await async operations
}
This provides better control flow than event-based handlers when you need to await async operations.
Use the stream-transform module with stream.pipeline for proper backpressure handling:
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");
const transformer = transform((row, callback) => {
const estimate = parseInt(row.estimate, 10);
if (isNaN(estimate)) {
console.warn(`Invalid estimate value: ${row.estimate}`);
}
const transformed = {
...row,
estimate: (estimate || 0) * 2,
};
callback(null, transformed);
});
async function transformCSV() {
await pipeline(
fs.createReadStream("input.csv"),
parse({ columns: true, relax_column_count: true }),
transformer,
stringify({ header: true }),
fs.createWriteStream("output.csv")
);
}
transformCSV().catch((error) => {
console.error("Transform failed:", error.message);
process.exit(1);
});
Specify the delimiter in the parser options:
// Semicolon-delimited
parse({ delimiter: ";" })
// Tab-delimited
parse({ delimiter: "\t" })
// Pipe-delimited
parse({ delimiter: "|" })
You can also use the delimiter option in stringify when writing CSV files.
Yes, but it’s not recommended for production. Simple parsing can be done with string methods, but this doesn’t handle edge cases like quoted fields, escaped characters, or multiline values. The node-csv library handles these cases correctly and is well-tested.
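To see why, here is a small illustrative comparison (the sample line is hypothetical): a naive split tears apart a quoted field that contains a comma, while csv-parse handles it correctly.

const { parse } = require("csv-parse/sync");

const line = 'name,notes\n"Smith, John","Likes ""CSV"""';

// Naive approach: splitting on commas breaks the quoted field apart
console.log(line.split("\n")[1].split(","));
// [ '"Smith', ' John"', '"Likes ""CSV"""' ]

// csv-parse respects quoting and escaping
console.log(parse(line, { columns: true }));
// [ { name: 'Smith, John', notes: 'Likes "CSV"' } ]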
Use the csv package to build streaming, resilient CSV pipelines in Node.js that can handle everything from small exports to multi-million-row data feeds. In this tutorial, you read CSV files as arrays and objects with csv-parse, wrote CSV output with csv-stringify, converted between JSON and CSV, processed large files with stream pipelines, validated and quarantined bad rows, and imported data into SQLite for querying and reporting.
When building CSV workflows, keep a few essentials in mind:
- Prefer streaming (stream.pipeline() or async iterators) over loading whole files into memory.
- Match the parser configuration to the file: delimiter, headers, BOM, and line endings.
- Validate rows, log errors with context, and quarantine bad records instead of dropping them silently.
- Use prepared statements and transactions when importing into a database.
For more information, see the CSV Project documentation. To continue learning about Node.js file operations, see How To Work with Files Using the fs Module in Node.js. For more on asynchronous programming, see How To Write Asynchronous Code in Node.js.