programing

노드 하위 프로세스에 큰 어레이 전달

goodsources 2023. 7. 10. 22:18
반응형

노드 하위 프로세스에 큰 어레이 전달

대규모 어레이에서 수행해야 하는 복잡한 CPU 집약적 작업이 있습니다.이상적으로, 저는 이것을 아동 과정에 전달하고 싶습니다.

var spawn = require('child_process').spawn;

// dataAsNumbers is a large 2D array
var child = spawn(process.execPath, ['/child_process_scripts/getStatistics', dataAsNumbers]);

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

하지만 그럴 때 노드는 다음과 같은 오류를 표시합니다.

산란 E2BIG

는 이 기사를 우연히 발견했습니다.

따라서 데이터를 하위 프로세스에 연결하는 것이 방법인 것 같습니다.내 코드는 다음과 같습니다.

var spawn = require('child_process').spawn;

console.log('creating child........................');

var options = { stdio: [null, null, null, 'pipe'] };
var args = [ '/getStatistics' ];
var child = spawn(process.execPath, args, options);

var pipe = child.stdio[3];

pipe.write(Buffer('awesome'));

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

그런 다음 getStatistics.js:

console.log('im inside child');

process.stdin.on('data', function(data) {
  console.log('data is ', data);
  process.exit(0);
});

은 하만다전걸려다니옵가화지시▁however.process.stdin.on연결되지 않았습니다.하위 스크립트에서 스트림을 받으려면 어떻게 해야 합니까?

편집

저는 버퍼 접근 방식을 포기해야 했습니다.이제 배열을 메시지로 보냅니다.

var cp = require('child_process');
var child = cp.fork('/getStatistics.js');

child.send({ 
  dataAsNumbers: dataAsNumbers
});

그러나 이것은 dataAsNumbers의 길이가 약 20,000 미만일 때만 작동하며, 그렇지 않을 경우 시간 초과됩니다.

이렇게 방대한 양의 데이터를 사용할 경우 하위 프로세스(파이프를 사용하거나 메시지를 전달할 때 발생)에 데이터를 복사하는 대신 공유 메모리를 사용하는 방법을 알아봅니다.이렇게 하면 메모리가 절약되고 상위 프로세스에 대한 CPU 시간이 적게 소요되며 일부 제한에 부딪힐 가능성이 낮습니다.

shm-typed-array 는 응용 프로그램에 적합한 매우 간단한 모듈입니다.예:

parent.js

"use strict";

const shm = require('shm-typed-array');
const fork = require('child_process').fork;

// Create shared memory
const SIZE = 20000000;
const data = shm.create(SIZE, 'Float64Array');

// Fill with dummy data
Array.prototype.fill.call(data, 1);

// Spawn child, set up communication, and give shared memory
const child = fork("child.js");
child.on('message', sum => {
    console.log(`Got answer: ${sum}`);

    // Demo only; ideally you'd re-use the same child
    child.kill();
});
child.send(data.key);

child.js

"use strict";

const shm = require('shm-typed-array');

process.on('message', key => {
    // Get access to shared memory
    const data = shm.get(key, 'Float64Array');

    // Perform processing
    const sum = Array.prototype.reduce.call(data, (a, b) => a + b, 0);

    // Return processed data
    process.send(sum);
});

전체 데이터가 아닌 IPC를 통해 상위 프로세스에서 하위 프로세스로 작은 "키"만 전송합니다.따라서, 우리는 많은 메모리와 시간을 절약할 수 있습니다.

물론, 당신은 바꿀 수 있습니다.'Float64Array' a(숫자: a)double응용 프로그램에 필요한 모든 유형의 배열로 이동합니다.특히 이 라이브러리는 1차원 유형 배열만 처리하지만 이는 사소한 장애물일 뿐입니다.

저도 당신이 겪고 있는 지연을 재현할 수 있었지만, 아마도 당신만큼 나쁘지는 않을 것입니다.다음을 사용했습니다.

// main.js
const fork = require('child_process').fork

const child = fork('./getStats.js')

const dataAsNumbers = Array(100000).fill(0).map(() =>
  Array(100).fill(0).map(() => Math.round(Math.random() * 100)))

child.send({
  dataAsNumbers: dataAsNumbers,
})

그리고.

// getStats.js
process.on('message', function (data) {
  console.log('data is ', data)
  process.exit(0)
})

노드 main.js 2.72s 사용자 0.45s 시스템 103% CPU 3.045 합계

된 100k 요소를 , 이 100k 요소를 당신이 사용하고 있는지 확인하세요.message에 관한 사건.process그러나 자녀가 더 복잡하고 실패의 원인일 수 있으며, 또한 쿼리에 설정한 시간 제한에 따라 다를 수 있습니다.


더 나은 결과를 얻으려면 데이터를 여러 조각으로 청크하여 하위 프로세스로 보내고 초기 배열을 구성하도록 재구성합니다.


또한 타사 라이브러리나 프로토콜을 사용하는 것도 한 가지 방법입니다. 조금 더 많은 작업이 필요하더라도 말입니다.messenger.js 또는 AMQP 큐와 같은 프로세스 간에 풀과 메시지의 보증으로 통신할 수 있는 것을 하위 프로세스에서 확인할 수 있습니다.amqp.node와 같은 몇 가지 노드 구현이 있지만 여전히 약간의 설정 및 구성 작업이 필요합니다.

