How To Read and Write CSV Files in Node.js Using Node-CSV

Updated on November 14, 2025

The author selected Society of Women Engineers to receive a donation as part of the Write for DOnations program.

Introduction

Learn how to parse, transform, and store CSV data in Node.js using the csv (node-csv) package and SQLite.

CSV (Comma-Separated Values) files are plain text files that store tabular data in a structured format, and a delimiter (typically a comma) separates values within each row. CSV files are widely used for data exchange because they’re simple, human-readable, and supported by spreadsheet applications, databases, and programming languages.

Node.js provides several highly efficient modules for working with CSV files, including node-csv, fast-csv, and papaparse. The node-csv package is, in particular, a collection of stream-based modules that handle parsing, stringification, transformation, and generation of CSV data efficiently, making it suitable for processing large files without consuming excessive memory.

In this tutorial, you will learn how to:

  • Read CSV files using Node.js streams with csv-parse
  • Write data to CSV files using csv-stringify
  • Handle large datasets efficiently with streaming
  • Convert between JSON and CSV formats
  • Implement error handling and data validation
  • Troubleshoot common issues when working with CSV files

Key Takeaways

Remember these core patterns and best practices when designing CSV ingestion and export workflows in Node.js:

  • The csv package (also known as node-csv) provides four modular tools (csv-parse, csv-stringify, stream-transform, and csv-generate) for stream-based reading, writing, and manipulation of CSV data in Node.js.
  • Use streams instead of loading entire files into memory to handle large datasets efficiently and prevent performance bottlenecks.
  • csv-parse lets you convert raw CSV text into arrays or objects using options like columns, from_line, delimiter, and trim for precise parsing control.
  • csv-stringify serializes arrays or objects into CSV format, with full support for custom headers, delimiters, and streaming writes for scalable exports.
  • Use async iterators (for await ... of) or the pipe() method to process streaming data incrementally while maintaining non-blocking I/O.
  • Combine the CSV modules with SQLite or other databases to import, query, and export data pipelines using Node.js efficiently.
  • Apply validation logic and robust error handling (skip_records_with_error, relax_column_count, and field validation) to ensure data integrity across imports and exports.
  • Utilize transform streams to enrich, clean, or modify records in real time during parsing or writing, avoiding post-processing overhead.
  • For production workflows, wrap database inserts inside transactions to reduce disk I/O and improve performance for large imports.
  • Convert between JSON and CSV seamlessly using csv-parse and csv-stringify to integrate Node.js applications with APIs, analytics tools, or data exchange systems.
  • Troubleshoot common issues such as wrong delimiters, quoting errors, header misalignment, and memory overflow by fine-tuning parser options and leveraging streams.
  • Always design CSV workflows around scalability, predictability, and maintainability - ensuring code is testable, resilient, and optimized for both human readability and machine performance.

Prerequisites

Before you start, make sure you have a recent version of Node.js and npm installed, are comfortable writing and running JavaScript from the command line, and have a text editor such as nano or vim available.

Understanding CSV Files and Their Use in Node.js

Choose the right CSV handling strategy for your workload, then configure the csv package to match your delimiters, headers, and encoding.

  • Decide whether to load small files into memory or use streaming for large and unbounded datasets.
  • Pick a CSV library (csv-parse, fast-csv, papaparse) that fits your runtime (Node-only vs browser) and configuration needs.
  • Normalize encodings, BOM markers, and newline styles before parsing to avoid subtle bugs.

CSV files use a delimiter character to separate values within each row. While commas are most common, other delimiters include semicolons (;), tabs (\t), and pipes (|). The first row often contains column headers, though this isn’t required.
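
If a file uses a different delimiter, you tell the parser explicitly. The following is a minimal sketch, assuming a hypothetical semicolon-delimited file named data_semicolon.csv:

const fs = require("fs");
const { parse } = require("csv-parse");

// Parse a semicolon-delimited file by overriding the default delimiter.
fs.createReadStream("./data_semicolon.csv")
  .pipe(parse({ delimiter: ";", columns: true, trim: true }))
  .on("data", (row) => {
    console.log(row);
  })
  .on("error", (error) => {
    console.error("Parse error:", error.message);
  });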

When working with CSV files in Node.js, you have two main approaches:

  1. Load entire file into memory: Suitable for small files, but can cause memory issues with large datasets (see the synchronous example after this list)
  2. Stream-based processing: Reads and processes data in chunks, making it memory-efficient for large files
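
For the first approach, csv-parse also ships a synchronous API under csv-parse/sync that returns all records at once. Here is a minimal sketch, assuming a small hypothetical sample.csv file that comfortably fits in memory:

const fs = require("fs");
const { parse } = require("csv-parse/sync");

// Reads and parses the whole file in one call - only appropriate for small datasets.
const raw = fs.readFileSync("./sample.csv", "utf8");
const records = parse(raw, { columns: true, skip_empty_lines: true, trim: true });
console.log(`Parsed ${records.length} records`);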

The node-csv package uses streams by default, which makes it ideal for production applications that handle variable file sizes. The package consists of four modules:

csv-parse, csv-stringify, stream-transform, and csv-generate cover ingestion, export, inline rewriting, and synthetic dataset creation. The comparison below shows when to pick each tool and how it compares with other Node.js CSV approaches:

  • fs.readFileSync + manual parsing. Defaults: raw text only; no CSV semantics, header, or quoting support; preserves native newlines. Ideal workloads: tiny one-off scripts and quick experiments. Key differentiators: simple to write but brittle; does not handle quoting, BOM, or multiline fields safely.
  • csv-parse from csv (node-csv). Defaults: comma delimiter; configurable columns; normalizes \r\n / \n; optional bom: true. Ideal workloads: small to very large files; streaming ETL jobs. Key differentiators: rich configuration, first-class streaming, and tight integration with stringify/transform.
  • fast-csv. Defaults: comma delimiter; optional header row; stream-based. Ideal workloads: high-throughput ingestion/export pipelines. Key differentiators: performance-focused API with good TypeScript support and streaming-first design.
  • papaparse (with Node wrapper). Defaults: comma delimiter; browser-first; header detection via options. Ideal workloads: browser + Node hybrid workflows. Key differentiators: works both in the browser and Node; good for shared CSV logic across front-end/back-end.

Some CSV exports include a UTF-8 byte order mark (BOM) at the start of the file. When using csv-parse, you can enable bom: true to automatically strip the BOM:

const fs = require("fs");
const { parse } = require("csv-parse");

fs.createReadStream("./migration_data.csv")
  .pipe(
    parse({
      columns: true,
      bom: true,
      skip_empty_lines: true,
    })
  );

Line endings can also vary (\n on Unix-like systems, \r\n on Windows). Node streams pass these through transparently, and csv-parse normalizes them internally. If you manually split lines using string operations, normalize them first:

const fs = require("fs");

const raw = fs.readFileSync("./migration_data.csv", "utf8");
const normalized = raw.replace(/\r\n?/g, "\n");
const lines = normalized.split("\n");

Setting Up the Project Environment

Set up a dedicated project directory, install the csv and sqlite3 packages, and download a sample CSV file to work with.

Create a project directory and initialize it as an npm project:

  1. mkdir csv_demo
  2. cd csv_demo
  3. npm init -y

The -y option tells npm to use the default settings for your package.json file, so you don’t have to answer prompts during initialization. You can always update these values later to better match your project’s needs.

Install the required dependencies:

  1. npm install csv sqlite3

This command installs:

  • csv: The main package containing csv-parse, csv-stringify, stream-transform, and csv-generate
  • sqlite3: For database operations in later examples

Note: The csv package is the official name on npm, but it’s commonly referred to as node-csv in documentation and discussions. Both names refer to the same package.

Download a sample CSV dataset for testing. This example uses migration data from Stats NZ:

  1. wget https://www.stats.govt.nz/assets/Uploads/International-migration/International-migration-September-2021-Infoshare-tables/Download-data/international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv

If wget is not installed on your system, run curl -O https://www.stats.govt.nz/assets/Uploads/International-migration/International-migration-September-2021-Infoshare-tables/Download-data/international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv instead.

Rename the file for easier reference:

  1. mv international-migration-September-2021-estimated-migration-by-age-and-sex-csv.csv migration_data.csv

The new filename, migration_data.csv, is shorter and easier to work with. Open the file in an editor such as nano or vim to inspect its structure:

  1. nano migration_data.csv

The file structure looks like this:

year_month,month_of_release,passenger_type,direction,sex,age,estimate,standard_error,status
2001-01,2020-09,Long-term migrant,Arrivals,Female,0-4 years,344,0,Final
2001-01,2020-09,Long-term migrant,Arrivals,Male,0-4 years,341,0,Final
...

The first line contains the column names, and all subsequent lines have the data corresponding to each column. A comma separates each piece of data. This character is known as a delimiter because it delineates the fields. You are not limited to using commas. Other popular delimiters include colons (:), semicolons (;), and tabs (\t). You need to know which delimiter is used in the file since most modules require it to parse the files.

After reviewing the file and identifying the delimiter, exit your migration_data.csv file using CTRL+X.

Installing and Importing Node-CSV

Import only the csv modules you need - typically csv-parse for reading, csv-stringify for writing, and stream-transform for inline edits - so the dependency footprint stays small.

const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");

For ES modules (Node.js 14+ with "type": "module" in package.json):

import { parse } from "csv-parse";
import { stringify } from "csv-stringify";
import { transform } from "stream-transform";

Note: The examples in this guide use CommonJS (require). If you’re working with ES modules, be sure to update the import statements using the import keyword as shown above.

Reading CSV Files

Use csv-parse with Node.js streams to read CSV files as arrays or objects without loading the entire file into memory, which keeps processing efficient even for large files.

Basic CSV Reading

Create a file named readCSV.js:

csv_demo/readCSV.js
const fs = require("fs");
const { parse } = require("csv-parse");

// Use the async iterator pattern to read rows incrementally with proper backpressure handling
async function readCSV() {
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(parse({ delimiter: ",", from_line: 2 }));

  try {
    // Use async iterator pattern to process rows incrementally
    // This ensures proper backpressure handling and prevents memory build-up
    for await (const row of parser) {
      console.log(row);
    }
    console.log("Finished reading CSV file");
  } catch (error) {
    console.error("Error:", error.message);
    throw error;
  }
}

readCSV().catch((error) => {
  console.error("Failed to read CSV:", error.message);
  process.exit(1);
});

This code works as follows:

  • fs.createReadStream("./migration_data.csv") creates a readable stream from the CSV file. A readable stream breaks the file into smaller chunks so Node.js doesn’t have to load the entire file into memory at once, and it only lets you read from the stream, not write to it.
  • .pipe(parse({ delimiter: ",", from_line: 2 })) forwards each chunk from the readable stream into csv-parse. The parse() function implements a transform stream (both readable and writable): it accepts raw CSV text such as 2001-01,2020-09,Long-term migrant,Arrivals,Female,0-4 years,344 and emits the parsed row as an array of fields.
  • The options object { delimiter: ",", from_line: 2 } configures the parser. delimiter defines which character separates fields in a row, and from_line: 2 tells the parser to skip the header row so you only process data records.
  • The for await...of loop uses the async iterator pattern to process rows incrementally. This ensures proper backpressure handling, preventing memory build-up on large files. The parser automatically pauses when the consumer is busy, and resumes when ready, maintaining optimal memory usage.

Next, run the script:

  1. node readCSV.js

The output shows each row as an array:

Output
[ '2001-01', '2020-09', 'Long-term migrant', 'Arrivals', 'Female', '0-4 years', '344', '0', 'Final' ]
...
Finished reading CSV file

All the rows in the CSV file are parsed into arrays by the csv-parse stream. Because the loop body runs each time a parsed row is received from the stream, you see the output scroll past gradually instead of being printed all at once.

In this step, you read data from a CSV file and transformed each row into an array using a basic stream pipeline. Next, you’ll see how to read the same CSV data as JavaScript objects using the column names.

Reading CSV as Objects

To parse CSV rows as objects with column names as keys, use the columns option:

csv_demo/readCSVObjects.js
const fs = require("fs");
const { parse } = require("csv-parse");

// Use async iterator pattern for proper backpressure handling
async function readCSVAsObjects() {
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        trim: true,
        // Avoid skip_records_with_error - validate and log errors instead
        relax_column_count: true,
      })
    );

  try {
    for await (const row of parser) {
      // Process each row as it arrives, preventing memory accumulation
      console.log(row);
    }
    console.log("Finished reading CSV file");
  } catch (error) {
    console.error("Error:", error.message);
    throw error;
  }
}

readCSVAsObjects().catch((error) => {
  console.error("Failed to read CSV:", error.message);
  process.exit(1);
});

With columns: true, the parser uses the first row as column names and returns objects like:

{
  year_month: '2001-01',
  month_of_release: '2020-09',
  passenger_type: 'Long-term migrant',
  direction: 'Arrivals',
  sex: 'Female',
  age: '0-4 years',
  estimate: '344',
  standard_error: '0',
  status: 'Final'
}

The skip_empty_lines: true option ignores blank lines, and trim: true removes whitespace from field values.

Reading CSV with Async/Await

For better control flow, you can use async/await with promises:

csv_demo/readCSVAsync.js
const fs = require("fs");
const { parse } = require("csv-parse");

// Validate and log errors with contextual information
function validateRecord(record, rowNumber) {
  const errors = [];
  
  if (!record.year_month || !/^\d{4}-\d{2}$/.test(record.year_month)) {
    errors.push(`Invalid year_month format at row ${rowNumber}`);
  }
  
  const estimate = parseInt(record.estimate, 10);
  if (isNaN(estimate) || estimate < 0) {
    errors.push(`Invalid estimate value at row ${rowNumber}`);
  }
  
  return { isValid: errors.length === 0, errors };
}

async function readCSVFile(filePath) {
  let rowNumber = 0;
  let validCount = 0;
  let invalidCount = 0;
  
  const parser = fs
    .createReadStream(filePath)
    .pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        trim: true,
        relax_column_count: true,
      })
    );

  // Process records individually to avoid memory accumulation on large files
  for await (const record of parser) {
    rowNumber++;
    const validation = validateRecord(record, rowNumber);
    
    if (validation.isValid) {
      validCount++;
      // Process each valid record as it arrives
      console.log(`Processed record ${rowNumber}: ${record.year_month}`);
    } else {
      invalidCount++;
      // Log validation errors with contextual information for debugging
      console.warn(`Row ${rowNumber} validation failed:`, validation.errors);
    }
  }

  console.log(`Total records: ${rowNumber}, Valid: ${validCount}, Invalid: ${invalidCount}`);
  return { total: rowNumber, valid: validCount, invalid: invalidCount };
}

readCSVFile("./migration_data.csv")
  .then((stats) => {
    console.log(`Processing complete: ${stats.valid} valid records`);
  })
  .catch((error) => {
    console.error("Error reading CSV:", error.message);
    process.exit(1);
  });

This approach processes records individually to avoid memory accumulation. Validation errors are logged with row numbers for easier debugging.

Writing CSV Files

Use csv-stringify with writable streams to serialize arrays or objects into CSV files, optionally adding headers and custom column ordering. For file output, pipe the stringifier into a writable stream such as one created with Node’s fs.createWriteStream(), so large datasets are written incrementally, row by row, instead of being built up in memory.

Basic CSV Writing

Create a file named writeCSV.js:

csv_demo/writeCSV.js
const fs = require("fs");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");

const columns = [
  "year_month",
  "month_of_release",
  "passenger_type",
  "direction",
  "sex",
  "age",
  "estimate",
];

const data = [
  {
    year_month: "2001-01",
    month_of_release: "2020-09",
    passenger_type: "Long-term migrant",
    direction: "Arrivals",
    sex: "Female",
    age: "0-4 years",
    estimate: "344",
  },
  {
    year_month: "2001-01",
    month_of_release: "2020-09",
    passenger_type: "Long-term migrant",
    direction: "Arrivals",
    sex: "Male",
    age: "0-4 years",
    estimate: "341",
  },
];

async function writeCSV() {
  const stringifier = stringify({ header: true, columns: columns });
  const writableStream = fs.createWriteStream("output.csv");

  // Register event listeners BEFORE initiating writes to prevent missing events
  writableStream.on("finish", () => {
    console.log("Finished writing CSV file");
  });

  writableStream.on("error", (error) => {
    console.error("Write stream error:", error.message);
  });

  // Use pipeline for proper backpressure handling and error propagation
  await pipeline(
    async function* () {
      // Generate data as a readable stream
      for (const row of data) {
        yield row;
      }
    },
    stringifier,
    writableStream
  );
}

writeCSV().catch((error) => {
  console.error("Failed to write CSV:", error.message);
  process.exit(1);
});

This code:

  • Defines a columns array that controls the order and names of the fields in the CSV header.
  • Creates a stringify stream with header: true and columns, so the header row is written automatically before any data rows.
  • Creates a writable file stream targeting output.csv and registers its finish and error handlers before any data is written.
  • Uses stream.pipeline() with an async generator as the source, so each object in data flows through the stringifier and into the file with backpressure and error propagation handled for you.
  • Ends the streams automatically once the generator is exhausted, allowing the writable stream to flush remaining data and emit the finish event.

Writing CSV from Database

You may want to export data directly from a database to a CSV file for reporting, backup, or sharing with other systems. This can be accomplished efficiently by streaming the data from the database and writing it to a CSV file as each row is read.

The following example demonstrates how to connect to a SQLite database, query a table, and write the results to a CSV file using the csv-stringify module. By streaming both the database reads and CSV writes, you avoid loading all data into memory at once, making this approach suitable for large datasets:

csv_demo/writeCSVFromDB.js
const fs = require("fs");
const { stringify } = require("csv-stringify");
const sqlite3 = require("sqlite3").verbose();

const db = new sqlite3.Database("./population.db");
const filename = "saved_from_db.csv";
const writableStream = fs.createWriteStream(filename);

const columns = [
  "year_month",
  "month_of_release",
  "passenger_type",
  "direction",
  "sex",
  "age",
  "estimate",
  "standard_error",
  "status",
];

const stringifier = stringify({ header: true, columns: columns });
stringifier.pipe(writableStream);

// Register event listeners BEFORE initiating writes to prevent missing events
writableStream.on("finish", () => {
  console.log("Finished writing data to CSV file");
  db.close();
});

writableStream.on("error", (error) => {
  console.error("Write stream error:", error.message);
  db.close();
});

stringifier.on("error", (error) => {
  console.error("Stringifier error:", error.message);
  db.close();
});

// Use async/await pattern for better error handling and control flow
async function exportToCSV() {
  return new Promise((resolve, reject) => {
    let rowCount = 0;

    db.each(
      `SELECT * FROM migration`,
      (error, row) => {
        if (error) {
          console.error("Database query error:", error.message);
          return;
        }
        rowCount++;
        // Write each row as it's retrieved, maintaining streaming behavior.
        // Note: node-sqlite3's db.each() cannot be paused, so a very fast query
        // may buffer rows inside the stringifier; for strict backpressure, read
        // in batches (for example with LIMIT/OFFSET) instead.
        stringifier.write(row);
      },
      (error, count) => {
        if (error) {
          console.error("Database error:", error.message);
          reject(error);
          return;
        }
        console.log(`Processed ${count} rows`);
        // End the stringifier after all rows are written
        stringifier.end();
        resolve(count);
      }
    );
  });
}

exportToCSV().catch((error) => {
  console.error("Export failed:", error.message);
  db.close();
  process.exit(1);
});

The db.each() method retrieves rows one at a time, writing each to the CSV stream. This approach is memory-efficient for large result sets.

Handling Large Datasets and Streams

Process large CSV files with streaming pipelines so you can filter, transform, and export millions of rows without exhausting memory.

  • Use fs.createReadStream() and csv-parse to stream input in bounded chunks.
  • Connect parse, optional transform, and stringify stages with stream.pipeline() to wire backpressure and error handling.
  • Tune stream options (like highWaterMark) and consider worker threads for CPU-heavy transformations.

Using stream.pipeline for Robust Large-File Processing

The stream.pipeline() API (or pipeline from stream/promises) connects multiple streams, propagates errors correctly, and respects backpressure end-to-end. This is useful when you want a single async function that processes a large file from start to finish:

csv_demo/pipelineCSV.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");

// Validate and log errors with contextual information
function validateAndTransform(row, rowNumber) {
  const estimate = parseInt(row.estimate, 10);
  
  // Validate estimate field
  if (isNaN(estimate) || estimate < 0) {
    console.warn(`Row ${rowNumber}: Invalid estimate value "${row.estimate}"`);
    return null; // Filter out invalid rows
  }

  // Filter: drop low-importance rows
  if (estimate < 100) {
    return null;
  }

  return {
    year_month: row.year_month,
    direction: row.direction,
    sex: row.sex,
    estimate,
  };
}

async function runPipeline() {
  let rowNumber = 0;
  
  // Use stream.pipeline for proper backpressure handling and error propagation
  // This ensures all streams are properly cleaned up on error
  await pipeline(
    fs.createReadStream("./migration_data.csv", {
      highWaterMark: 64 * 1024, // 64 KB chunks - tune based on your memory constraints
    }),
    parse({
      columns: true,
      skip_empty_lines: true,
      trim: true,
      relax_column_count: true, // Allow inconsistent columns, validate manually
    }),
    transform((row) => {
      rowNumber++;
      // Validate and transform each row, logging errors for debugging
      return validateAndTransform(row, rowNumber);
    }),
    stringify({ header: true }),
    fs.createWriteStream("./pipeline_output.csv")
  );

  console.log(`Pipeline complete. Processed ${rowNumber} rows.`);
}

runPipeline().catch((error) => {
  console.error("Pipeline failed:", error.message);
  process.exit(1);
});

In this pattern:

  • highWaterMark controls how much data each stream buffers at a time. Smaller values reduce peak memory usage, while slightly larger values can improve throughput.
  • Transform logic runs row by row, so memory stays bounded even for very large files.
  • Errors thrown in any stage propagate to the catch block, rather than leaving half-open streams.

For CPU-heavy operations (such as complex parsing, encryption, or compression), consider offloading per-row work to a pool of worker threads using the worker_threads module or a worker pool library. Each worker can process a subset of the data while the main thread manages streaming and I/O.
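
Below is a minimal sketch of that pattern in a single hypothetical file (csv_demo/cpuWorker.js). It uses one worker and an intentionally expensive pbkdf2 hash as a stand-in for real per-row CPU work; a production version would add batching, a pool of workers, and backpressure between the parser and the workers:

csv_demo/cpuWorker.js
const { Worker, isMainThread, parentPort } = require("worker_threads");

if (isMainThread) {
  // Main thread: stream and parse the CSV, forwarding each row to the worker.
  const fs = require("fs");
  const { parse } = require("csv-parse");
  const worker = new Worker(__filename);

  worker.on("message", (result) => {
    console.log(`Hashed row for ${result.year_month}`);
  });

  fs.createReadStream("./migration_data.csv")
    .pipe(parse({ columns: true, skip_empty_lines: true, trim: true }))
    .on("data", (row) => worker.postMessage(row))
    .on("end", () => worker.postMessage({ done: true }));
} else {
  // Worker thread: run the CPU-heavy step off the main event loop.
  const crypto = require("crypto");

  parentPort.on("message", (row) => {
    if (row.done) {
      process.exit(0);
    }
    // Stand-in for expensive per-row work (encryption, compression, parsing, ...).
    const hash = crypto
      .pbkdf2Sync(JSON.stringify(row), "salt", 10000, 16, "sha256")
      .toString("hex");
    parentPort.postMessage({ year_month: row.year_month, hash });
  });
}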

For files with millions of rows, you can also filter or process data on the fly within your pipeline. For example, to only include rows where estimate > 100:

csv_demo/processLargeCSV.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");

const inputFile = "./migration_data.csv";
const outputFile = "./filtered_output.csv";

// Validate and filter rows with proper error logging
function validateAndFilter(row, rowNumber) {
  const estimate = parseInt(row.estimate, 10);
  
  // Validate estimate field and log errors
  if (isNaN(estimate)) {
    console.warn(`Row ${rowNumber}: Invalid estimate "${row.estimate}" - skipping`);
    return null;
  }
  
  // Filter: only process rows with estimate > 100
  if (estimate > 100) {
    return {
      year_month: row.year_month,
      direction: row.direction,
      sex: row.sex,
      estimate: row.estimate,
    };
  }
  
  return null; // Filter out rows that don't meet criteria
}

async function processLargeCSV() {
  let rowNumber = 0;
  
  // Use stream.pipeline for proper backpressure handling and error propagation
  await pipeline(
    fs.createReadStream(inputFile),
    parse({
      columns: true,
      skip_empty_lines: true,
      trim: true,
      relax_column_count: true,
    }),
    transform((row) => {
      rowNumber++;
      return validateAndFilter(row, rowNumber);
    }),
    stringify({
      header: true,
      columns: ["year_month", "direction", "sex", "estimate"],
    }),
    fs.createWriteStream(outputFile)
  );
  
  console.log(`Finished processing large CSV file. Processed ${rowNumber} rows.`);
}

processLargeCSV().catch((error) => {
  console.error("Error processing CSV:", error.message);
  process.exit(1);
});

This example reads a large file, filters rows based on a condition, and writes filtered results to a new file, all without loading the entire dataset into memory.

Using Transform Streams

With the stream-transform module from node-csv, you can manipulate CSV data on the fly during reading or writing. As each row flows through the stream pipeline, you can change, filter, or enrich rows before they reach their next destination. This enables memory-efficient processing and powerful, scalable transformations for large CSV datasets without fully loading files.

csv_demo/transformCSV.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { transform } = require("stream-transform");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");

// Validate and transform with error logging
const transformer = transform((row, callback) => {
  const estimate = parseInt(row.estimate, 10);
  
  // Validate estimate field
  if (isNaN(estimate) || estimate < 0) {
    console.warn(`Invalid estimate value "${row.estimate}" - using 0`);
  }
  
  // Transform: convert estimate to number and add a calculated field
  const transformedRow = {
    ...row,
    estimate: estimate || 0,
    estimate_doubled: (estimate || 0) * 2,
  };
  callback(null, transformedRow);
});

// Use stream.pipeline for proper backpressure handling and error propagation
async function transformCSV() {
  await pipeline(
    fs.createReadStream("./migration_data.csv"),
    parse({ 
      columns: true, 
      skip_empty_lines: true,
      relax_column_count: true,
    }),
    transformer,
    stringify({ header: true }),
    fs.createWriteStream("transformed_output.csv")
  );
  
  console.log("Transformation complete");
}

transformCSV().catch((error) => {
  console.error("Transformation failed:", error.message);
  process.exit(1);
});

The transform function receives each row and can modify it before passing it to the next stage in the pipeline.

Converting Between JSON and CSV

Convert between JSON and CSV formats so your Node.js applications can import spreadsheet-style data and export structured results for analytics or sharing.

Whether you need to export API results or import spreadsheet data, converting between JSON and CSV is a common workflow in Node.js. Let’s see how to do it both ways.

Streaming JSON → CSV Conversions

For large datasets, avoid loading entire JSON arrays or CSV files into memory. Instead, stream records through the conversion pipeline.

The following example uses the JSONStream package (install it with npm install JSONStream) to stream each element of a JSON array stored in input.json and convert it to CSV:

csv_demo/jsonToCSVStream.js
const fs = require("fs");
const JSONStream = require("JSONStream");
const { stringify } = require("csv-stringify");

const stringifier = stringify({
  header: true,
  columns: ["name", "age", "city"],
});

fs.createReadStream("input.json")
  .pipe(JSONStream.parse("*")) // Stream each element from a top-level array
  .pipe(stringifier)
  .pipe(fs.createWriteStream("output.csv"))
  .on("finish", () => {
    console.log("Streaming JSON to CSV conversion complete");
  });

You can also stream CSV to JSON using csv-parse and a JSON stringifier:

csv_demo/csvToJSONStream.js
const fs = require("fs");
const { parse } = require("csv-parse");
const JSONStream = require("JSONStream");

fs.createReadStream("./migration_data.csv")
  .pipe(
    parse({
      columns: true,
      skip_empty_lines: true,
      trim: true,
    })
  )
  .pipe(JSONStream.stringify("[\n", ",\n", "\n]"))
  .pipe(fs.createWriteStream("output_stream.json"))
  .on("finish", () => {
    console.log("Streaming CSV to JSON conversion complete");
  });

When designing streaming conversions:

  • Treat each row or object as an independent unit that can be processed and written immediately.
  • Avoid aggregating large arrays in memory; instead, use streaming JSON utilities like JSONStream or Node’s Readable.from() to write records incrementally.
  • Consider compressing large JSON outputs with zlib.createGzip() at the end of the pipeline when disk or network I/O is a bottleneck, as sketched below.
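
For example, the CSV-to-JSON stream above can be extended with a gzip stage. This is a minimal sketch, assuming the same migration_data.csv input and writing a compressed output_stream.json.gz file:

const fs = require("fs");
const zlib = require("zlib");
const { parse } = require("csv-parse");
const JSONStream = require("JSONStream");

fs.createReadStream("./migration_data.csv")
  .pipe(parse({ columns: true, skip_empty_lines: true, trim: true }))
  .pipe(JSONStream.stringify("[\n", ",\n", "\n]"))
  .pipe(zlib.createGzip()) // Compress the JSON text before it reaches disk
  .pipe(fs.createWriteStream("output_stream.json.gz"))
  .on("finish", () => {
    console.log("Compressed CSV to JSON conversion complete");
  });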

Converting JSON to CSV

Convert an array of JSON objects to CSV:

csv_demo/jsonToCSV.js
const fs = require("fs");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const { Readable } = require("stream");

const jsonData = [
  { name: "Alice", age: 30, city: "New York" },
  { name: "Bob", age: 25, city: "San Francisco" },
  { name: "Charlie", age: 35, city: "Chicago" },
];

async function jsonToCSV() {
  const stringifier = stringify({
    header: true,
    columns: ["name", "age", "city"],
  });
  
  const writableStream = fs.createWriteStream("output.csv");

  // Register event listeners BEFORE initiating writes to prevent missing events
  writableStream.on("finish", () => {
    console.log("JSON to CSV conversion complete");
  });

  writableStream.on("error", (error) => {
    console.error("Write stream error:", error.message);
  });

  // Use pipeline with a readable stream for proper backpressure handling
  await pipeline(
    Readable.from(jsonData), // Convert array to readable stream
    stringifier,
    writableStream
  );
}

jsonToCSV().catch((error) => {
  console.error("JSON to CSV conversion failed:", error.message);
  process.exit(1);
});

Converting CSV to JSON

Convert CSV data to JSON format using streaming to avoid memory accumulation:

csv_demo/csvToJSON.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { pipeline } = require("stream/promises");
const { Transform } = require("stream");

// Transform stream that converts CSV records to JSON lines (NDJSON format)
// This avoids accumulating all records in memory
class CSVToJSONTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.isFirst = true;
  }

  _transform(chunk, encoding, callback) {
    try {
      // Output as JSON lines (one JSON object per line)
      // For array format, you'd need to buffer and write brackets, but that defeats streaming
      const jsonLine = JSON.stringify(chunk) + "\n";
      callback(null, jsonLine);
    } catch (error) {
      callback(error);
    }
  }
}

// Alternative: Stream as JSON array (requires buffering first/last items)
class CSVToJSONArrayTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.isFirst = true;
  }

  _transform(chunk, encoding, callback) {
    try {
      if (this.isFirst) {
        this.push("[\n");
        this.isFirst = false;
      } else {
        this.push(",\n");
      }
      const json = JSON.stringify(chunk, null, 2);
      this.push(json.replace(/^/gm, "  "));
      callback(null);
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    if (!this.isFirst) {
      this.push("\n]");
    } else {
      this.push("[]");
    }
    callback();
  }
}

async function csvToJSON() {
  let recordCount = 0;
  
  // Use stream.pipeline for proper backpressure handling
  await pipeline(
    fs.createReadStream("./migration_data.csv"),
    parse({
      columns: true,
      skip_empty_lines: true,
      trim: true,
      relax_column_count: true,
    }),
    new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        recordCount++;
        // Validate record if needed
        if (!chunk.year_month) {
          console.warn(`Record ${recordCount}: Missing year_month field`);
        }
        callback(null, chunk);
      },
    }),
    new CSVToJSONArrayTransform(), // Use array format
    fs.createWriteStream("output.json")
  );
  
  console.log(`Converted ${recordCount} records to JSON`);
}

csvToJSON().catch((error) => {
  console.error("CSV to JSON conversion failed:", error.message);
  process.exit(1);
});

Note: For very large CSV files, prefer JSON Lines (NDJSON) format where each record is a separate JSON object on its own line. This allows true streaming without buffering. The array format shown above requires minimal buffering for brackets but is still more memory-efficient than loading all records.
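
If you prefer the JSON Lines route, the sketch below keeps the same streaming pipeline but emits one JSON object per line, so nothing needs to be buffered for brackets or commas. It assumes the same migration_data.csv input and writes to a hypothetical output.ndjson file:

csv_demo/csvToNDJSON.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { Transform } = require("stream");
const { pipeline } = require("stream/promises");

async function csvToNDJSON() {
  await pipeline(
    fs.createReadStream("./migration_data.csv"),
    parse({ columns: true, skip_empty_lines: true, trim: true }),
    new Transform({
      objectMode: true,
      transform(record, encoding, callback) {
        // One JSON object per line - nothing to buffer for array syntax.
        callback(null, JSON.stringify(record) + "\n");
      },
    }),
    fs.createWriteStream("output.ndjson")
  );
  console.log("NDJSON export complete");
}

csvToNDJSON().catch((error) => {
  console.error("NDJSON export failed:", error.message);
  process.exit(1);
});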

Error Handling and Data Validation

Make your CSV pipelines resilient by validating records, tagging failures, and handling malformed rows without crashing the entire job.

CSV data can be messy. Missing fields, bad values, or inconsistent rows can break your scripts. This section shows how to detect, skip, or fix errors on the fly so your application stays stable and your data clean.

Tagging, Quarantining, and Retrying Failed Rows

In production, it’s usually better to tag and quarantine bad records than to drop them silently. You can route invalid rows to a separate CSV file for later inspection or reprocessing:

csv_demo/quarantineInvalidRows.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { pipeline } = require("stream/promises");
const { Transform } = require("stream");

// Validate records and tag errors with contextual information
function validateRecord(record, rowNumber) {
  const errors = [];
  const estimate = parseInt(record.estimate, 10);

  if (!record.year_month || !/^[0-9]{4}-[0-9]{2}$/.test(record.year_month)) {
    errors.push(`Invalid year_month format at row ${rowNumber}`);
  }

  if (Number.isNaN(estimate) || estimate < 0) {
    errors.push(`Invalid estimate value at row ${rowNumber}`);
  }

  const validDirections = ["Arrivals", "Departures"];
  if (!validDirections.includes(record.direction)) {
    errors.push(`Invalid direction at row ${rowNumber}`);
  }

  return { errors, estimate, isValid: errors.length === 0 };
}

// Transform stream that routes valid and invalid records to separate outputs
class QuarantineTransform extends Transform {
  constructor(validStringifier, quarantineStringifier, options = {}) {
    super({ objectMode: true, ...options });
    this.validStringifier = validStringifier;
    this.quarantineStringifier = quarantineStringifier;
    this.rowNumber = 0;
    this.validCount = 0;
    this.invalidCount = 0;
  }

  _transform(record, encoding, callback) {
    this.rowNumber++;
    const validation = validateRecord(record, this.rowNumber);

    if (validation.isValid) {
      this.validCount++;
      // Write valid records with converted estimate
      this.validStringifier.write({ ...record, estimate: validation.estimate });
    } else {
      this.invalidCount++;
      // Log validation errors with contextual information for debugging
      console.warn(`Row ${this.rowNumber} quarantined:`, validation.errors.join("; "));
      // Quarantine invalid records with error reasons
      this.quarantineStringifier.write({
        ...record,
        error_reasons: validation.errors.join("; "),
      });
    }
    callback(null); // Continue processing
  }

  _flush(callback) {
    this.validStringifier.end();
    this.quarantineStringifier.end();
    console.log(`Processing complete: ${this.validCount} valid, ${this.invalidCount} quarantined`);
    callback();
  }
}

async function quarantineInvalidRows() {
  const validStringifier = stringify({
    header: true,
    columns: [
      "year_month",
      "month_of_release",
      "passenger_type",
      "direction",
      "sex",
      "age",
      "estimate",
    ],
  });

  const quarantineStringifier = stringify({
    header: true,
    columns: [
      "year_month",
      "month_of_release",
      "passenger_type",
      "direction",
      "sex",
      "age",
      "estimate",
      "error_reasons",
    ],
  });

  const validStream = fs.createWriteStream("valid_rows.csv");
  const quarantineStream = fs.createWriteStream("quarantine_rows.csv");

  // Register event listeners BEFORE initiating writes
  validStream.on("finish", () => {
    console.log("Valid rows file written");
  });

  quarantineStream.on("finish", () => {
    console.log("Quarantine file written");
  });

  validStringifier.pipe(validStream);
  quarantineStringifier.pipe(quarantineStream);

  // Use stream.pipeline for proper backpressure handling and error propagation
  await pipeline(
    fs.createReadStream("./migration_data.csv"),
    parse({
      columns: true,
      skip_empty_lines: true,
      trim: true,
      relax_column_count: true, // Allow inconsistent columns, validate manually
      // Avoid skip_records_with_error - tag and quarantine instead
    }),
    new QuarantineTransform(validStringifier, quarantineStringifier)
  );
}

quarantineInvalidRows().catch((error) => {
  console.error("Quarantine processing failed:", error.message);
  process.exit(1);
});

Later, you can build a small retry job that reads quarantine_rows.csv, fixes known issues, and attempts to reinsert the cleaned records into your database.
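
As a starting point, here is a minimal sketch of such a retry job. It assumes the quarantine_rows.csv file produced above, applies one illustrative fix (treating blank estimates as 0), re-runs the same checks, and writes rows that now pass to a retried_rows.csv file for a later import:

csv_demo/retryQuarantined.js
const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");

async function retryQuarantined() {
  await pipeline(
    fs.createReadStream("./quarantine_rows.csv"),
    parse({ columns: true, skip_empty_lines: true, trim: true }),
    transform((row) => {
      // Illustrative fix: treat blank estimates as 0, then re-run the same checks.
      const estimate = parseInt(row.estimate === "" ? "0" : row.estimate, 10);
      const validDirections = ["Arrivals", "Departures"];
      if (
        !/^\d{4}-\d{2}$/.test(row.year_month || "") ||
        isNaN(estimate) ||
        estimate < 0 ||
        !validDirections.includes(row.direction)
      ) {
        return null; // Still invalid: keep it out of the retried output
      }
      const { error_reasons, ...cleaned } = row;
      return { ...cleaned, estimate };
    }),
    stringify({ header: true }),
    fs.createWriteStream("./retried_rows.csv")
  );
  console.log("Retry pass complete; review retried_rows.csv before importing");
}

retryQuarantined().catch((error) => {
  console.error("Retry job failed:", error.message);
  process.exit(1);
});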

The csv-parse module also supports cast and cast_date options to convert values as they are parsed:

const fs = require("fs");
const { parse } = require("csv-parse");

// Use async iterator pattern for proper backpressure handling
async function parseWithCast() {
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        relax_column_count: true,
        cast: (value, context) => {
          // Validate and cast estimate field
          if (context.column === "estimate") {
            const num = parseInt(value, 10);
            if (isNaN(num)) {
              console.warn(`Invalid estimate value "${value}" at column ${context.column}`);
              return null;
            }
            return num;
          }
          return value;
        },
        cast_date: true, // Parse ISO-like date strings into Date objects where applicable
      })
    );

  for await (const record of parser) {
    // record.estimate is a number or null (if invalid), and date-like fields are Date objects
    // Process each record as it arrives
  }
}

parseWithCast().catch((error) => {
  console.error("Parsing with cast failed:", error.message);
  process.exit(1);
});

In real-world services, integrate structured logging so you can trace problems across runs. For example, using a logging framework like pino:

const pino = require("pino");
const logger = pino();

fs.createReadStream("./migration_data.csv")
  .pipe(parse({ columns: true, skip_empty_lines: true }))
  .on("data", (record) => {
    const { errors } = validateRecord(record);
    if (errors.length > 0) {
      logger.warn(
        { record, errors },
        "Invalid CSV record encountered"
      );
    }
  })
  .on("error", (error) => {
    logger.error({ err: error }, "CSV parsing failed");
  });

Logging contextual information (file name, row number, error reasons) makes it much easier to debug and remediate data quality issues in production.

Basic Error Handling

Handle parsing errors gracefully:

csv_demo/errorHandling.js
const fs = require("fs");
const { parse } = require("csv-parse");

// Validate records and log errors with contextual information
function validateRecord(record, rowNumber) {
  const errors = [];
  
  if (!record.year_month) {
    errors.push(`Missing year_month at row ${rowNumber}`);
  } else if (!/^\d{4}-\d{2}$/.test(record.year_month)) {
    errors.push(`Invalid year_month format at row ${rowNumber}`);
  }
  
  if (!record.estimate) {
    errors.push(`Missing estimate at row ${rowNumber}`);
  } else {
    const estimate = parseInt(record.estimate, 10);
    if (isNaN(estimate) || estimate < 0) {
      errors.push(`Invalid estimate value at row ${rowNumber}`);
    }
  }
  
  return { isValid: errors.length === 0, errors };
}

async function handleErrors() {
  let rowNumber = 0;
  let validCount = 0;
  let invalidCount = 0;
  
  // Use async iterator pattern for proper backpressure handling
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        relax_column_count: true, // Allow inconsistent column counts
        // Avoid skip_records_with_error - validate and log errors instead
      })
    );

  try {
    for await (const record of parser) {
      rowNumber++;
      const validation = validateRecord(record, rowNumber);
      
      if (validation.isValid) {
        validCount++;
        // Process valid record
        console.log(`Valid record ${rowNumber}:`, record.year_month);
      } else {
        invalidCount++;
        // Log validation errors with contextual information for debugging
        console.warn(`Row ${rowNumber} validation failed:`, validation.errors.join("; "));
      }
    }
    
    console.log(`Processed ${validCount} valid records`);
    console.log(`Encountered ${invalidCount} invalid records`);
  } catch (error) {
    console.error("Parse error:", error.message);
    throw error;
  }
}

handleErrors().catch((error) => {
  console.error("Error handling failed:", error.message);
  process.exit(1);
});

Data Type Validation

Validate and convert data types during parsing:

csv_demo/validateData.js
const fs = require("fs");
const { parse } = require("csv-parse");

// Validate and convert data types with detailed error logging
function validateRecord(record, rowNumber) {
  const errors = [];

  // Validate year_month format (YYYY-MM)
  if (!record.year_month || !/^\d{4}-\d{2}$/.test(record.year_month)) {
    errors.push(`Invalid year_month format at row ${rowNumber}`);
  }

  // Validate estimate is a number
  const estimate = parseInt(record.estimate, 10);
  if (isNaN(estimate) || estimate < 0) {
    errors.push(`Invalid estimate value at row ${rowNumber}`);
  }

  // Validate direction is one of expected values
  const validDirections = ["Arrivals", "Departures"];
  if (!validDirections.includes(record.direction)) {
    errors.push(`Invalid direction value at row ${rowNumber}`);
  }

  return {
    isValid: errors.length === 0,
    errors: errors,
    record: {
      ...record,
      estimate: estimate, // Convert to number
    },
  };
}

async function validateData() {
  let rowNumber = 0;
  let validCount = 0;
  let invalidCount = 0;
  
  // Use async iterator pattern for proper backpressure handling
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        relax_column_count: true,
      })
    );

  try {
    for await (const record of parser) {
      rowNumber++;
      const validation = validateRecord(record, rowNumber);
      
      if (validation.isValid) {
        validCount++;
        // Process valid record with converted types
        console.log(`Valid record ${rowNumber}:`, validation.record);
      } else {
        invalidCount++;
        // Log validation errors with contextual information for debugging
        console.warn(`Row ${rowNumber} validation failed:`, validation.errors.join("; "));
      }
    }
    
    console.log(`Validation complete: ${validCount} valid, ${invalidCount} invalid`);
  } catch (error) {
    console.error("Validation error:", error.message);
    throw error;
  }
}

validateData().catch((error) => {
  console.error("Data validation failed:", error.message);
  process.exit(1);
});

Inserting Data into the Database

Stream CSV rows into SQLite so you can query, aggregate, and join the imported data using SQL.

Setting Up the Database

Create a database connection file db.js:

csv_demo/db.js
const fs = require("fs");
const sqlite3 = require("sqlite3").verbose();
const filepath = "./population.db";

function connectToDatabase() {
  if (fs.existsSync(filepath)) {
    return new sqlite3.Database(filepath);
  } else {
    const db = new sqlite3.Database(filepath, (error) => {
      if (error) {
        return console.error(error.message);
      }
      createTable(db);
      console.log("Connected to the database successfully");
    });
    return db;
  }
}

function createTable(db) {
  db.exec(`
    CREATE TABLE migration
    (
      year_month       VARCHAR(10),
      month_of_release VARCHAR(10),
      passenger_type   VARCHAR(50),
      direction        VARCHAR(20),
      sex              VARCHAR(10),
      age              VARCHAR(50),
      estimate         INT,
      standard_error   INT,
      status           VARCHAR(20)
    )
  `);
}

module.exports = connectToDatabase();

Inserting CSV Data

Create insertData.js to read CSV and insert into the database using transactions and prepared statements:

csv_demo/insertData.js
const fs = require("fs");
const { parse } = require("csv-parse");
const db = require("./db");
const { promisify } = require("util");

// Wrap database methods in promises for async/await
const dbRun = promisify(db.run.bind(db));

// Validate and prepare row data
function prepareRowData(row) {
  const [
    yearMonth,
    monthOfRelease,
    passengerType,
    direction,
    sex,
    age,
    estimateRaw,
    standardErrorRaw,
    status,
  ] = row;

  // Validate and convert numeric fields
  const estimate = parseInt(estimateRaw, 10);
  if (isNaN(estimate) || estimate < 0) {
    throw new Error(`Invalid estimate value: ${estimateRaw}`);
  }

  const standardError =
    standardErrorRaw === "" ? null : parseInt(standardErrorRaw, 10);

  return [
    yearMonth,
    monthOfRelease,
    passengerType,
    direction,
    sex,
    age,
    estimate,
    standardError,
    status,
  ];
}

async function insertCSVData() {
  // Wrap database operations in a transaction for atomicity
  return new Promise((resolve, reject) => {
    db.serialize(async () => {
      try {
        // Begin transaction explicitly
        await dbRun("BEGIN TRANSACTION");

        // Prepare statement once for better performance
        // The database can reuse the query plan instead of re-parsing SQL for every row
        const stmt = db.prepare(
          `INSERT INTO migration VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
        );

        const stmtRun = promisify(stmt.run.bind(stmt));
        const stmtFinalize = promisify(stmt.finalize.bind(stmt));

        let insertCount = 0;
        let rowNumber = 0;

        // Use async iterator pattern to coordinate database inserts
        // This ensures proper sequencing and prevents race conditions
        const parser = fs
          .createReadStream("./migration_data.csv")
          .pipe(parse({ delimiter: ",", from_line: 2, trim: true }));

        try {
          for await (const row of parser) {
            rowNumber++;
            try {
              const rowData = prepareRowData(row);
              
              // Await each insert to ensure proper sequencing
              // This prevents race conditions and ensures inserts complete before moving on
              await stmtRun(rowData);
              
              insertCount++;
              if (insertCount % 1000 === 0) {
                console.log(`Inserted ${insertCount} rows`);
              }
            } catch (error) {
              // Log validation/insert errors with contextual information
              console.error(`Row ${rowNumber} insert failed:`, error.message);
              // Continue processing other rows instead of crashing
            }
          }

          // Finalize prepared statement
          await stmtFinalize();

          // Commit transaction - all inserts succeed atomically
          await dbRun("COMMIT");
          
          console.log(`Finished inserting ${insertCount} rows`);
          resolve(insertCount);
        } catch (error) {
          // Rollback transaction on error to maintain data integrity
          await dbRun("ROLLBACK");
          reject(error);
        }
      } catch (error) {
        reject(error);
      } finally {
        db.close();
      }
    });
  });
}

insertCSVData()
  .then((count) => {
    console.log(`Successfully inserted ${count} rows`);
    process.exit(0);
  })
  .catch((error) => {
    console.error("Insert failed:", error.message);
    process.exit(1);
  });

Run the script:

  1. node insertData.js

The output shows progress as rows are inserted:

Output
Connected to the database successfully
Inserted 1000 rows
Inserted 2000 rows
...
Inserted 44000 rows
Finished inserting 44310 rows

Tip: For large imports, wrapping inserts inside a database transaction (db.serialize() with an explicit BEGIN / COMMIT) and using prepared statements, as the script above does, reduces disk I/O and speeds up bulk loading.

Using Transactions and Prepared Statements

For imports that run nightly or handle millions of rows, parameterize and batch your inserts so the database can reuse the query plan and flush to disk less frequently. Use async/await to coordinate inserts and prevent race conditions:

csv_demo/insertWithTransactions.js
const fs = require("fs");
const { parse } = require("csv-parse");
const db = require("./db");
const { promisify } = require("util");

// Wrap database methods in promises for async/await
const dbRun = promisify(db.run.bind(db));

// Validate and prepare row data with error logging
function prepareRowData(row, rowNumber) {
  const [
    yearMonth,
    monthOfRelease,
    passengerType,
    direction,
    sex,
    age,
    estimateRaw,
    standardErrorRaw,
    status,
  ] = row;

  // Validate and convert numeric fields
  const estimate = parseInt(estimateRaw, 10);
  if (isNaN(estimate) || estimate < 0) {
    throw new Error(`Row ${rowNumber}: Invalid estimate value "${estimateRaw}"`);
  }

  const standardError =
    standardErrorRaw === "" ? null : parseInt(standardErrorRaw, 10);

  return [
    yearMonth,
    monthOfRelease,
    passengerType,
    direction,
    sex,
    age,
    estimate,
    standardError,
    status,
  ];
}

async function insertWithTransactions() {
  return new Promise((resolve, reject) => {
    db.serialize(async () => {
      try {
        // Begin transaction explicitly for atomicity
        await dbRun("BEGIN TRANSACTION");

        // Prepare statement once - database caches the execution plan
        // This is much faster than re-parsing SQL for every row
        const stmt = db.prepare(
          `INSERT INTO migration VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
        );

        const stmtRun = promisify(stmt.run.bind(stmt));
        const stmtFinalize = promisify(stmt.finalize.bind(stmt));

        let insertCount = 0;
        let rowNumber = 0;

        // Use async iterator pattern to coordinate database inserts
        // This ensures proper sequencing and prevents race conditions
        const parser = fs
          .createReadStream("./migration_data.csv")
          .pipe(parse({ delimiter: ",", from_line: 2, trim: true }));

        try {
          for await (const row of parser) {
            rowNumber++;
            try {
              const rowData = prepareRowData(row, rowNumber);
              
              // Await each insert to ensure proper sequencing
              // This prevents race conditions and ensures inserts complete before moving on
              await stmtRun(rowData);
              
              insertCount++;
              if (insertCount % 1000 === 0) {
                console.log(`Inserted ${insertCount} rows`);
              }
            } catch (error) {
              // Log validation/insert errors with contextual information
              console.error(`Row ${rowNumber} failed:`, error.message);
              // Continue processing other rows instead of crashing
            }
          }

          // Finalize prepared statement
          await stmtFinalize();

          // Commit transaction - all inserts succeed atomically
          await dbRun("COMMIT");
          
          console.log(`Transaction committed: ${insertCount} rows inserted`);
          resolve(insertCount);
        } catch (error) {
          // Rollback transaction on error to maintain data integrity
          console.error("Stream processing error:", error.message);
          await dbRun("ROLLBACK");
          reject(error);
        }
      } catch (error) {
        console.error("Transaction setup error:", error.message);
        reject(error);
      } finally {
        db.close();
      }
    });
  });
}

insertWithTransactions()
  .then((count) => {
    console.log(`Successfully inserted ${count} rows`);
    process.exit(0);
  })
  .catch((error) => {
    console.error("Insert with transactions failed:", error.message);
    process.exit(1);
  });

Transactions ensure each batch succeeds atomically, while prepare/run lets SQLite cache the execution plan instead of re-parsing SQL for every row. Using async/await with the iterator pattern ensures inserts are properly sequenced and prevents race conditions that can occur when inserting rows directly in .on('data') handlers without awaiting completion.
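
Once the data is in SQLite, you can query it directly from Node.js. The sketch below assumes the migration table populated by the scripts above and reuses the db.js module to aggregate estimates by direction:

csv_demo/queryMigration.js
const db = require("./db");

// Aggregate total estimated arrivals and departures from the imported CSV data.
db.all(
  `SELECT direction, SUM(estimate) AS total_estimate
   FROM migration
   GROUP BY direction`,
  (error, rows) => {
    if (error) {
      console.error("Query failed:", error.message);
      process.exit(1);
    }
    rows.forEach((row) => {
      console.log(`${row.direction}: ${row.total_estimate}`);
    });
    db.close();
  }
);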

Testing and Automating CSV Pipelines

Keep recurring CSV workflows healthy by codifying tests, static checks, automation, and observability.

  • Unit and integration tests: Store miniature CSV fixtures (valid and invalid) inside tests/fixtures/ and write Jest or Mocha tests that invoke your parsing functions. Snapshot the serialized CSV output so column orders and quoting stay consistent (a minimal test sketch follows this list).
  • Schema linting: Run csvlint or a lightweight Node script that verifies header names, delimiter consistency, and row counts before promoting new datasets.
  • Continuous integration: Run scripts like node readCSV.js and node insertData.js in CI against temporary SQLite databases so malformed CSVs fail fast before reaching production.
  • Scheduling and automation: Trigger imports with cron, systemd timers, or workflow engines (Airflow, Temporal). Wrap jobs in shell scripts that emit clear exit codes for your monitoring system.
  • Instrumentation: Capture metrics (rows processed, rows quarantined, average latency per row) and structured logs with pino or winston. Alert when error spikes or throughput drops indicate a downstream schema change.
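
As a starting point for the unit-test bullet above, here is a minimal Jest sketch. The fixture path tests/fixtures/valid.csv, the helper name parseCsvFile, and the assertions are illustrative placeholders to adapt to your project:

// tests/parseFixture.test.js - minimal sketch; fixture path and assertions are placeholders
const fs = require("fs");
const path = require("path");
const { parse } = require("csv-parse");

// Parse a CSV fixture into an array of record objects.
function parseCsvFile(filePath) {
  return new Promise((resolve, reject) => {
    const records = [];
    fs.createReadStream(filePath)
      .pipe(parse({ columns: true, trim: true }))
      .on("data", (record) => records.push(record))
      .on("end", () => resolve(records))
      .on("error", reject);
  });
}

test("valid fixture parses with consistent columns", async () => {
  const records = await parseCsvFile(
    path.join(__dirname, "fixtures", "valid.csv")
  );

  // Every row should expose the same column names as the first row.
  expect(records.length).toBeGreaterThan(0);
  const columns = Object.keys(records[0]);
  for (const record of records) {
    expect(Object.keys(record)).toEqual(columns);
  }
});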

Common Pitfalls and Troubleshooting

Before and during parsing, use quick sanity checks and targeted configuration to avoid common CSV ingestion failures.

  • Inspect the file with tools like head, tail, and wc to understand its size, structure, and encoding.
  • Detect BOMs and newline styles before parsing, and normalize them or enable bom: true in csv-parse (a small detection sketch follows this list).
  • Use validation tools such as csvlint where available to catch structural issues early.
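
For the BOM and newline check, a small sketch like the following reads only the first kilobyte of the migration_data.csv file used earlier:

const fs = require("fs");

// Read just the first kilobyte so the check stays cheap for large files.
const fd = fs.openSync("./migration_data.csv", "r");
const buffer = Buffer.alloc(1024);
const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, 0);
fs.closeSync(fd);
const sample = buffer.subarray(0, bytesRead);

// A UTF-8 BOM is the byte sequence 0xEF 0xBB 0xBF at the very start of the file.
const hasBom =
  sample.length >= 3 &&
  sample[0] === 0xef &&
  sample[1] === 0xbb &&
  sample[2] === 0xbf;

// "\r\n" indicates Windows-style line endings; csv-parse accepts both styles.
const usesCrlf = sample.includes("\r\n");

console.log(`BOM detected: ${hasBom}`);
console.log(`Line endings: ${usesCrlf ? "CRLF" : "LF"}`);

// If a BOM is present, enable bom: true so csv-parse strips it instead of
// folding it into the first header name:
// parse({ bom: true, columns: true })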

Issue: Incorrect Delimiter

Problem: Parser returns single-column arrays or incorrect field separation.

Prevention: Run head -5 migration_data.csv and check how fields are separated, or use a validator such as csvlint before wiring up the parser.

Solution: Identify the delimiter by inspecting the file:

const fs = require("fs");
const firstLine = fs.readFileSync("./migration_data.csv", "utf8").split("\n")[0];
console.log("First line:", firstLine);
// Check for common delimiters
if (firstLine.includes(";")) {
  console.log("Delimiter appears to be semicolon");
} else if (firstLine.includes("\t")) {
  console.log("Delimiter appears to be tab");
} else {
  console.log("Delimiter appears to be comma");
}

Then configure the parser accordingly:

parse({ delimiter: ";" }) // or "\t" for tabs
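
If the includes() checks above prove too coarse, you can rank candidate delimiters by how often each one appears in the first line. This is only a heuristic sketch, since quoted fields that contain a delimiter can skew the count:

const fs = require("fs");

const firstLine = fs
  .readFileSync("./migration_data.csv", "utf8")
  .split(/\r?\n/)[0];

// Count occurrences of each candidate and pick the most frequent one.
const candidates = [",", ";", "\t", "|"];
const best = candidates
  .map((delimiter) => ({
    delimiter,
    count: firstLine.split(delimiter).length - 1,
  }))
  .sort((a, b) => b.count - a.count)[0];

console.log(`Most likely delimiter: ${JSON.stringify(best.delimiter)}`);

// Then pass it to the parser:
// parse({ delimiter: best.delimiter })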

Issue: Special Characters in Fields

Problem: Fields containing commas, quotes, or newlines break parsing.

Prevention: Ask data producers to always quote text fields that may contain commas, quotes, or newlines, and validate a sample file with head and wc -l to confirm that each row has a consistent number of fields.

Solution: The csv-parse module handles quoted fields automatically. Ensure your CSV uses proper quoting:

name,description,price
"Widget, Premium","A widget with ""special"" features",29.99

The parser handles this correctly with default settings. If issues persist, check the quote and escape options:

parse({
  quote: '"',
  escape: '"',
  relax_quotes: true,
})

Issue: Memory Issues with Large Files

Problem: Application runs out of memory when processing large CSV files.

Prevention: Use wc -l and file size checks to avoid loading unexpectedly large CSV files into memory with readFileSync; default to streaming for anything beyond small test datasets.

Solution: Always use streams instead of loading the entire file:

// Don't do this for large files
const data = fs.readFileSync("large_file.csv", "utf8");
const records = parse(data);

// Do this instead
fs.createReadStream("large_file.csv")
  .pipe(parse({ columns: true }))
  .on("data", (record) => {
    // Process each record immediately
  });
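
To act on the prevention tip programmatically, you can check the file size before deciding how to read it. A minimal sketch - the 50 MB cutoff here is an arbitrary assumption, not a hard rule:

const fs = require("fs");

const filePath = "./large_file.csv";
const MAX_IN_MEMORY_BYTES = 50 * 1024 * 1024; // arbitrary 50 MB cutoff

const { size } = fs.statSync(filePath);

if (size > MAX_IN_MEMORY_BYTES) {
  console.log(`File is ${(size / 1024 / 1024).toFixed(1)} MB - use streaming`);
} else {
  console.log("File is small, but streaming still scales better as it grows");
}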

Issue: Headers Not Recognized

Problem: Headers are not interpreted correctly (for example, the header row is parsed as data or the first data row is treated as headers).

Prevention: Inspect the first few lines with head -3 to confirm whether the file includes a header row and how the column names are formatted before enabling columns: true.

Solution: Explicitly specify column names or skip the header row:

// Option 1: Skip first line and define columns manually
parse({ from_line: 2, columns: ["col1", "col2", "col3"] })

// Option 2: Use first line as headers
parse({ columns: true })

// Option 3: Provide column mapping
parse({
  columns: (header) => header.map((col) => col.toLowerCase().trim())
})

Issue: Async Operations in Stream Handlers

Problem: Database inserts or API calls in data event handlers cause race conditions.

Prevention: Design your pipeline around async iterators or stream.pipeline() from the start, and avoid doing unbounded parallel database inserts inside simple data event handlers.

Solution: Use proper async handling with backpressure control:

const fs = require("fs");
const { parse } = require("csv-parse");

async function processWithAsyncOps() {
  const parser = fs
    .createReadStream("./migration_data.csv")
    .pipe(parse({ columns: true }));

  for await (const record of parser) {
    // Await async operations properly (for example, a database insert or API call)
    await someAsyncOperation(record);
  }

  console.log("Finished processing all records");
}

processWithAsyncOps().catch((error) => {
  console.error("Error while processing CSV:", error.message);
});

If you’re using a callback-based database client such as sqlite3, wrap its methods in Promises before using await inside the loop so that each insert or API call is properly sequenced.
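
For example, with sqlite3 you can use util.promisify the same way the transaction example earlier in this tutorial does. The database file, table, and column names below are placeholders to adapt to your schema:

const { promisify } = require("util");
const sqlite3 = require("sqlite3").verbose();

const db = new sqlite3.Database("./example.db"); // placeholder database file

// db.run takes a trailing callback, so promisify makes it awaitable;
// binding keeps `this` pointing at the database object.
const dbRun = promisify(db.run.bind(db));

async function insertRecord(record) {
  // Placeholder table and columns - adjust to your schema.
  await dbRun("INSERT INTO example_table (name, value) VALUES (?, ?)", [
    record.name,
    record.value,
  ]);
}

With this wrapper, each insert finishes before the for await loop advances to the next row.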

FAQ

What is the best CSV library for Node.js?

The node-csv package (installed as csv) is widely used and well-maintained. It provides stream-based processing, making it suitable for large files. Alternatives include fast-csv (faster for some use cases) and papaparse (good browser compatibility). Choose based on your specific needs: stream processing, performance, or browser support.

How do I read a large CSV file in Node.js without blocking?

Use Node.js streams with csv-parse:

fs.createReadStream("large_file.csv")
  .pipe(parse({ columns: true }))
  .on("data", (row) => {
    // Process each row as it arrives
  });

This processes data in chunks without loading the entire file into memory. For even better control, use async iterators:

for await (const record of parser) {
  // Process record
}

What is the difference between csv-parse and csv-parser?

csv-parse is part of the node-csv package and uses streams by default. csv-parser is a separate, lighter package with similar functionality. Both are valid choices; csv-parse offers more configuration options and is part of a larger ecosystem (csv-stringify, stream-transform, csv-generate).
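
For a quick side-by-side, the same streaming read looks roughly like this with each package (csv-parser is installed separately with npm install csv-parser):

const fs = require("fs");

// csv-parse (part of the csv / node-csv ecosystem)
const { parse } = require("csv-parse");
fs.createReadStream("data.csv")
  .pipe(parse({ columns: true, trim: true }))
  .on("data", (row) => console.log(row));

// csv-parser (standalone package)
const csvParser = require("csv-parser");
fs.createReadStream("data.csv")
  .pipe(csvParser())
  .on("data", (row) => console.log(row));

Both emit one object per row keyed by the header names; csv-parse simply exposes more options when you need them.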

How do I write JSON data to a CSV file in Node.js?

Use csv-stringify to convert JSON objects to CSV:

const fs = require("fs");
const { stringify } = require("csv-stringify");

const stringifier = stringify({ header: true, columns: ["col1", "col2"] });
stringifier.pipe(fs.createWriteStream("output.csv"));

// jsonArray is your array of objects, for example [{ col1: "a", col2: "b" }]
jsonArray.forEach((obj) => {
  stringifier.write(obj);
});
stringifier.end();

How do I handle special characters or commas in CSV fields?

The csv-parse and csv-stringify modules automatically handle quoted fields containing special characters. Ensure your CSV uses proper quoting:

name,description
"Smith, John","Product with ""quotes"" and, commas"

The parser handles this with default settings. If you’re generating CSV, the stringifier automatically quotes fields when needed.

Can I use async/await with Node-CSV streams?

Yes. Use async iterators with the parser stream:

const parser = fs.createReadStream("file.csv").pipe(parse({ columns: true }));

for await (const record of parser) {
  await processRecord(record); // Await async operations
}

This provides better control flow than event-based handlers when you need to await async operations.

How do I transform CSV data while reading it?

Use the stream-transform module with stream.pipeline for proper backpressure handling:

const fs = require("fs");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");
const { transform } = require("stream-transform");
const { pipeline } = require("stream/promises");

const transformer = transform((row, callback) => {
  const estimate = parseInt(row.estimate, 10);
  if (isNaN(estimate)) {
    console.warn(`Invalid estimate value: ${row.estimate}`);
  }
  
  const transformed = {
    ...row,
    estimate: (estimate || 0) * 2,
  };
  callback(null, transformed);
});

async function transformCSV() {
  await pipeline(
    fs.createReadStream("input.csv"),
    parse({ columns: true, relax_column_count: true }),
    transformer,
    stringify({ header: true }),
    fs.createWriteStream("output.csv")
  );
}

transformCSV().catch((error) => {
  console.error("Transform failed:", error.message);
  process.exit(1);
});

How do I handle CSV files with different delimiters?

Specify the delimiter in the parser options:

// Semicolon-delimited
parse({ delimiter: ";" })

// Tab-delimited
parse({ delimiter: "\t" })

// Pipe-delimited
parse({ delimiter: "|" })

You can also use the delimiter option in stringify when writing CSV files.
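
For example, the callback form of stringify accepts the same delimiter option (the records here are illustrative):

const { stringify } = require("csv-stringify");

stringify(
  [
    { name: "Widget", price: 29.99 },
    { name: "Gadget", price: 4.5 },
  ],
  { header: true, delimiter: ";" },
  (err, output) => {
    if (err) throw err;
    console.log(output);
    // name;price
    // Widget;29.99
    // Gadget;4.5
  }
);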

Can I parse CSV files without a library?

Yes, but it’s not recommended for production. Simple parsing can be done with string methods, but this doesn’t handle edge cases like quoted fields, escaped characters, or multiline values. The node-csv library handles these cases correctly and is well-tested.
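
To see why, here is what a naive split does to the quoted row from the earlier example:

const line = '"Smith, John","Product with ""quotes"" and, commas"';

console.log(line.split(","));
// [ '"Smith', ' John"', '"Product with ""quotes"" and', ' commas"' ]
// Four fragments instead of the two fields the row actually contains.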

Conclusion

Use the csv package to build streaming, resilient CSV pipelines in Node.js that can handle everything from small exports to multi-million-row data feeds. In this tutorial, you read CSV files as arrays and objects with csv-parse, wrote CSV output with csv-stringify, converted between JSON and CSV, processed large files with stream pipelines, validated and quarantined bad rows, and imported data into SQLite for querying and reporting.

When building CSV workflows, keep a few essentials in mind:

  • Prefer streams over in-memory reads for any file that might grow large.
  • Configure delimiters, headers, encodings, and column mappings explicitly.
  • Apply validation, logging, and error handling to maintain data integrity.
  • Use async/await or async iterators when combining streams with databases or APIs.
  • Leverage transform streams and pipeline helpers to keep your code modular and testable.

For more information, see the CSV Project documentation. To continue learning about Node.js file operations, see How To Work with Files Using the fs Module in Node.js. For more on asynchronous programming, see How To Write Asynchronous Code in Node.js.


About the author(s)

Stanley Ulili (Author) - Software Engineer with a passion for sharing my knowledge.

Caitlin Postal (Technical Editor)

Vinayak Baranwal (Technical Writer II) - Building future-ready infrastructure with Linux, Cloud, and DevOps. Full Stack Developer and System Administrator, Technical Writer at DigitalOcean, GitHub contributor, passionate about Docker, PostgreSQL, and open source, with 50+ deployments across production environments.

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.