Nf:dZdZddlZddlmZddlZddlZddlZddl m Z ddl Z ddl Z ddl mZddlZddlZddlmZe jZdaGd d Zd Ze jed Zd ZGddeZGddZdZGddeZ GddeZ!GddeZ"Gdde Z#dZ$dZ% d'dZ&d(dZ'Gdd e j(Z)da*da+d!Z,d"Z-Gd#d$ej.Z/Gd%d&ej0Z1dS))a- Implements ProcessPoolExecutor. The following diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | | | | Call Q | | Process | | | +----------+ | | +-----------+ | Pool | | | | ... | | | | ... | +---------+ | | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com)N)_base)Queue)partial)format_exceptionFc&eZdZdZdZdZdZdS) _ThreadWakeupcXd|_tjd\|_|_dS)NF)duplex)_closedmpPipe_reader_writerselfs 1/usr/lib/python3.11/concurrent/futures/process.py__init__z_ThreadWakeup.__init__Cs( %'WE%:%:%:" dlllc|js;d|_|j|jdSdSNT)r rcloserrs rrz_ThreadWakeup.closeGsK| !DL L   L    ! !rcL|js|jddSdS)Nr)r r send_bytesrs rwakeupz_ThreadWakeup.wakeupMs2| ) L # #C ( ( ( ( ( ) )rc|jsM|jr6|j|j2dSdSdSN)r rpoll recv_bytesrs rclearz_ThreadWakeup.clearQsl| *,##%% * ''))),##%% * * * * * * *rN)__name__ __module__ __qualname__rrrrrrrrBsP;;;!!! )))*****rrcdatt}|D]\}}||D]\}}|dSr)_global_shutdownlist_threads_wakeupsitemsrjoin)r(_ thread_wakeupts r _python_exitr-Wsw !'')) * *E!=1 r=ceZdZdZdZdS)_RemoteTracebackc||_dSrtb)rr4s rrz_RemoteTraceback.__init__ws rc|jSrr3rs r__str__z_RemoteTraceback.__str__ys wrN)r r!r"rr6r#rrr1r1vs2rr1ceZdZdZdZdS)_ExceptionWithTracebackcdtt|||}||_d|j_d|z|_dS)Nz """ %s""")r)rtypeexc __traceback__r4)rr<r4s rrz _ExceptionWithTraceback.__init__}sI WW%d3iib99 : :"& 2%rc,t|j|jffSr) _rebuild_excr<r4rs r __reduce__z"_ExceptionWithTraceback.__reduce__sdh000rN)r r!r"rr@r#rrr8r8|s2&&&11111rr8c.t||_|Sr)r1 __cause__)r<r4s rr?r?s$R((CM JrceZdZdZdS) _WorkItemc>||_||_||_||_dSr)futurefnargskwargs)rrFrGrHrIs rrz_WorkItem.__init__s"   rNr r!r"rr#rrrDrD#rrDceZdZddZdS) _ResultItemNc>||_||_||_||_dSr)work_id exceptionresultexit_pid)rrOrPrQrRs rrz_ResultItem.__init__s" "   rNNNrJr#rrrMrMs(!!!!!!rrMceZdZdZdS) _CallItemc>||_||_||_||_dSr)rOrGrHrI)rrOrGrHrIs rrz_CallItem.__init__s"   rNrJr#rrrUrUrKrrUc.eZdZdZdfd ZfdZxZS) _SafeQueuez=Safe Queue set exception to the future object linked to a jobrcx||_||_||_t||dS)N)ctx)pending_work_items shutdown_lockr+superr)rmax_sizerZr[r\r+ __class__s rrz_SafeQueue.__init__s>"4** s+++++rc t|trtt|||j}t dd||_|j |j d}|j 5|j dddn #1swxYwY||j|dSdSt#||dS)Nz """ {}"""r:) isinstancerUrr;r=r1formatr)rBr[poprOr\r+rrF set_exceptionr]_on_queue_feeder_error)reobjr4 work_itemr_s rrez!_SafeQueue._on_queue_feeder_errors1 c9 % % 3!$q''1ao>>B*>+@+@+M+MNNAK/33CKFFI# , ,"))+++ , , , , , , , , , , , , , , , $ ..q11111%$ GG * *1c 2 2 2 2 2sCCC)r)r r!r"__doc__rre __classcell__r_s@rrXrXs\GG,,,,,, 3 3 3 3 3 3 3 3 3rrXc'pKt|} ttj||}|sdS|V+)z, Iterates over zip()ed iterables in chunks. TN)ziptuple itertoolsislice) chunksize iterablesitchunks r _get_chunksrusI iBi&r95566  F rc fd|DS)z Processes a chunk of an iterable passed to map. Runs the function passed to map() on a chunk of the iterable passed to map. This function is run in a separate process. cg|]}|Sr#r#).0rHrGs r z"_process_chunk..s ( ( ($BBI ( ( (rr#)rGrts` r_process_chunkrzs ) ( ( (% ( ( ((rc |t||||dS#t$rE}t||j}|t|||Yd}~dSd}~wwxYw)z.Safely send back the given result or exception)rQrPrRrPrRN)putrM BaseExceptionr8r=) result_queuerOrQrPrRrfr<s r_sendback_resultrs9WV/88MMM N N N N N 999%a99W.6888 9 9 9 9 9 9 9 9 99s&* A9:A44A9c:|9 ||n2#t$r%tjddYdSwxYwd}d} |d}|(|t jdS||dz }||krt j} |j|j i|j }t||j ||~nD#t$r7} t| | j} t||j | | Yd} ~ nd} ~ wwxYw~|dS) aEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A ctx.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A ctx.Queue of _ResultItems that will written to by the worker. initializer: A callable initializer, or None initargs: A tuple of args for the initializer NzException in initializer:T)exc_inforblockr.)rQrRr|)r~rLOGGERcriticalgetr}osgetpidrGrHrIrrOr8r=) call_queuer initializerinitargs max_tasks num_tasksrR call_itemrrfr<s r_process_workerrs  K " " "    L ! !"= ! M M M FF   IHNNN..     RY[[ ) ) ) F  NII%%9;;  inA 0@AAA \9+.weakref_cb,s GMM1 2 2 2 ' '$$&&& ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'sAA A)_executor_manager_thread_wakeupr+_shutdown_lockr\weakrefrefexecutor_reference _processes processes _call_queuer _result_queuer _work_idswork_ids_queue_max_tasks_per_childmax_tasks_per_child_pending_work_itemsr[r]r)rexecutorrr_s rrz_ExecutorManagerThread.__init__s &E%4&*%7%)%7 ' ' ' '#*+h "C"C",#.%2'0$,#@ #+"> rc ||\}}}|r||dS||||jdu}|r3|j|j}|~|x}rP|r4|j 5| dddn #1swxYwYn|j ~| r1||js|dSHr)add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemrRrrcr)rr\_adjust_process_count_idle_worker_semaphorereleaseis_shutting_downflag_executor_shutting_downr[join_executor_internals)r result_item is_brokencauseprocess_exitedprs rrunz_ExecutorManagerThread.runMs#   ' ' ) ) ),0,M,M,O,O )KE %%e,,,&((555!,!5T!A!**;+?@@AFFHHH #668888!%B!/==$::<<<===============!7??AAA $$&& 00222.00222FG# s<CC!$C!cv |jrdS |jd}|j|}|jr<|jt||j |j |j dn|j|=#tj $rYdSwxYw)NTFr)rfullrrr[rFset_running_or_notify_cancelr}rUrGrHrIqueueEmpty)rrOrhs rrz-_ExecutorManagerThread.add_call_item_to_queueus ##%%  -111>>!3G< #@@BBO'' '2;,2;.2;2B)D)D/3 (4444 /8;      sB$$B76B7c8|jj}|jjrJ|jj}||g}dt |jD}tj ||z}d}d}d}||vrR | }d}n@#t$r-} tt| | | j}Yd} ~ nd} ~ wwxYw||vrd}|j5|jdddn #1swxYwY|||fS)Ncg|] }|j Sr#)sentinelrxrs rryzG_ExecutorManagerThread.wait_result_broken_or_wakeup..sNNN1AJNNNrTF)rrr+r r&rvaluesr connectionwaitrecvr~rr;r=r\r) r result_reader wakeup_readerreadersworker_sentinelsreadyrrrrfs rrz3_ExecutorManagerThread.wait_result_broken_or_wakeups )1 %----*2  -0NNT^5J5J5L5L0M0MNNN ""7-=#=>>  E ! ! F+0022 !   F F F(a!Q_EE Fe # #I   ' '   $ $ & & & ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Iu,,s*B C)#CC&D  DDct|trc|sJ|j|}||js|dSdS|j|jd}|I|j r!|j |j dS|j |j dSdSr)raintrrrcr)rr[rOrPrFrd set_resultrQ)rrrrhs rrz*_ExecutorManagerThread.process_result_items k3 ' ' D((** * * *"";//A FFHHH> ,,...   /33K4GNNI$(D$22;3HIIIII$// 0BCCCCC %$rcN|}tp |dup|jSr)rr%_shutdown_thread)rrs rrz'_ExecutorManagerThread.is_shutting_downs4**,, !-H$4-, .rc|}|d|_d|_d}td}|+t dd|d|_|jD] \}}|j |~!|j |j D]}||dS)NzKA child process terminated abruptly, the process pool is not usable anymoreTz^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r:z''')r_brokenrBrokenProcessPoolr1r)rBr[r(rFrdrrr terminater)rrrbperOrhrs rrz'_ExecutorManagerThread.terminate_brokens **,,  !1H )-H %H !677  ,-"''%..---//CM#'"9"?"?"A"A   GY   * *3 / / /  %%'''&&((  A KKMMMM $$&&&&&rcT|}|d|_|jri}|jD]#\}}|js|||<$||_ |jn#tj $rYnwxYw1d|_dSdSdS)NTF) rr_cancel_pending_futuresr[r(rFcancelr get_nowaitrr)rrnew_pending_work_itemsrOrhs rrz2_ExecutorManagerThread.flag_executor_shutting_downs**,,  (,H %/ 9*,&*.*A*G*G*I*IDD&GY$+2244D:C.w7*@'+668888 ;49000)  9 9s-BBBcL|}d}||kr|dkrmt||z D]8} |jd|dz }##tj$rYnwxYw||kr|dkidSdSdSdS)Nrr.)get_n_children_aliveranger put_nowaitrFull)rn_children_to_stopn_sentinels_sentis rshutdown_workersz'_ExecutorManagerThread.shutdown_workers s!6688 "444--//!33-0@@AA  O..t444$)$$zEE  "444--//!3333543354s A**A=<A=cb||j|j|j5|jdddn #1swxYwY|jD]}|dSr) rrr join_threadr\r+rrr)rrs rrz._ExecutorManagerThread.join_executor_internalss   ##%%%   ' '   $ $ & & & ' ' ' ' ' ' ' ' ' ' ' ' ' ' '&&((  A FFHHHH  sA44A8;A8cbtd|jDS)Nc3>K|]}|VdSr)is_alivers r z>_ExecutorManagerThread.get_n_children_alive..(s*AAA1::<<AAAAAAr)sumrrrs rrz+_ExecutorManagerThread.get_n_children_alive&s-AA)>)>)@)@AAAAAAr)r r!r"rirrrrrrrrrrrrjrks@rrrs  -----^&&&P.--->DDD....$'$'$'L9994      BBBBBBBrrcLtrtrttda ddl}n$#t$rdattwxYw t jd}n#ttf$rYdSwxYw|dkrdS|dkrdSd|zatt)NTrzxThis Python build lacks multiprocessing.synchronize, usually due to named semaphores being unavailable on this platform.SC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary)) _system_limits_checked_system_limitedNotImplementedErrormultiprocessing.synchronize ImportErrorrsysconfAttributeError ValueError)multiprocessing nsems_maxs r_check_system_limitsr/s7  7%o66 6!3***** 333 F "/222 3 J122 J 'B C 46?@O o . ..s+!A A%%A:9A:c#pK|D]0}||r|V|1dS)z Specialized implementation of itertools.chain.from_iterable. Each item in *iterable* should be a list. This function is careful not to keep references to yielded objects. N)reverserc)iterableelements r_chain_from_iterable_of_listsrOsX    ++--      rceZdZdZdS)rzy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N)r r!r"rir#rrrr[srrceZdZ ddddZdZdZdZdZd Ze j jj e_ dd d fd Z ddddZ e j j j e _ xZS)ProcessPoolExecutorNr#)rct|Jtjpd|_tjdkrt t|j|_nN|dkrtdtjdkr"|tkrtdt||_|*|tj d}ntj }||_ |j d d k|_ |t|std ||_||_|gt%|t&std |dkrtd |j d d krtd||_d|_i|_d|_t1j|_t1jd|_d|_d|_i|_d|_ tC|_"|jtFz}tI||j |j|j|j"|_%d|j%_&|'|_(tSj*|_+dS)aHInitializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. Useful to allow specific multiprocessing start methods. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. max_tasks_per_child: The maximum number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process. The default of None means worker process will live as long as the executor. Requires a non-'fork' mp_context start method. When given, we default to using 'spawn' if no mp_context is supplied. Nr.win32rz"max_workers must be greater than 0zmax_workers must be <= spawnF) allow_noneforkzinitializer must be a callablez&max_tasks_per_child must be an integerz max_tasks_per_child must be >= 1zpmax_tasks_per_child is incompatible with the 'fork' multiprocessing start method; supply a different mp_context.)r^rZr[r\r+T),rr cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrr get_context _mp_contextget_start_method#_safe_to_dynamically_spawn_childrencallable TypeError _initializer _initargsrarr_executor_manager_threadrr threadingLockr Semaphorerr _queue_countrrrrEXTRA_QUEUED_CALLSrXr _ignore_epipe SimpleQueuerrrr)r max_workers mp_contextrrr queue_sizes rrzProcessPoolExecutor.__init__cs(    "  3!D |w&&$'(<(,(9%;%;!a !EFFF,'))222 D.BDDFFF!,D   ".^G44 ^-- % 11U1CCvM 0  "8K+@+@ "<== ='!  *1377 E HIII$)) !CDDD00E0BBfLL "CDDD%8!)-%!&'n..&/&9!&<&<# #% ',$0=, &);; %T%5#7-> @@@*.&'3355rc|j^|js|t||_|j|jt |j<dSdSr)rr_launch_processesrstartrr'rs r_start_executor_manager_threadz2ProcessPoolExecutor._start_executor_manager_threadsn  ( 0; )&&(((,B4,H,HD )  ) / / 1 1 14 T: ; ; ; 1 0rc|jdrdSt|j}||jkr|dSdS)NF)blocking)racquirelenrr_spawn_process)r process_counts rrz)ProcessPoolExecutor._adjust_process_countsb  & . . . > >  FDO,, 4, , ,    ! ! ! ! ! - ,rc|jr Jdtt|j|jD]}|dS)NzhProcesses cannot be fork()ed after the thread has started, deadlock in the child processes could result.)rrr'rrr()rr*s rr!z%ProcessPoolExecutor._launch_processesss0 A A@ A A As4?++T->?? " "A    ! ! ! ! " "rc|jt|j|j|j|j|jf}|||j |j <dS)N)targetrH) rProcessrrrrrrr"rpidrs rr(z"ProcessPoolExecutor._spawn_processsg   $ $""$#.+ - % . .  !"rc,|j5|jrt|j|jrt dt rt dt j}t||||}||j |j <|j |j |xj dz c_ |j |jr|||cdddS#1swxYwYdS)Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr.)rrrr RuntimeErrorr%rFuturerDrrrr}rrrrr#)rrGrHrIfws rsubmitzProcessPoolExecutor.submitsg    | 6' 555$ Q"#OPPP ;"$:;;; A!Rv..A:;D $T%6 7 N  t0 1 1 1    "    0 7 7 9 9 97 -**,,,  / / 1 1 1+                  sC4D  D D r.)timeoutrqc|dkrtdttt|t |d|i|}t |S)ajReturns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. r.zchunksize must be >= 1.rq)r5)rr]maprrzrur)rrGr5rqrrresultsr_s rr7zProcessPoolExecutor.mapsh* q==677 7''++gnb99)9J JJ&-//-W555rTF)cancel_futurescn|j5||_d|_|j|jdddn #1swxYwY|j|r|jd|_d|_|j|r|j d|_d|_ d|_dSr) rrrrrrr)rrrr)rrr9s rshutdownzProcessPoolExecutor.shutdown0s   > >+9D ($(D !3?4;;===  > > > > > > > > > > > > > > >  ( 4 4  ) . . 0 0 0)-%   )d )   $ $ & & &!/3,,,s/AA A)NNNr#)T)r r!r"rr#rr!r(r4rExecutorrir7r;rjrks@rrrbs48,.i'GKi'i'i'i'i'V555 " " """" # # #.^*2FN*.!6666666:4E44444(~.6HrrrSr)2ri __author__rconcurrent.futuresrrrr multiprocessing.connectionmultiprocessing.queuesrrr functoolsrror  tracebackrWeakKeyDictionaryr'r%rr-_register_atexitrr  Exceptionr1r8r?objectrDrMrUrXrurzrrThreadrrrrrBrokenExecutorrr<rr#rrrIsY((T2 $$$$$$ !!!!(((((( &&&&&&-7,..********* <((( y 1 1 1 1 1 1 1 1!!!!!&!!!333333332 ) ) )DH" 9 9 9 93333lVBVBVBVBVBY-VBVBVBr///@    ,b7b7b7b7b7%.b7b7b7b7b7r