o w[e.@sgdZddlZddlZddlZddlZddlZddlZddlZddlZddl m Z m Z ddl Z ddl mZddl mZejjZddlmZmZmZmZmZGdd d eZeZGd d d eZGd d d eZdS))Queue SimpleQueue JoinableQueueN)EmptyFull) connection)context)debuginfoFinalizeregister_after_fork is_exitingc@seZdZd-ddZddZddZdd Zd.d d Zd/ddZd/ddZ ddZ ddZ ddZ ddZ ddZddZdd Zd!d"Zd#d$Zed%d&Zed'd(Zed)d*Zed+d,ZdS)0rrcCs|dkr ddlm}||_tjdd\|_|_||_t |_ t j dkr*d|_n||_|||_d|_|t j dkrIt|tjdSdS)Nrr) SEM_VALUE_MAXFduplexwin32) synchronizer_maxsizerPipe_reader_writerLock_rlockosgetpid_opidsysplatform_wlockBoundedSemaphore_sem _ignore_epipe_resetr r _after_forkselfmaxsizectxr)-/usr/lib/python3.10/multiprocessing/queues.py__init__%s       zQueue.__init__cCs.t||j|j|j|j|j|j|j|j fSN) r assert_spawningr"rrrrrr!rr&r)r)r* __getstate__9s zQueue.__getstate__c Cs0|\|_|_|_|_|_|_|_|_|dSr,) r"rrrrrr!rr#r&stater)r)r* __setstate__>s zQueue.__setstate__cCstd|jdddS)NzQueue._after_fork()T) after_fork)r r#r.r)r)r*r$CszQueue._after_forkFcCsj|r|jntt|_t|_d|_d|_ d|_ d|_ d|_ |j j|_|jj|_|jj|_dSNF) _notempty_at_fork_reinit threading Conditionr collectionsdeque_buffer_thread _jointhread_joincancelled_closed_closer send_bytes _send_bytesr recv_bytes _recv_bytespoll_poll)r&r3r)r)r*r#Gs    z Queue._resetTNcCs||jr td|d|j||st|j|jdur!||j ||j WddS1s7wYdSNzQueue z is closed) r? ValueErrorr!acquirerr5r< _start_threadr;appendnotifyr&objblocktimeoutr)r)r*putVs   "z Queue.putcCs|jr td|d|r.|dur.|j |}Wdn1s#wY|jn>|r6t|}|j||s?t z&|rP|t}| |sOt n| sVt |}|jW|jn|jwt |SrG) r?rHrrDr!releasetime monotonicrIrrF_ForkingPicklerloads)r&rOrPresdeadliner)r)r*getbs.        z Queue.getcCs|j|jjSr,)rr!_semlock _get_valuer.r)r)r*qsize|sz Queue.qsizecC | Sr,rFr.r)r)r*empty z Queue.emptycCs |jjSr,)r!rZ_is_zeror.r)r)r*full z Queue.fullcCs |dSr4)rYr.r)r)r* get_nowaitr`zQueue.get_nowaitcCs ||dSr4)rQr&rNr)r)r* put_nowaitrczQueue.put_nowaitcCs$d|_|j}|rd|_|dSdS)NT)r?r@)r&closer)r)r*rgs  z Queue.closecCs2td|jsJd||jr|dSdS)NzQueue.join_thread()zQueue {0!r} not closed)r r?formatr=r.r)r)r* join_threads  zQueue.join_threadcCs4tdd|_z|jWdStyYdSw)NzQueue.cancel_join_thread()T)r r>r=cancelAttributeErrorr.r)r)r*cancel_join_threads zQueue.cancel_join_threadc Cstd|jtjtj|j|j|j|j |j j |j j |j |j|jf dd|_d|j_td|jtd|jsKt|jtjt|jgdd|_t|tj|j|jgd d|_dS) NzQueue._start_thread()QueueFeederThread)targetargsnameTzdoing self._thread.start()z... done self._thread.start()) exitpriority )r r;clearr7Threadr_feedr5rBrrrgrr"_on_queue_feeder_errorr!r<daemonstartr>r _finalize_joinweakrefrefr=_finalize_closer@r.r)r)r*rJs4      zQueue._start_threadcCs6td|}|dur|tddStddS)Nzjoining queue threadz... queue thread joinedz... queue thread already dead)r join)twrthreadr)r)r*rzs   zQueue._finalize_joincCsDtd||t|WddS1swYdS)Nztelling queue thread to quit)r rK _sentinelrL)buffernotemptyr)r)r*r}s   "zQueue._finalize_closec  Csftd|j} |j} |j} |j} t} tjdkr|j}|j}nd} zT| z |s+| W| n| wz6 | }|| urKtd||WWdSt |}|durY||n|z ||W|n|wq6t ysYnwWn<t y}z0|rt |ddt jkrWYd}~dStrtd|WYd}~dS||||WYd}~nd}~wwq!)Nz$starting thread to feed data to piperrz%feeder thread got sentinel -- exitingerrnorzerror in queue thread: %s)r rIrRwaitpopleftrrrrUdumps IndexError ExceptiongetattrrEPIPErr )rrrA writelock reader_close writer_close ignore_epipeonerror queue_semnacquirenreleasenwaitbpopleftsentinelwacquirewreleaserNer)r)r*rvsb      z Queue._feedcCsddl}|dS)z Private API hook called when feeding data in the background thread raises an exception. For overriding by concurrent.futures. rN) traceback print_exc)rrNrr)r)r*rws zQueue._on_queue_feeder_errorr)FTN)__name__ __module__ __qualname__r+r/r2r$r#rQrYr\r_rbrdrfrgrirlrJ staticmethodrzr}rvrwr)r)r)r*r#s2    !   >rc@s@eZdZdddZddZddZdd d Zd d ZddZd S)rrcCs*tj|||d|d|_||_dS)N)r(r)rr+ Semaphore_unfinished_tasksr8_condr%r)r)r*r+(s zJoinableQueue.__init__cCst||j|jfSr,)rr/rrr.r)r)r*r/-szJoinableQueue.__getstate__cCs,t||dd|dd\|_|_dS)N)rr2rrr0r)r)r*r20szJoinableQueue.__setstate__TNc Cs|jr td|d|j||st|j=|j!|jdur%||j ||j |j Wdn1s?wYWddSWddS1sWwYdSrG)r?rHr!rIrr5rr<rJr;rKrrRrLrMr)r)r*rQ4s    PzJoinableQueue.putcCsf|j&|jdstd|jjr!|jWddSWddS1s,wYdS)NFz!task_done() called too many times)rrrIrHrZra notify_allr.r)r)r* task_doneAs   "zJoinableQueue.task_donecCsR|j|jjs|jWddSWddS1s"wYdSr,)rrrZrarr.r)r)r*r~Hs   "zJoinableQueue.joinrr) rrrr+r/r2rQrr~r)r)r)r*r&s   rc@sNeZdZddZddZddZddZd d Zd d Zd dZ e e j Z dS)rcCsJtjdd\|_|_||_|jj|_tj dkrd|_ dS||_ dS)NFrr) rrrrrrrErFrrr)r&r(r)r)r*r+Ss     zSimpleQueue.__init__cCs|j|jdSr,)rrgrr.r)r)r*rg\s zSimpleQueue.closecCr]r,r^r.r)r)r*r_`r`zSimpleQueue.emptycCst||j|j|j|jfSr,)r r-rrrrr.r)r)r*r/cs zSimpleQueue.__getstate__cCs"|\|_|_|_|_|jj|_dSr,)rrrrrErFr0r)r)r*r2gszSimpleQueue.__setstate__cCs:|j |j}Wdn1swYt|Sr,)rrrCrUrV)r&rWr)r)r*rYks  zSimpleQueue.getcCs\t|}|jdur|j|dS|j|j|WddS1s'wYdSr,)rUrrrrArer)r)r*rQqs  "zSimpleQueue.putN)rrrr+rgr_r/r2rYrQ classmethodtypes GenericAlias__class_getitem__r)r)r)r*rQs  r)__all__rrr7r9rSrr{rqueuerr_multiprocessingrr reductionForkingPicklerrUutilr r r r robjectrrrrr)r)r)r*s(   z +