https://github.com/ptarjan/node-cache, 같은 in 메모리 캐시를 사용하고 상위 프로세스가 어레이 내용을 일부 키로 저장하도록 하면 하위 프로세스가 해당 키를 통해 내용을 검색합니다.

OS 파이프를 사용하는 것을 고려할 수 있습니다. 노드 하위 응용 프로그램에 대한 입력으로 여기에서 요지를 찾을있습니다.

이것이 정확히 당신이 요구하는 것이 아니라는 것을 알지만, 당신은 클러스터 모듈(노드에 포함)을 사용할 수 있습니다.이렇게 하면 기계가 처리 속도를 높이는 데 필요한 코어 수만큼 인스턴스를 얻을 수 있습니다.또한 처리를 시작하기 전에 모든 데이터를 사용할 필요가 없는 경우 스트림을 사용하는 것이 좋습니다.처리할 데이터가 너무 크면 파일에 저장하여 프로세스 중에 오류가 발생하면 다시 사용할 수 있습니다.다음은 클러스터링의 예입니다.

var cluster = require('cluster');
var numCPUs = 4;

if (cluster.isMaster) {
    for (var i = 0; i < numCPUs; i++) {
        var worker = cluster.fork();
        console.log('id', worker.id)
    }
} else {
    doSomeWork()
}

function doSomeWork(){
    for (var i=1; i<10; i++){
        console.log(i)
    }
}

작업자 간 메시지 전송에 대한 자세한 내용은 질문 8534462를 참조하십시오.

하위 프로세스를 만드는 이유는 무엇입니까?하위 프로세스 간에 데이터를 전송하면 동일한 프로세스 내에서 처리를 수행하는 데 드는 비용보다 CPU 및 실시간 측면에서 더 많은 비용이 소요될 수 있습니다.

대신, 매우 효율적인 코딩을 위해 nodejs 메인 프로세스와 동일한 메모리 내에서 실행되는 작업자 스레드에서 통계 계산을 수행할 것을 제안합니다.

NAN을 사용하여 작업자 스레드에 게시할 수 있는 C++ 코드를 작성한 다음 작업자 스레드가 완료되면 결과와 이벤트를 nodejs 이벤트 루프에 게시하도록 할 수 있습니다.

다른 프로세스로 데이터를 전송하는 데 추가 시간이 필요하지 않다는 장점이 있지만, 나사형 작업을 위해 C++ 코드를 약간 작성하지만 NAN 확장이 대부분의 어려운 작업을 처리합니다.

대용량 데이터를 하위 프로세스에 전달하는 동안 성능 문제를 해결하려면 데이터를 .json 또는 .txt 파일에 저장하고 파일 이름만 하위 프로세스에 전달합니다.이 접근 방식을 통해 성능이 70% 향상되었습니다.

긴 프로세스 작업의 경우 기어맨과 같은 작업을 수행할 수 있습니다. 이 방법으로 작업자에게 필요한 작업자 수를 설정할 수 있습니다. 예를 들어, 파일 처리를 이 방법으로 수행하고, 확장이 필요한 경우 작업자 인스턴스를 더 많이 생성하고, 다른 작업, zip 파일 처리,썸네일 등을 생성합니다. 이것의 장점은 작업자가 모든 언어 노드에 작성될 수 있다는 것입니다.js, Java, python은 프로젝트에 쉽게 통합될 수 있습니다.

// worker-unzip.js
const debug = require('debug')('worker:unzip');
const {series, apply} = require('async');
const gearman = require('gearmanode');
const {mkdirpSync} = require('fs-extra');
const extract = require('extract-zip');

module.exports.unzip = unzip;
module.exports.worker = worker;

function unzip(inputPath, outputDirPath, done) {
  debug('unzipping', inputPath, 'to', outputDirPath);
  mkdirpSync(outputDirPath);
  extract(inputPath, {dir: outputDirPath}, done);
}


/**
 *
 * @param {Job} job
 */
function workerUnzip(job) {
  const {inputPath, outputDirPath} = JSON.parse(job.payload);
  series([
    apply(unzip, inputPath, outputDirPath),
    (done) => job.workComplete(outputDirPath)
  ], (err) => {
    if (err) {
      console.error(err);
      job.reportError();
    }
  });
}

function worker(config) {
  const worker = gearman.worker(config);
  if (config.id) {
    worker.setWorkerId(config.id);
  }

  worker.addFunction('unzip', workerUnzip, {timeout: 10, toStringEncoding: 'ascii'});
  worker.on('error', (err) => console.error(err));

  return worker;
}

단순 색인.js

const unzip = require('./worker-unzip').worker;

unzip(config); // pass host and port of the Gearman server

저는 주로 PM2로 작업자를 운영합니다.

당신의 코드와 통합하는 것은 매우 쉽습니다. 비슷한 것입니다.

//initialize
const gearman = require('gearmanode');

gearman.Client.logger.transports.console.level = 'error';
const client = gearman.client(configGearman); // same host and port

함수 이름을 전달하는 대기열에 작업 추가

const taskpayload = {inputPath: '/tmp/sample-file.zip', outputDirPath: '/tmp/unzip/sample-file/'}
const job client.submitJob('unzip', JSON.stringify(taskpayload));
job.on('complete', jobCompleteCallback);
job.on('error', jobErrorCallback);

언급URL : https://stackoverflow.com/questions/44052913/pass-large-array-to-node-child-process

반응형