Große Logdateien in Node.js analysieren – Zeile für Zeile einlesen

Ich muss einige große (5-10 GB) Logfiles in Javascript / Node.js analysieren (ich benutze Cube).

Die Logline sieht ungefähr so ​​aus:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS". 

Wir müssen jede Zeile lesen, etwas analysieren (z. B. 5 , 7 und SUCCESS ) und dann diese Daten mit ihrem JS-Client in Cube ( https://github.com/square/cube ) pumpen.

Erstens, was ist der kanonische Weg in Node, Zeile für Zeile in eine Datei einzulesen?

Es scheint eine ziemlich häufige Frage online zu sein:

  • http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
  • Eine Datei in node.js Zeile für Zeile lesen?

Viele der Antworten scheinen auf eine Reihe von Modulen von Drittanbietern zu verweisen:

  • https://github.com/nickewing/line-reader
  • https://github.com/jahewson/node-byline
  • https://github.com/pkrumins/node-lazy
  • https://github.com/Gagle/Node-BufferedReader

Dies scheint jedoch eine ziemlich einfache Aufgabe zu sein – sicherlich gibt es einen einfachen Weg innerhalb der stdlib, um eine Textdatei Zeile für Zeile zu lesen.

Zweitens muss ich dann jede Zeile bearbeiten (z. B. den Zeitstempel in ein Date-Objekt konvertieren und nützliche Felder extrahieren).

Was ist der beste Weg, dies zu tun, um den Durchsatz zu maximieren? Gibt es einen Weg, der das Lesen in jeder Zeile oder das Senden an Cube nicht blockiert?

Drittens – ich rate mit String-Splits, und das JS-Äquivalent von enthält (IndexOf! = -1?) Wird viel schneller sein als Regexes? Hat jemand viel Erfahrung in der Analyse großer Mengen von Textdaten in Node.js?

Prost, Victor

   

Ich suchte nach einer Lösung, um sehr große Dateien (gbs) Zeile für Zeile mit einem Stream zu analysieren. Alle Bibliotheken und Beispiele von Drittanbietern entsprachen nicht meinen Bedürfnissen, da sie die Dateien nicht Zeile für Zeile (wie 1, 2, 3, 4 ..) verarbeiteten oder die gesamte Datei im Speicher ablegten

Die folgende Lösung kann sehr große Dateien Zeile für Zeile mit Stream & Pipe analysieren. Zum Testen habe ich eine 2,1-GB-Datei mit 17.000.000 Datensätzen verwendet. Die Ram-Nutzung hat 60 MB nicht überschritten.

 var fs = require('fs') , es = require('event-stream'); var lineNr = 0; var s = fs.createReadStream('very-large-file.csv') .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); lineNr += 1; // process line here and call s.resume() when rdy // function below was for logging memory usage logMemoryUsage(lineNr); // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(err){ console.log('Error while reading file.', err); }) .on('end', function(){ console.log('Read entire file.') }) ); 

Bildbeschreibung hier eingeben

Bitte lassen Sie mich wissen, wie es geht!

