programing

유형 오류: _thread.lock 개체를 피클할 수 없습니다.

goodsources 2023. 5. 1. 21:02
반응형

유형 오류: _thread.lock 개체를 피클할 수 없습니다.

공유 대기열을 사용하여 두 개의 서로 다른 기능을 동시에 실행하려고 하면 오류가 발생합니다.공유 대기열로 두 기능을 동시에 실행하려면 어떻게 해야 합니까?Windows 7(윈도우 7)의 Python 버전 3.6입니다.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")


class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of__queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.append(packet)
                logging.info("Sending datagram ")
                print(str(datagram))
                byte_array(0)

if __name__ == "__main__":
    main()

로그에 오류가 표시됩니다. 콘솔을 관리자 권한으로 실행하려고 했는데 동일한 메시지가 표시됩니다.

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

저도 같은 문제를 겪었습니다.Pool()Python 3.6.3에서.

수신된 오류:TypeError: can't pickle _thread.RLock objects

우리가 몇 개의 숫자를 추가하고 싶다고 가정해 보겠습니다.num_to_add어떤 목록의 각 요소에.num_list병렬로코드는 다음과 같이 개략적으로 다음과 같습니다.

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1 

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list)) 
                      for num in num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add # uses class parameter
        shared_new_num_list.append(new_num)

여기서 문제는self기능하고 있는run_parallel()클래스 인스턴스이므로 절일 수 없습니다.이 병렬화된 기능을 이동run_parallel()교실 밖에서 도움을 받았습니다.그러나 이 함수는 아마도 다음과 같은 클래스 매개 변수를 사용해야 하기 때문에 최상의 솔루션은 아닙니다.self.num_to_add그리고 나서 당신은 그것을 논쟁으로 넘겨야 합니다.

솔루션:

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
                      for num in num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

위의 다른 제안들은 저에게 도움이 되지 않았습니다.

변경해야 합니다.from queue import Queue로.from multiprocessing import Queue.

근본적인 이유는 전자 큐는 스레드화 모듈 큐를 위해 설계된 반면 후자는 멀티프로세싱을 위해 설계되었기 때문입니다.프로세스 모듈.

멀티프로세싱풀 - 피클링 오류:<type' 스레드를 피클할 수 없습니다.lock'>: 특성 조회 thread.lock 실패

함수에 대한 인수 대신 대기열을 자체로 이동package그리고.send

마리나가 수업 전체에 접근할 수 있는 것을 여기에 답합니다.또한 오늘 필요한 만큼 Pool.map을 속입니다.

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + fakeSelf.num_to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        globals()['fakeSelf'] = self
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

이것이 이 문제를 검색할 때 나타나는 첫 번째 답변이므로, 저도 여기에 제 솔루션을 추가하겠습니다.

이 문제는 여러 가지로 인해 발생할 수 있습니다.다음은 두 가지 시나리오입니다.

  1. 패키지 비호환성
    • 문제:코드의 다른 부분이 최종적으로 호출될 때도 새 프로세스를 만들어야 하거나 잠금 사용 등으로 인해 새 프로세스로 복사되는 것과 호환되지 않을 때 멀티프로세싱을 사용하려고 합니다.
    • 솔루션:문제는 모든 프로세스에 대해 MongoDB 인스턴스에 대한 단일 연결을 사용하는 것이었습니다.각 프로세스에 대해 새 연결을 생성하여 문제를 해결했습니다.
  2. 클래스 인스턴스
    • 문제:전화하려고 합니다.pool.starmap클래스 내부에서 클래스의 다른 기능으로.정적 메서드로 만들거나 외부 호출 기능을 사용해도 작동하지 않고 동일한 오류가 발생했습니다.클래스 인스턴스는 피클링할 수 없으므로 다중 처리를 시작한 후 인스턴스를 만들어야 합니다.
    • 솔루션:제가 하게 된 일은 제 수업을 두 개의 반으로 나누는 것이었습니다.기본적으로 멀티프로세싱을 호출하는 함수는 해당 함수가 속한 클래스의 새 개체를 인스턴스화한 후 바로 호출해야 합니다.이와 같은 것:
from multiprocessing import Pool

class B:
    ...
    def process_feature(idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff():
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...

언급URL : https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects

반응형