Fa+@s,dddgZddlZddlZddlZddlZddlZddlZddlZddlm Z m Z ddl Z ddl m Z ddl mZdd lmZmZmZmZmZdd lmZGd ddeZeZGd ddeZGd ddeZdS)Queue SimpleQueue JoinableQueueN)EmptyFull) connection)context)debuginfoFinalizeregister_after_fork is_exiting)ForkingPicklerc@s eZdZdddZddZddZdd Zd d d d Zd d ddZddZ ddZ ddZ ddZ ddZ ddZddZddZd d!Zed"d#Zed$d%Zed&d'Zd S)(rrcCs|dkrddlm}||_tjdd\|_|_|j|_t j |_ t j dkr|d|_n|j|_|j||_d|_|jt j dkrt|tjdS)Nrr) SEM_VALUE_MAXduplexFwin32)Z synchronizer_maxsizerPipe_reader_writerLock_rlockosgetpid_opidsysplatform_wlockZBoundedSemaphore_sem _ignore_epipe _after_forkr r)selfmaxsizectxr%,/usr/lib/python3.5/multiprocessing/queues.py__init__$s     zQueue.__init__cCsAtj||j|j|j|j|j|j|j|j fS)N) r assert_spawningr rrrrrrr)r"r%r%r& __getstate__9s zQueue.__getstate__c CsD|\|_|_|_|_|_|_|_|_|jdS)N) r rrrrrrrr!)r"stater%r%r& __setstate__>s6zQueue.__setstate__cCstdtjtj|_tj|_d|_d|_ d|_ d|_ d|_ |j j|_|jj|_|jj|_dS)NzQueue._after_fork()F)r threading Conditionr _notempty collectionsdeque_buffer_thread _jointhread_joincancelled_closed_closer send_bytes _send_bytesr recv_bytes _recv_bytespoll_poll)r"r%r%r&r!Cs      zQueue._after_forkTNc Csv|j st|jj||s+t|j;|jdkrN|j|jj ||jj WdQRXdS)N) r5AssertionErrorracquirerr.r2 _start_threadr1appendnotify)r"objblocktimeoutr%r%r&putPs  z Queue.putc Cs|r?|dkr?|j|j}WdQRX|jjn|rUtj|}|jj||sptzj|r|tj}|dks|j| rtn|jst|j}|jjWd|jjXtj |S)Nr) rr:rreleasetimer>rr<rloads)r"rCrDresZdeadliner%r%r&get[s&    z Queue.getcCs|j|jjjS)N)rr_semlockZ _get_value)r"r%r%r&qsizessz Queue.qsizecCs |j S)N)r<)r"r%r%r&emptywsz Queue.emptycCs|jjjS)N)rrK_is_zero)r"r%r%r&fullzsz Queue.fullcCs |jdS)NF)rJ)r"r%r%r& get_nowait}szQueue.get_nowaitcCs|j|dS)NF)rE)r"rBr%r%r& put_nowaitszQueue.put_nowaitc CsAd|_z|jjWd|j}|r<d|_|XdS)NT)r5rcloser6)r"rRr%r%r&rRs   z Queue.closecCs0td|jst|jr,|jdS)NzQueue.join_thread())r r5r=r3)r"r%r%r& join_threads  zQueue.join_threadc Cs=tdd|_y|jjWntk r8YnXdS)NzQueue.cancel_join_thread()T)r r4r3ZcancelAttributeError)r"r%r%r&cancel_join_threads    zQueue.cancel_join_threadc Cstd|jjtjdtjd|j|j|j|j |j j |j fdd|_ d|j _td|j jtd|jtjk}|j r| rt|j tjtj|j gd d |_t|tj|j|jgd d |_dS) NzQueue._start_thread()targetargsnameZQueueFeederThreadTzdoing self._thread.start()z... done self._thread.start()Z exitpriority )r r1clearr,ZThreadr_feedr.r8rrrRr r2Zdaemonstartrrrr4r _finalize_joinweakrefrefr3_finalize_closer6)r"Zcreated_by_this_processr%r%r&r?s*            zQueue._start_threadcCsDtd|}|dk r6|jtdn tddS)Nzjoining queue threadz... queue thread joinedz... queue thread already dead)r join)ZtwrZthreadr%r%r&r_s      zQueue._finalize_joinc Cs3td||jt|jWdQRXdS)Nztelling queue thread to quit)r r@ _sentinelrA)buffernotemptyr%r%r&rbs  zQueue._finalize_closecCstd|j}|j}|j}|j} t} tjdkrX|j} |j} nd} yx|z|s{|Wd|Xy}xv| } | | krtd|dStj | } | dkr|| q| z|| Wd| XqWWqdt k rYqdXqdWWnt k r}zr|rXt |ddt jkrXdSy3trttd|nddl}|jWnt k rYnXWYdd}~XnXdS)Nz$starting thread to feed data to piperz%feeder thread got sentinel -- exitingerrnorzerror in queue thread: %s)r r>rFwaitpopleftrdrrrdumps IndexError ExceptiongetattrrgZEPIPErr traceback print_exc)rerfr7Z writelockrRZ ignore_epipeZnacquireZnreleaseZnwaitZbpopleftsentinelZwacquireZwreleaserBernr%r%r&r]sV               !   z Queue._feed)__name__ __module__ __qualname__r'r)r+r!rErJrLrMrOrPrQrRrSrUr? staticmethodr_rbr]r%r%r%r&r"s$             & c@saeZdZdddZddZddZdd d d Zd d ZddZd S)rrcCs;tj||d||jd|_|j|_dS)Nr$r)rr'Z Semaphore_unfinished_tasksr-_cond)r"r#r$r%r%r&r'szJoinableQueue.__init__cCstj||j|jfS)N)rr)rwrv)r"r%r%r&r)szJoinableQueue.__getstate__cCs:tj||dd|dd\|_|_dS)Nry)rr+rwrv)r"r*r%r%r&r+ szJoinableQueue.__setstate__TNcCs|j st|jj||s+t|jY|jH|jdkrX|j|j j ||j j |jj WdQRXWdQRXdS)N)r5r=rr>rr.rwr2r?r1r@rvrFrA)r"rBrCrDr%r%r&rE$s  zJoinableQueue.putc CsR|jB|jjds(td|jjjrG|jjWdQRXdS)NFz!task_done() called too many times)rwrvr> ValueErrorrKrNZ notify_all)r"r%r%r& task_done0s   zJoinableQueue.task_donec Cs4|j$|jjjs)|jjWdQRXdS)N)rwrvrKrNrh)r"r%r%r&rc7s zJoinableQueue.join) rrrsrtr'r)r+rEr{rcr%r%r%r&rs    c@sXeZdZddZddZddZddZd d Zd d Zd S)rcCsjtjdd\|_|_|j|_|jj|_tj dkrWd|_ n|j|_ dS)NrFr) rrrrrrr;r<rrr)r"r$r%r%r&r'Bs  zSimpleQueue.__init__cCs |j S)N)r<)r"r%r%r&rMKszSimpleQueue.emptycCs)tj||j|j|j|jfS)N)r r(rrrr)r"r%r%r&r)Ns zSimpleQueue.__getstate__cCs"|\|_|_|_|_dS)N)rrrr)r"r*r%r%r&r+RszSimpleQueue.__setstate__c Cs-|j|jj}WdQRXtj|S)N)rrr9rrH)r"rIr%r%r&rJUs zSimpleQueue.getc CsVtj|}|jdkr1|jj|n!|j|jj|WdQRXdS)N)rrjrrr7)r"rBr%r%r&rE[s  zSimpleQueue.putN) rrrsrtr'rMr)r+rJrEr%r%r%r&r@s     )__all__rrr,r/rGr`rgZqueuerrZ_multiprocessingrr utilr r r r rZ reductionrobjectrrdrrr%r%r%r& s"        ( *