Sie können das eingebaute readline Paket verwenden, siehe Dokumentation hier . Ich verwende Stream , um einen neuen Ausgabestream zu erstellen.

 var fs = require('fs'), readline = require('readline'), stream = require('stream'); var instream = fs.createReadStream('/path/to/file'); var outstream = new stream; outstream.readable = true; outstream.writable = true; var rl = readline.createInterface({ input: instream, output: outstream, terminal: false }); rl.on('line', function(line) { console.log(line); //Do your stuff ... //Then write to outstream rl.write(cubestuff); }); 

Große Dateien benötigen eine gewisse Verarbeitungszeit. Sag, ob es funktioniert.

Ich mochte @gerard Antwort wirklich, die eigentlich verdient, die richtige Antwort hier zu sein. Ich habe einige Verbesserungen vorgenommen:

  • Code ist in einer class (modular)
  • Parsing ist enthalten
  • Die Möglichkeit zum Fortsetzen wird nach außen gegeben, wenn ein asynchroner Job zum Lesen der CSV wie zum Einfügen in DB oder eine HTTP-Anfrage verkettet ist
  • Einlesen von Stücken / Chargengrößen, die der Benutzer deklarieren kann. Ich habe auch im Stream für die Kodierung gesorgt, falls Sie Dateien in verschiedenen Kodierungen haben.

Hier ist der Code:

 'use strict' const fs = require('fs'), util = require('util'), stream = require('stream'), es = require('event-stream'), parse = require("csv-parse"), iconv = require('iconv-lite'); class CSVReader { constructor(filename, batchSize, columns) { this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8')) this.batchSize = batchSize || 1000 this.lineNumber = 0 this.data = [] this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true} } read(callback) { this.reader .pipe(es.split()) .pipe(es.mapSync(line => { ++this.lineNumber parse(line, this.parseOptions, (err, d) => { this.data.push(d[0]) }) if (this.lineNumber % this.batchSize === 0) { callback(this.data) } }) .on('error', function(){ console.log('Error while reading file.') }) .on('end', function(){ console.log('Read entirefile.') })) } continue () { this.data = [] this.reader.resume() } } module.exports = CSVReader 

Also im Grunde, hier ist, wie Sie es verwenden werden:

 let reader = CSVReader('path_to_file.csv') reader.read(() => reader.continue()) 

Ich habe das mit einer 35GB CSV-Datei getestet und es hat für mich funktioniert und deshalb habe ich beschlossen, es auf @gerards Antwort aufzubauen, Feedbacks sind willkommen.

Ich habe https://www.npmjs.com/package/line-by-line benutzt, um mehr als 1 000 000 Zeilen aus einer Textdatei zu lesen. In diesem Fall betrug eine belegte RAM-Kapazität etwa 50-60 Megabyte.

  const LineByLineReader = require('line-by-line'), lr = new LineByLineReader('big_file.txt'); lr.on('error', function (err) { // 'err' contains error object }); lr.on('line', function (line) { // pause emitting of lines... lr.pause(); // ...do your asynchronous line processing.. setTimeout(function () { // ...and continue emitting lines. lr.resume(); }, 100); }); lr.on('end', function () { // All lines are read, file is closed now. }); 

Abgesehen davon, dass Sie die große Datei Zeile für Zeile lesen, können Sie sie auch Stück für Stück lesen. Weitere Informationen finden Sie in diesem Artikel

 var offset = 0; var chunkSize = 2048; var chunkBuffer = new Buffer(chunkSize); var fp = fs.openSync('filepath', 'r'); var bytesRead = 0; while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) { offset += bytesRead; var str = chunkBuffer.slice(0, bytesRead).toString(); var arr = str.split('\n'); if(bytesRead = chunkSize) { // the last item of the arr may be not a full line, leave it to the next chunk offset -= arr.pop().length; } lines.push(arr); } console.log(lines); 

Ich hatte das gleiche Problem schon. Nach dem Vergleich mehrerer Module, die diese function zu haben scheinen, habe ich beschlossen, es selbst zu machen, es ist einfacher als ich dachte.

gist: https://gist.github.com/deemstone/8279565

 var fetchBlock = lineByline(filepath, onEnd); fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No. 

Es deckt die Datei ab, die in einer Schließung geöffnet wird, die fetchBlock() zurückgegeben wird, wird einen Block aus der Datei holen, Ende auf Array aufgeteilt (wird das Segment vom letzten Abruf behandeln).

Ich habe die Blockgröße für jede Leseoperation auf 1024 gesetzt. Dies kann Fehler enthalten, aber Code-Logik ist offensichtlich, probieren Sie es selbst aus.

Node-Byline verwendet Streams, also würde ich das für Ihre riesigen Dateien bevorzugen.

Für Ihre Datum-Conversions würde ich moment.js verwenden .

Um Ihren Durchsatz zu maximieren, könnten Sie über die Verwendung eines Software-Clusters nachdenken. Es gibt einige nette Module, die das node-native Cluster-Modul sehr gut umhüllen. Ich mag cluster-master von isaacs. ZB könnten Sie einen Cluster von x Arbeitern erstellen, die alle eine Datei berechnen.

Für Benchmarking-Splits und Regexes verwenden Sie benchmark.js . Ich habe es bis jetzt nicht getestet. benchmark.js ist als Knotenmodul verfügbar

Ich habe ein Knotenmodul gemacht, um asynchronen Text in großer Datei oder JSON zu lesen. Getestet auf großen Dateien.

 var fs = require('fs') , util = require('util') , stream = require('stream') , es = require('event-stream'); module.exports = FileReader; function FileReader(){ } FileReader.prototype.read = function(pathToFile, callback){ var returnTxt = ''; var s = fs.createReadStream(pathToFile) .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); //console.log('reading line: '+line); returnTxt += line; // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(){ console.log('Error while reading file.'); }) .on('end', function(){ console.log('Read entire file.'); callback(returnTxt); }) ); }; FileReader.prototype.readJSON = function(pathToFile, callback){ try{ this.read(pathToFile, function(txt){callback(JSON.parse(txt));}); } catch(err){ throw new Error('json file is not valid! '+err.stack); } }; 

Speichern Sie die Datei einfach als Datei-Reader.js und verwenden Sie sie wie folgt:

 var FileReader = require('./file-reader'); var fileReader = new FileReader(); fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/}); 

Basierend auf dieser Frage habe ich eine class implementiert, mit der man eine Datei zeilenweise synchron mit fs.readSync() lesen kann. Sie können diese “Pause” und “Fortsetzen” mit einem Q Versprechen machen ( jQuery scheint ein DOM zu benötigen, so dass es nicht mit nodejs laufen kann):

 var fs = require('fs'); var Q = require('q'); var lr = new LineReader(filenameToLoad); lr.open(); var promise; workOnLine = function () { var line = lr.readNextLine(); promise = complexLineTransformation(line).then( function() {console.log('ok');workOnLine();}, function() {console.log('error');} ); } workOnLine(); complexLineTransformation = function (line) { var deferred = Q.defer(); // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error)); return deferred.promise; } function LineReader (filename) { this.moreLinesAvailable = true; this.fd = undefined; this.bufferSize = 1024*1024; this.buffer = new Buffer(this.bufferSize); this.leftOver = ''; this.read = undefined; this.idxStart = undefined; this.idx = undefined; this.lineNumber = 0; this._bundleOfLines = []; this.open = function() { this.fd = fs.openSync(filename, 'r'); }; this.readNextLine = function () { if (this._bundleOfLines.length === 0) { this._readNextBundleOfLines(); } this.lineNumber++; var lineToReturn = this._bundleOfLines[0]; this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany) return lineToReturn; }; this.getLineNumber = function() { return this.lineNumber; }; this._readNextBundleOfLines = function() { var line = ""; while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver this.idxStart = 0 while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver line = this.leftOver.substring(this.idxStart, this.idx); this._bundleOfLines.push(line); this.idxStart = this.idx + 1; } this.leftOver = this.leftOver.substring(this.idxStart); if (line !== "") { break; } } }; }