Node.js에서 대용량 로그 파일 구문 분석 - 한 줄씩 읽기
Javascript/Node.js(큐브를 사용하고 있습니다)에서 큰(5-10Gb) 로그 파일을 파싱해야 합니다.
로그선은 다음과 같습니다.
10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
행을 (예: 예을분구석문읽고합다제니 (행: 거각야) 행수해을▁(▁we예을다합니▁oute,▁parsing▁need▁some▁read수▁to▁line)5
,7
그리고.SUCCESS
)를 클릭한 다음 JS 클라이언트를 사용하여 이 데이터를 큐브(https://github.com/square/cube) 에 펌핑합니다.
첫째, 노드에서 파일을 한 줄씩 읽는 표준적인 방법은 무엇입니까?
온라인에서 상당히 흔한 질문인 것 같습니다.
- http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
- node.js에서 파일을 한 번에 한 줄씩 읽으시겠습니까?
대부분의 답변은 여러 타사 모듈을 가리키는 것 같습니다.
- https://github.com/nickewing/line-reader
- https://github.com/jahewson/node-byline
- https://github.com/pkrumins/node-lazy
- https://github.com/Gagle/Node-BufferedReader
하지만, 이것은 꽤 기본적인 작업인 것 같습니다 - 확실히, stdlib 내에 텍스트 파일을 한 줄 한 줄 읽을 수 있는 간단한 방법이 있습니까?
두 번째로 각 줄을 처리해야 합니다(예: 타임스탬프를 Date 개체로 변환하고 유용한 필드를 추출).
처리량을 극대화하는 가장 좋은 방법은 무엇입니까?각 행의 판독치 또는 큐브로 전송할 때 차단되지 않는 방법이 있습니까?
셋째 - 문자열 분할을 사용하면 JS에 해당하는 contains(IndexOf!= -1?)가 정규식보다 훨씬 빠를 것으로 예상됩니다.Node.js에서 방대한 양의 텍스트 데이터를 구문 분석한 경험이 있는 사람이 있습니까?
스트림을 사용하여 매우 큰 파일(gbs)을 한 줄씩 구문 분석할 수 있는 솔루션을 찾았습니다.모든 타사 라이브러리 및 예제가 파일을 한 줄로 처리하지 않거나(예: 1, 2, 3, 4...) 전체 파일을 메모리에 읽기 때문에 필요에 맞지 않았습니다.
다음 솔루션은 스트림 및 파이프를 사용하여 매우 큰 파일을 한 줄씩 구문 분석할 수 있습니다.테스트를 위해 17.000.000개의 레코드가 있는 2.1gb 파일을 사용했습니다.RAM 사용량이 60MB를 초과하지 않았습니다.
먼저 이벤트 스트림 패키지를 설치합니다.
npm install event-stream
그러면:
var fs = require('fs')
, es = require('event-stream');
var lineNr = 0;
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
lineNr += 1;
// process line here and call s.resume() when rdy
// function below was for logging memory usage
logMemoryUsage(lineNr);
// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(err){
console.log('Error while reading file.', err);
})
.on('end', function(){
console.log('Read entire file.')
})
);
어떻게 진행되는지 알려주세요!
내장된 제품을 사용할 수 있습니다.readline
패키지, 여기 문서를 참조하십시오.스트림을 사용하여 새 출력 스트림을 만듭니다.
var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});
rl.on('line', function(line) {
console.log(line);
//Do your stuff ...
//Then write to output stream
rl.write(line);
});
대용량 파일을 처리하는 데 시간이 걸립니다.효과가 있는지 알려주세요.
저는 실제로 여기서 정답이 될 만한 @gerard 답변이 정말 좋았습니다.몇 가지 개선 사항이 있습니다.
- 코드가 클래스(모듈형)에 있습니다.
- 구문 분석이 포함되어 있습니다.
- DB에 삽입하거나 HTTP 요청과 같이 CSV 읽기에 연결된 비동기 작업이 있는 경우 외부에 재개할 수 있는 기능이 제공됩니다.
- 사용자가 선언할 수 있는 청크/배치 크기로 읽는 중입니다.다른 인코딩의 파일이 있을 경우를 대비하여 스트림의 인코딩도 처리했습니다.
코드는 다음과 같습니다.
'use strict'
const fs = require('fs'),
util = require('util'),
stream = require('stream'),
es = require('event-stream'),
parse = require("csv-parse"),
iconv = require('iconv-lite');
class CSVReader {
constructor(filename, batchSize, columns) {
this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
this.batchSize = batchSize || 1000
this.lineNumber = 0
this.data = []
this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
}
read(callback) {
this.reader
.pipe(es.split())
.pipe(es.mapSync(line => {
++this.lineNumber
parse(line, this.parseOptions, (err, d) => {
this.data.push(d[0])
})
if (this.lineNumber % this.batchSize === 0) {
callback(this.data)
}
})
.on('error', function(){
console.log('Error while reading file.')
})
.on('end', function(){
console.log('Read entirefile.')
}))
}
continue () {
this.data = []
this.reader.resume()
}
}
module.exports = CSVReader
기본적으로 사용 방법은 다음과 같습니다.
let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())
35GB CSV 파일로 테스트해 본 결과, 효과가 있었고 @gerard의 답변을 기반으로 구축하기로 결정했습니다. 피드백을 환영합니다.
저는 텍스트 파일에서 1,000,000줄 이상을 읽기 위해 https://www.npmjs.com/package/line-by-line 을 사용했습니다.이 경우 RAM의 점유 용량은 약 50-60메가바이트였습니다.
const LineByLineReader = require('line-by-line'),
lr = new LineByLineReader('big_file.txt');
lr.on('error', function (err) {
// 'err' contains error object
});
lr.on('line', function (line) {
// pause emitting of lines...
lr.pause();
// ...do your asynchronous line processing..
setTimeout(function () {
// ...and continue emitting lines.
lr.resume();
}, 100);
});
lr.on('end', function () {
// All lines are read, file is closed now.
});
Node.js 설명서는 읽기 줄 모듈을 사용하는 매우 우아한 예를 제공합니다.
const { once } = require('node:events');
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
crlfDelay: Infinity
});
rl.on('line', (line) => {
console.log(`Line from file: ${line}`);
});
await once(rl, 'close');
참고: crlfDelay 옵션을 사용하여 CRLF('\r\n')의 모든 인스턴스를 단일 줄 바꿈으로 인식합니다.
큰 파일을 한 줄씩 읽는 것 외에도, 한 줄씩 읽을 수도 있습니다.자세한 내용은 이 문서를 참조하십시오.
var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
offset += bytesRead;
var str = chunkBuffer.slice(0, bytesRead).toString();
var arr = str.split('\n');
if(bytesRead = chunkSize) {
// the last item of the arr may be not a full line, leave it to the next chunk
offset -= arr.pop().length;
}
lines.push(arr);
}
console.log(lines);
저는 아직도 같은 문제를 안고 있었습니다.이 기능이 있는 것으로 보이는 여러 모듈을 비교해 본 결과, 제가 직접 해보기로 했습니다, 생각보다 간단합니다.
요지: https://gist.github.com/deemstone/8279565
var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No.
그것은 닫힌 상태에서 열린 파일을 덮습니다.fetchBlock()
반환됨은 파일에서 블록을 가져오고, 엔드 스플릿을 어레이로 가져옵니다(마지막 가져오기에서 세그먼트를 처리함).
각 읽기 작업에 대해 블록 크기를 1024로 설정했습니다.버그가 있을 수 있지만 코드 로직은 분명합니다. 직접 시도해 보십시오.
네이티브 nodejs 모듈(fs, readline)이 있는 스트림을 사용하여 파일 읽기/쓰기:
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('input.json'),
output: fs.createWriteStream('output.json')
});
rl.on('line', function(line) {
console.log(line);
// Do any 'line' processing if you want and then write to the output file
this.output.write(`${line}\n`);
});
rl.on('close', function() {
console.log(`Created "${this.output.path}"`);
});
이 질문 답변을 기반으로 파일을 한 줄씩 동시에 읽는 데 사용할 수 있는 클래스를 구현했습니다.fs.readSync()
여러분은 "일시정지"와 "재개"를 사용하여 이 ""와 수 .Q
jQuery
한 것 DOM으로 할 수 .nodejs
):
var fs = require('fs');
var Q = require('q');
var lr = new LineReader(filenameToLoad);
lr.open();
var promise;
workOnLine = function () {
var line = lr.readNextLine();
promise = complexLineTransformation(line).then(
function() {console.log('ok');workOnLine();},
function() {console.log('error');}
);
}
workOnLine();
complexLineTransformation = function (line) {
var deferred = Q.defer();
// ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
return deferred.promise;
}
function LineReader (filename) {
this.moreLinesAvailable = true;
this.fd = undefined;
this.bufferSize = 1024*1024;
this.buffer = new Buffer(this.bufferSize);
this.leftOver = '';
this.read = undefined;
this.idxStart = undefined;
this.idx = undefined;
this.lineNumber = 0;
this._bundleOfLines = [];
this.open = function() {
this.fd = fs.openSync(filename, 'r');
};
this.readNextLine = function () {
if (this._bundleOfLines.length === 0) {
this._readNextBundleOfLines();
}
this.lineNumber++;
var lineToReturn = this._bundleOfLines[0];
this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
return lineToReturn;
};
this.getLineNumber = function() {
return this.lineNumber;
};
this._readNextBundleOfLines = function() {
var line = "";
while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
this.idxStart = 0
while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
line = this.leftOver.substring(this.idxStart, this.idx);
this._bundleOfLines.push(line);
this.idxStart = this.idx + 1;
}
this.leftOver = this.leftOver.substring(this.idxStart);
if (line !== "") {
break;
}
}
};
}
노드 바이 라인은 스트림을 사용하기 때문에 대용량 파일에 사용할 스트림을 선호합니다.
당신의 데이트 표현을 위해 모멘트.js를 사용할 것입니다.
소프트웨어 클러스터 사용을 고려할 수 있습니다.노드 네이티브 클러스터를 꽤 잘 감싸는 멋진 클러스터가 있습니다. 저는 아이작의 클러스터 마스터를 좋아합니다. 예를 들어, 파일을 모두 계산하는 x개의 작업자 클러스터를 만들 수 있습니다.
benchmark.js.js.는 지금까지 테스트한 적이 없습니다. benchmark.js는 노드 패치로 사용할 수 있습니다.
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
[s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
protected file: string;
protected csvOptions = {
delimiter: ',',
headers: true,
ignoreEmpty: true,
trim: true
};
constructor(file: string, csvOptions = {}) {
if (!fs.existsSync(file)) {
throw new Error(`File ${file} not found.`);
}
this.file = file;
this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
}
public read(callback: RowCallBack): Promise < Array < object >> {
return new Promise < Array < object >> (resolve => {
const readStream = fs.createReadStream(this.file);
const results: Array < any > = [];
let index = 0;
const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
index++;
results.push(await callback(data, index));
}).on('error', (err: Error) => {
console.error(err.message);
throw err;
}).on('end', () => {
resolve(results);
});
readStream.pipe(csvStream);
});
}
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
const reader = new CSVReader('./database/migrations/csv/users.csv');
const users = await reader.read(async data => {
return {
username: data.username,
name: data.name,
email: data.email,
cellPhone: data.cell_phone,
homePhone: data.home_phone,
roleId: data.role_id,
description: data.description,
state: data.state,
};
});
console.log(users);
})();
@gerard의 대답에 영감을 받아, 저는 한 덩어리씩 읽는 통제된 방법을 제공하고 싶습니다.
나는 사용자의 요청에 따라 여러 개의 큰 로그 파일을 한 덩어리씩 읽는 전자 앱을 가지고 있으며, 다음 청크는 사용자가 요청할 때만 요청될 것입니다.
LogReader 클래스입니다.
// A singleton class, used to read log chunk by chunk
import * as fs from 'fs';
import { logDirPath } from './mainConfig';
import * as path from 'path';
type ICallback = (data: string) => Promise<void> | void;
export default class LogReader {
filenames: string[];
readstreams: fs.ReadStream[];
chunkSize: number;
lineNumber: number;
data: string;
static instance: LogReader;
private constructor(chunkSize = 10240) {
this.chunkSize = chunkSize || 10240; // default to 10kB per chunk
this.filenames = [];
// collect all log files and sort from latest to oldest
fs.readdirSync(logDirPath).forEach((file) => {
if (file.endsWith('.log')) {
this.filenames.push(path.join(logDirPath, file));
}
});
this.filenames = this.filenames.sort().reverse();
this.lineNumber = 0;
}
static getInstance() {
if (!this.instance) {
this.instance = new LogReader();
}
return this.instance;
}
// read a chunk from a log file
read(fileIndex: number, chunkIndex: number, cb: ICallback) {
// file index out of range, return "end of all files"
if (fileIndex >= this.filenames.length) {
cb('EOAF');
return;
}
const chunkSize = this.chunkSize;
fs.createReadStream(this.filenames[fileIndex], {
highWaterMark: chunkSize, // 1kb per read
start: chunkIndex * chunkSize, // start byte of this chunk
end: (chunkIndex + 1) * chunkSize - 1, // end byte of this chunk (end index was included, so minus 1)
})
.on('data', (data) => {
cb(data.toString());
})
.on('error', (e) => {
console.error('Error while reading file.');
console.error(e);
cb('EOF');
})
.on('end', () => {
console.log('Read entire chunk.');
cb('EOF');
});
}
}
그런 다음 각 청크를 읽으려면 기본 프로세스에서 다음을 호출하기만 하면 됩니다.
const readLogChunk = (fileIndex: number, chunkIndex: number): Promise<string> => {
console.log(`=== load log chunk ${fileIndex}: ${chunkIndex}====`);
return new Promise((resolve) => {
LogReader.getInstance().read(fileIndex, chunkIndex, (data) => resolve(data));
});
};
청크 단위로 읽기 위해 증분 청크 인덱스 유지
언제EOF
반환됩니다. 즉, 하나의 파일이 완료된 경우 fileIndex만 증가합니다.
언제EOAF
반환됩니다. 즉, 모든 파일을 읽었습니다. 중지하십시오.
큰 파일을 비동기식으로 읽을 수 있는 노드 모듈이나 JSON을 만들었습니다.대용량 파일에서 테스트되었습니다.
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
module.exports = FileReader;
function FileReader(){
}
FileReader.prototype.read = function(pathToFile, callback){
var returnTxt = '';
var s = fs.createReadStream(pathToFile)
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
//console.log('reading line: '+line);
returnTxt += line;
// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(){
console.log('Error while reading file.');
})
.on('end', function(){
console.log('Read entire file.');
callback(returnTxt);
})
);
};
FileReader.prototype.readJSON = function(pathToFile, callback){
try{
this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
}
catch(err){
throw new Error('json file is not valid! '+err.stack);
}
};
파일을 file-reader.js로 저장하고 다음과 같이 사용합니다.
var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
언급URL : https://stackoverflow.com/questions/16010915/parsing-huge-logfiles-in-node-js-read-in-line-by-line
'programing' 카테고리의 다른 글
Node.js에서 가져오기: "ES 모듈을 로드하려면 가져오기를 사용해야 함" 오류 (0) | 2023.07.25 |
---|---|
PHP - 배열에서 마지막 요소 앞에 있는 요소를 가져오는 방법은 무엇입니까? (0) | 2023.07.25 |
Spring Boot @RepositoryRestResource를 특정 URL에 매핑하려면 어떻게 해야 합니까? (0) | 2023.07.25 |
이 입력은 파이썬 'any' 기능과 어떻게 작동합니까? (0) | 2023.07.25 |
디스크에 쓰지 않고 .zip 파일 다운로드 및 압축 풀기 (0) | 2023.07.25 |