Indexing a large number of documents into Elasticsearch is straightforward with the bulk API: transform your data records into JSON documents, interleave them with instructions saying which index each one belongs to, and send the whole newline-delimited JSON blob to your Elasticsearch cluster as the body of a single HTTP request.
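For reference, the newline-delimited body of such a request looks something like this (the index name and fields here are purely illustrative):

{ "index": { "_index": "my_index" } }
{ "name": "foo", "value": 1 }
{ "index": { "_index": "my_index" } }
{ "name": "bar", "value": 2 }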
Or you can let the Node.js client’s bulk function build that request for you. Let’s read a CSV file, transform its rows into JSON objects, and index them:
import { Client } from '@elastic/elasticsearch'
import { parse } from 'csv-parse/sync'
import { readFileSync } from 'node:fs'

// parse the CSV into an array of objects, one per row, keyed by column name
const csv = parse(readFileSync('data.csv', 'utf8'), { columns: true })

// pair each row with an action line telling Elasticsearch where to index it
const operations = csv.flatMap(row => [
  { index: { _index: 'my_index' } },
  row
])

const client = new Client({ node: 'http://localhost:9200' })
await client.bulk({ operations })
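Note that the bulk API reports failures per document rather than failing the whole request, so it’s worth checking the response. A small sketch of that check, assuming the 8.x client (which resolves directly to the response body):

const result = await client.bulk({ operations })
if (result.errors) {
  // errors is true if any individual operation failed; inspect items for details
  const failures = result.items.filter(item => item.index?.error)
  console.error(`${failures.length} documents failed to index`)
}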
But what if you need to send more data than Elasticsearch can accept in a single request? Or your CSV file is so large that it can’t fit into memory all at once? The bulk helper to the rescue!
While the bulk API is simple enough to use on its own, the helper adds support for more complicated scenarios: streaming input, breaking large datasets up into multiple requests, and more.
For example, if your Elasticsearch server only accepts HTTP requests smaller than 10MB, you can instruct the bulk helper to break your data up by setting a flushBytes value. The helper will then send a bulk request whenever the accumulated payload is about to exceed that size:
const csv = parse(readFileSync('data.csv', 'utf8'), { columns: true })

await client.helpers.bulk({
  datasource: csv,
  // return the action to perform for each document in the datasource
  onDocument(doc) {
    return { index: { _index: 'my_index' } }
  },
  // flush a bulk request roughly every 9.5MB
  flushBytes: 9500000
})
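The helper also won’t throw for every document that fails: it retries, hands anything it gives up on to an onDrop callback, and resolves to a summary of the run. A quick sketch of how that might look with the same CSV datasource (the logging here is just illustrative):

const result = await client.helpers.bulk({
  datasource: csv,
  onDocument(doc) {
    return { index: { _index: 'my_index' } }
  },
  // called for any document the helper gives up on after its retries
  onDrop(doc) {
    console.error('dropped document:', doc.document)
  }
})

// result summarizes the run, e.g. result.total, result.successful, result.failed
console.log(result)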
Or if your CSV file is too large to fit in memory, the helper can take a stream as its datasource instead of an array:
import { createReadStream } from 'node:fs'
import { parse } from 'csv-parse'

// the streaming parser emits one object per row as the file is read
const parser = parse({ columns: true })

await client.helpers.bulk({
  datasource: createReadStream('data.csv').pipe(parser),
  onDocument(doc) {
    return { index: { _index: 'my_index' } }
  }
})
This buffers only a small number of rows in memory at a time, parses them into JSON objects as they’re read from the CSV file, and lets the helper flush the results out as one or more HTTP requests for you. Not only is this solution easy on memory, it’s just as easy to read as the one that loads the entire file into memory!
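One more note: the datasource isn’t limited to arrays and streams; the helper also accepts async iterables, so you can generate documents on the fly without materializing them anywhere. A minimal sketch, where the generator and its rows are made up for illustration:

async function* rows () {
  // yield documents one at a time, e.g. as they're fetched from a database
  for (let i = 0; i < 1000; i++) {
    yield { id: i, value: `row ${i}` }
  }
}

await client.helpers.bulk({
  datasource: rows(),
  onDocument(doc) {
    return { index: { _index: 'my_index' } }
  }
})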
