jli  Linuxx86_641.10.3v1.10.30b4590a5507d3f3046e5bafc007cacbbfc9b310b߸TranscodingStreams(P;BŦ&Y07&F/opt/julia/packages/TranscodingStreams/o7roJ/src/TranscodingStreams.jl#NA:/opt/julia/packages/TranscodingStreams/o7roJ/src/memory.jl#NA:/opt/julia/packages/TranscodingStreams/o7roJ/src/buffer.jl#NA9/opt/julia/packages/TranscodingStreams/o7roJ/src/error.jl#NA9/opt/julia/packages/TranscodingStreams/o7roJ/src/codec.jl#NA9/opt/julia/packages/TranscodingStreams/o7roJ/src/state.jl#NA:/opt/julia/packages/TranscodingStreams/o7roJ/src/stream.jl#NA6/opt/julia/packages/TranscodingStreams/o7roJ/src/io.jl#NA8/opt/julia/packages/TranscodingStreams/o7roJ/src/noop.jl#NA=/opt/julia/packages/TranscodingStreams/o7roJ/src/transcode.jl#NA1 CoremуJ5Basemу]J5MainmуJ5ArgToolsBń x(mуF K5 Artifactsmr-V3|mу K5Base64UlD*_mу> K5CRC32c\y.jmуj K5 FileWatchingXzsy`{,zmуh& K5LibdluVW59˗,mу-" K5LoggingT{VhUXM=mуrU" K5MmapP~:xg,Omу|' K5NetworkOptionsC0YW,mуʠ, K5SHAQ<$!<%mу1 K5 Serialization [)*k1mу-G K5Sockets1V$ bdސݗmуYBY K5UnicodeP>I>Nrmуeszo K5 LinearAlgebraSm7̏mуuux K5 OpenBLAS_jll[(Śb6EcQ FmуDux K5libblastrampoline_jllLSۆ }lxӠmу^} K5MarkdownZPn7z`smу/Ed~ K5Printfg^cX׸QDmу;h K5Random_ɢ?\Ymу? K5TarOi>աmу!t, K5DatesEY8pj2 mуX K5FuturebS;3{I xVMmуsD K5InteractiveUtilsWL ~@'ZmуVg K5LibGit2Z[&RPTv3EКRmу8J K5 LibGit2_jll YXg}]$mуD K5 MbedTLS_jllAX 3ȡ_mу- K5 LibSSH2_jlloTZk)߆||<-------->||<-------->| # data ....xxxxxxxxxxxxXXXXXXXXXXXX............ # ^ ^ ^ ^ ^ # position 1 markpos bufferpos marginpos lastindex(data) # # `markpos` is positive iff there are marked data; otherwise it is set to zero. # `markpos` ≤ `marginpos` and `bufferpos` ≤ `marginpos` must hold. mutable struct Buffer # data and positions (see above) data::Vector{UInt8} markpos::Int bufferpos::Int marginpos::Int # the total number of transcoded bytes transcoded::Int64 function Buffer(data::Vector{UInt8}, marginpos::Integer=length(data)+1) @assert 1 <= marginpos <= length(data)+1 return new(data, 0, 1, marginpos, 0) end end function Buffer(size::Integer = 0) return Buffer(Vector{UInt8}(undef, size), 1) end function Buffer(data::Base.CodeUnits{UInt8}, args...) return Buffer(Vector{UInt8}(data), args...) end function Base.length(buf::Buffer) return length(buf.data) end function bufferptr(buf::Buffer) return pointer(buf.data, buf.bufferpos) end function buffersize(buf::Buffer) return buf.marginpos - buf.bufferpos end function buffermem(buf::Buffer) return Memory(bufferptr(buf), buffersize(buf)) end function marginptr(buf::Buffer) return pointer(buf.data, buf.marginpos) end function marginsize(buf::Buffer) return lastindex(buf.data) - buf.marginpos + 1 end function marginmem(buf::Buffer) return Memory(marginptr(buf), marginsize(buf)) end function ismarked(buf::Buffer) return buf.markpos != 0 end function mark!(buf::Buffer) return buf.markpos = buf.bufferpos end function unmark!(buf::Buffer) if buf.markpos == 0 return false else buf.markpos = 0 return true end end function reset!(buf::Buffer) @assert buf.markpos > 0 @assert buf.markpos ≤ buf.marginpos buf.bufferpos = buf.markpos buf.markpos = 0 return buf.bufferpos end # Notify that `n` bytes are consumed from `buf`. function consumed!(buf::Buffer, n::Integer; transcode::Bool = false) buf.bufferpos += n if transcode buf.transcoded += n end return buf end # Notify that `n` bytes are supplied to `buf`. function supplied!(buf::Buffer, n::Integer; transcode::Bool = false) buf.marginpos += n if transcode buf.transcoded += n end return buf end # Discard buffered data and initialize positions. function initbuffer!(buf::Buffer) buf.markpos = buf.transcoded = 0 buf.bufferpos = buf.marginpos = 1 return buf end # Remove all buffered data. function emptybuffer!(buf::Buffer) buf.markpos = 0 buf.bufferpos = buf.marginpos = 1 return buf end # Make margin with ≥`minsize` and return the size of it. # If eager is true, it tries to move data even when the buffer has enough margin. function makemargin!(buf::Buffer, minsize::Int; eager::Bool = false) @assert minsize ≥ 0 if buffersize(buf) == 0 && buf.markpos == 0 buf.bufferpos = buf.marginpos = 1 end if marginsize(buf) < minsize || eager # datapos refer to the leftmost position of data that must not be # discarded. We can left-shift to discard all data before this datapos = if iszero(buf.markpos) # If data is not marked we must not discard buffered (nonconsumed) data buf.bufferpos else # Else, we must not consume marked or buffered data min(buf.markpos, buf.bufferpos) end datasize = buf.marginpos - datapos # Shift data left in buffer to make space for new data copyto!(buf.data, 1, buf.data, datapos, datasize) shift = datapos - 1 if buf.markpos > 0 buf.markpos -= shift end buf.bufferpos -= shift buf.marginpos -= shift end # If there is still not enough margin, we expand buffer. # At least enough for minsize, but otherwise 1.5 times if marginsize(buf) < minsize datasize = length(buf.data) resize!(buf.data, max(Base.checked_add(buf.marginpos, minsize) - 1, datasize + div(datasize, 2))) end @assert marginsize(buf) ≥ minsize return marginsize(buf) end # Read a byte. function readbyte!(buf::Buffer) b = buf.data[buf.bufferpos] consumed!(buf, 1) return b end # Write a byte. function writebyte!(buf::Buffer, b::UInt8) buf.data[buf.marginpos] = b supplied!(buf, 1) return 1 end # Skip `n` bytes in the buffer. function skipbuffer!(buf::Buffer, n::Integer) @assert n ≤ buffersize(buf) consumed!(buf, n) return buf end # Copy data from `data` to `buf`. function copydata!(buf::Buffer, data::Ptr{UInt8}, nbytes::Integer) makemargin!(buf, Int(nbytes)) GC.@preserve buf unsafe_copyto!(marginptr(buf), data, nbytes) supplied!(buf, nbytes) return buf end # Copy data from `buf` to `data`. function copydata!(data::Ptr{UInt8}, buf::Buffer, nbytes::Integer) # NOTE: It's caller's responsibility to ensure that the buffer has at least # nbytes. @assert buffersize(buf) ≥ nbytes GC.@preserve buf unsafe_copyto!(data, bufferptr(buf), nbytes) consumed!(buf, nbytes) return data end # Insert data to the current buffer. # `data` must not alias `buf` function insertdata!(buf::Buffer, data::Union{AbstractArray{UInt8}, Memory}) nbytes = Int(length(data)) makemargin!(buf, nbytes) datapos = if iszero(buf.markpos) # If data is not marked we must not discard buffered (nonconsumed) data buf.bufferpos else # Else, we must not consume marked or buffered data min(buf.markpos, buf.bufferpos) end datasize = buf.marginpos - datapos copyto!(buf.data, datapos + nbytes, buf.data, datapos, datasize) for i in 0:nbytes-1 buf.data[buf.bufferpos + i] = data[firstindex(data) + i] end supplied!(buf, nbytes) if !iszero(buf.markpos) buf.markpos += nbytes end @assert buf.markpos ≤ buf.marginpos return buf end # Find the first occurrence of a specific byte. function findbyte(buf::Buffer, byte::UInt8) p = GC.@preserve buf ccall( :memchr, Ptr{UInt8}, (Ptr{UInt8}, Cint, Csize_t), pointer(buf.data, buf.bufferpos), byte, buffersize(buf)) if p == C_NULL return marginptr(buf) else return p end end 9/opt/julia/packages/TranscodingStreams/o7roJ/src/error.jlP# Transcoding Error # ================= """ Container of transcoding error. An object of this type is used to notify the caller of an exception that happened inside a transcoding method. The `error` field is undefined at first but will be filled when data processing failed. The error should be set by calling the `setindex!` method (e.g. `error[] = ErrorException("error!")`). """ mutable struct Error error::Exception function Error() return new() end end # Test if an exception is set. function haserror(error::Error) return isdefined(error, :error) end function Base.setindex!(error::Error, ex::Exception) @assert !haserror(error) "an error is already set" error.error = ex return error end function Base.getindex(error::Error) @assert haserror(error) "no error is set" return error.error end 9/opt/julia/packages/TranscodingStreams/o7roJ/src/codec.jl# Codec Interfaces # ================ """ An abstract codec type. Any codec supporting the transcoding protocol must be a subtype of this type. Transcoding protocol -------------------- Transcoding proceeds by calling some functions in a specific way. We call this "transcoding protocol" and any codec must implement it as described below. There are six functions for a codec to implement: - `expectedsize`: return the expected size of transcoded data - `minoutsize`: return the minimum output size of `process` - `initialize`: initialize the codec - `finalize`: finalize the codec - `startproc`: start processing with the codec - `process`: process data with the codec. These are defined in the `TranscodingStreams` and a new codec type must extend these methods if necessary. Implementing a `process` method is mandatory but others are optional. `expectedsize`, `minoutsize`, `initialize`, `finalize`, and `startproc` have a default implementation. Your codec type is denoted by `C` and its object by `codec`. Errors that occur in these methods are supposed to be unrecoverable and the stream will go to the panic mode. Only `Base.isopen` and `Base.close` are available in that mode. ### `expectedsize` The `expectedsize(codec::C, input::Memory)::Int` method takes `codec` and `input`, and returns the expected size of transcoded data. This method will be used as a hint to determine the size of a data buffer when `transcode` is called. A good hint will reduce the number of buffer resizing and hence result in better performance. ### `minoutsize` The `minoutsize(codec::C, input::Memory)::Int` method takes `codec` and `input`, and returns the minimum required size of the output memory when `process` is called. For example, an encoder of base64 will write at least four bytes to the output and hence it is reasonable to return 4 with this method. ### `initialize` The `initialize(codec::C)::Void` method takes `codec` and returns `nothing`. This is called once and only once before starting any data processing. Therefore, you may initialize `codec` (e.g. allocating memory needed to process data) with this method. If initialization fails for some reason, it may throw an exception and no other methods (including `finalize`) will be called. Therefore, you need to release the memory before throwing an exception. ### `finalize` The `finalize(codec::C)::Void` method takes `codec` and returns `nothing`. This is called once and only only once just before the transcoding stream goes to the close mode (i.e. when `Base.close` is called) or just after `startproc` or `process` throws an exception. Other errors that happen inside the stream (e.g. `EOFError`) will not call this method. Therefore, you may finalize `codec` (e.g. freeing memory) with this method. If finalization fails for some reason, it may throw an exception. You should release the allocated memory in codec before returning or throwing an exception in `finalize` because otherwise nobody cannot release the memory. Even when an exception is thrown while finalizing a stream, the stream will become the close mode for safety. ### `startproc` The `startproc(codec::C, mode::Symbol, error::Error)::Symbol` method takes `codec`, `mode` and `error`, and returns a status code. This is called just before the stream starts reading or writing data. `mode` is either `:read` or `:write` and then the stream starts reading or writing, respectively. The return code must be `:ok` if `codec` is ready to read or write data. Otherwise, it must be `:error` and the `error` argument must be set to an exception object. ### `process` The `process(codec::C, input::Memory, output::Memory, error::Error)::Tuple{Int,Int,Symbol}` method takes `codec`, `input`, `output` and `error`, and returns a consumed data size, a produced data size and a status code. This is called repeatedly while processing data. The input (`input`) and output (`output`) data are a `Memory` object, which is a pointer to a contiguous memory region with size. You must read input data from `input`, transcode the bytes, and then write the output data to `output`. Finally you need to return the size of read data, the size of written data, and `:ok` status code so that the caller can know how many bytes are consumed and produced in the method. When transcoding reaches the end of a data stream, it is notified to this method by empty input. In that case, the method need to write the buffered data (if any) to `output`. If there is no data to write, the status code must be set to `:end`. The `process` method will be called repeatedly until it returns `:end` status code. If an error happens while processing data, the `error` argument must be set to an exception object and the return code must be `:error`. """ abstract type Codec end # Methods # ------- """ expectedsize(codec::Codec, input::Memory)::Int Return the expected size of the transcoded `input` with `codec`. The default method returns `input.size`. """ function expectedsize(codec::Codec, input::Memory)::Int return input.size end """ minoutsize(codec::Codec, input::Memory)::Int Return the minimum output size to be ensured when calling `process`. The default method returns `max(1, div(input.size, 4))`. """ function minoutsize(codec::Codec, input::Memory)::Int return max(1, div(input.size, 4)) end """ initialize(codec::Codec)::Void Initialize `codec`. The default method does nothing. """ function initialize(codec::Codec) return nothing end """ finalize(codec::Codec)::Void Finalize `codec`. The default method does nothing. """ function finalize(codec::Codec)::Nothing return nothing end """ startproc(codec::Codec, mode::Symbol, error::Error)::Symbol Start data processing with `codec` of `mode`. The default method does nothing and returns `:ok`. """ function startproc(codec::Codec, mode::Symbol, error::Error)::Symbol return :ok end """ process(codec::Codec, input::Memory, output::Memory, error::Error)::Tuple{Int,Int,Symbol} Do data processing with `codec`. There is no default method. """ function process(codec::Codec, input::Memory, output::Memory, error::Error)::Tuple{Int,Int,Symbol} # no default method throw(MethodError(process, (codec, input, output, error))) end 9/opt/julia/packages/TranscodingStreams/o7roJ/src/state.jl# Transcoding State # ================= # See docs/src/devnotes.md. """ A mutable state type of transcoding streams. See Developer's notes for details. """ mutable struct State # current stream mode mode::Symbol # {:idle, :read, :write, :stop, :close, :panic} # return code of the last method call code::Symbol # {:ok, :end, :error} # flag to go :stop on :end while reading stop_on_end::Bool # exception thrown while data processing error::Error # data buffers buffer1::Buffer buffer2::Buffer function State(buffer1::Buffer, buffer2::Buffer) return new(:idle, :ok, false, Error(), buffer1, buffer2) end end function State(size::Integer) return State(Buffer(size), Buffer(size)) end :/opt/julia/packages/TranscodingStreams/o7roJ/src/stream.jl.`# Transcoding Stream # ================== # Data Flow # --------- # # When reading data (`state.mode == :read`): # user <--- |state.buffer1| <--- <--- |state.buffer2| <--- stream # # When writing data (`state.mode == :write`): # user ---> |state.buffer1| ---> ---> |state.buffer2| ---> stream struct TranscodingStream{C<:Codec,S<:IO} <: IO # codec object codec::C # source/sink stream stream::S # mutable state of the stream state::State # data buffers buffer1::Buffer buffer2::Buffer function TranscodingStream{C,S}( codec::C, stream::S, state::State, initialized::Bool) where {C<:Codec,S<:IO} if !isopen(stream) throw(ArgumentError("closed stream")) elseif state.mode != :idle throw(ArgumentError("invalid initial mode")) end if !initialized initialize(codec) end return new(codec, stream, state, state.buffer1, state.buffer2) end end function TranscodingStream(codec::C, stream::S, state::State; initialized::Bool=false) where {C<:Codec,S<:IO} return TranscodingStream{C,S}(codec, stream, state, initialized) end const DEFAULT_BUFFER_SIZE = 16 * 2^10 # 16KiB function checkbufsize(bufsize::Integer) if bufsize ≤ 0 throw(ArgumentError("non-positive buffer size")) end end function checksharedbuf(sharedbuf::Bool, stream::IO) if sharedbuf && !(stream isa TranscodingStream) throw(ArgumentError("invalid stream type for sharedbuf=true")) end end """ TranscodingStream(codec::Codec, stream::IO; bufsize::Integer=$(DEFAULT_BUFFER_SIZE), stop_on_end::Bool=false, sharedbuf::Bool=(stream isa TranscodingStream)) Create a transcoding stream with `codec` and `stream`. A `TranscodingStream` object wraps an input/output stream object `stream`, and transcodes the byte stream using `codec`. It is a subtype of `IO` and supports most of the I/O functions in the standard library. See the docs () for available codecs, examples, and more details of the type. Arguments --------- - `codec`: The data transcoder. The transcoding stream does the initialization and finalization of `codec`. Therefore, a codec object is not reusable once it is passed to a transcoding stream. - `stream`: The wrapped stream. It must be opened before passed to the constructor. - `bufsize`: The initial buffer size (the default size is 16KiB). The buffer may be extended whenever `codec` requests so. - `stop_on_end`: The flag to stop reading on `:end` return code from `codec`. The transcoded data are readable even after stopping transcoding process. With this flag on, `stream` is not closed when the wrapper stream is closed with `close`. Note that if reading some extra data may be read from `stream` into an internal buffer, and thus `stream` must be a `TranscodingStream` object and `sharedbuf` must be `true` to reuse `stream`. - `sharedbuf`: The flag to share buffers between adjacent transcoding streams. The value must be `false` if `stream` is not a `TranscodingStream` object. Examples -------- ```jldoctest julia> using TranscodingStreams julia> file = open(joinpath(dirname(dirname(pathof(TranscodingStreams))), "README.md")); julia> stream = TranscodingStream(Noop(), file); julia> readline(file) "TranscodingStreams.jl" julia> close(stream) ``` """ function TranscodingStream(codec::Codec, stream::IO; bufsize::Integer=DEFAULT_BUFFER_SIZE, stop_on_end::Bool=false, sharedbuf::Bool=(stream isa TranscodingStream)) checkbufsize(bufsize) checksharedbuf(sharedbuf, stream) if sharedbuf # Here, the compiler cannot infer at compile time that the # stream must be a TranscodingStream, so we need to help the # compiler along. See https://github.com/JuliaIO/TranscodingStreams.jl/pull/111 stream::TranscodingStream state = State(Buffer(bufsize), stream.buffer1) else state = State(bufsize) end state.stop_on_end = stop_on_end return TranscodingStream(codec, stream, state) end function Base.show(io::IO, stream::TranscodingStream) print(io, summary(stream), "()") end # Split keyword arguments. @nospecialize @static if isdefined(Base, :Pairs) splitkwargs(kwargs::Base.Pairs, ks::Tuple{Vararg{Symbol}}) = splitkwargs(NamedTuple(kwargs), ks) end function splitkwargs(kwargs::NamedTuple, ks::Tuple{Vararg{Symbol}}) non_ks = Base.diff_names(keys(kwargs), ks) ks = Base.diff_names(keys(kwargs), non_ks) return NamedTuple{ks}(kwargs), NamedTuple{non_ks}(kwargs) end function splitkwargs(kwargs, keys) hits = [] others = [] for kwarg in kwargs push!(kwarg[1] ∈ keys ? hits : others, kwarg) end return hits, others end @specialize # throw ArgumentError that mode is invalid. throw_invalid_mode(mode) = throw(ArgumentError(string("invalid mode :", mode))) # Return true if the stream shares buffers with underlying stream function has_sharedbuf(stream::TranscodingStream)::Bool stream.stream isa TranscodingStream && stream.buffer2 === stream.stream.buffer1 end # Base IO Functions # ----------------- function Base.open(f::Function, ::Type{T}, args...) where T<:TranscodingStream stream = T(open(args...)) try f(stream) finally close(stream) end end function Base.isopen(stream::TranscodingStream) return stream.state.mode != :close && stream.state.mode != :panic end function Base.isreadable(stream::TranscodingStream)::Bool mode = stream.state.mode (mode === :idle || mode === :read || mode === :stop) && isreadable(stream.stream) end function Base.iswritable(stream::TranscodingStream)::Bool mode = stream.state.mode (mode === :idle || mode === :write) && iswritable(stream.stream) end function Base.close(stream::TranscodingStream) mode = stream.state.mode try if mode != :panic changemode!(stream, :close) end finally if !stream.state.stop_on_end close(stream.stream) end end return nothing end function Base.eof(stream::TranscodingStream) eof = buffersize(stream.buffer1) == 0 state = stream.state mode = state.mode if !(mode == :read || mode == :stop) || eof eof = sloweof(stream) end return eof end @noinline function sloweof(stream::TranscodingStream) while true state = stream.state mode = state.mode if mode == :read return (buffersize(stream.buffer1) == 0 && fillbuffer(stream) == 0) elseif mode == :idle changemode!(stream, :read) continue elseif mode == :write return eof(stream.stream) elseif mode == :close return true elseif mode == :stop return buffersize(stream.buffer1) == 0 elseif mode == :panic throw_panic_error() end @assert false end end function Base.ismarked(stream::TranscodingStream)::Bool checkmode(stream) isopen(stream) && ismarked(stream.buffer1) end function Base.mark(stream::TranscodingStream)::Int64 ready_to_read!(stream) mark!(stream.buffer1) position(stream) end function Base.unmark(stream::TranscodingStream)::Bool checkmode(stream) isopen(stream) && unmark!(stream.buffer1) end function Base.reset(stream::T) where T<:TranscodingStream Base.ismarked(stream) || throw(ArgumentError("$T not marked")) reset!(stream.buffer1) position(stream) end """ position(stream::TranscodingStream) Return the number of bytes read from or written to `stream`. Note that the returned value will be different from that of the underlying stream wrapped by `stream`. This is because `stream` buffers some data and the codec may change the length of data. """ function Base.position(stream::TranscodingStream) mode = stream.state.mode if mode === :idle return Int64(0) elseif mode === :read || mode === :stop return stats(stream).out elseif mode === :write return stats(stream).in else throw_invalid_mode(mode) end @assert false "unreachable" end # Seek Operations # --------------- function Base.seekstart(stream::TranscodingStream) mode = stream.state.mode if mode === :read callstartproc(stream, mode) emptybuffer!(stream.buffer1) emptybuffer!(stream.buffer2) elseif mode === :idle else throw_invalid_mode(mode) end seekstart(stream.stream) return stream end function Base.seekend(stream::TranscodingStream) mode = stream.state.mode if mode == :read callstartproc(stream, mode) emptybuffer!(stream.buffer1) emptybuffer!(stream.buffer2) elseif mode === :idle else throw_invalid_mode(mode) end seekend(stream.stream) return stream end # Read Functions # -------------- function Base.read(stream::TranscodingStream, ::Type{UInt8}) # eof and ready_to_read! are inlined here because ready_to_read! is very slow and eof is broken eof = buffersize(stream.buffer1) == 0 state = stream.state mode = state.mode if !(mode == :read || mode == :stop) changemode!(stream, :read) end if eof && sloweof(stream) throw(EOFError()) end return readbyte!(stream.buffer1) end function Base.readuntil(stream::TranscodingStream, delim::UInt8; keep::Bool=false) ready_to_read!(stream) buffer1 = stream.buffer1 # delay initialization so as to reduce the number of buffer resizes local ret::Vector{UInt8} filled = 0 while !eof(stream) p = findbyte(buffer1, delim) found = false if p < marginptr(buffer1) found = true sz = Int(p + 1 - bufferptr(buffer1)) if !keep sz -= 1 end else sz = buffersize(buffer1) end if @isdefined(ret) resize!(ret, filled + sz) else @assert filled == 0 ret = Vector{UInt8}(undef, sz) end GC.@preserve ret copydata!(pointer(ret, filled+1), buffer1, sz) filled += sz if found if !keep # skip the delimiter skipbuffer!(buffer1, 1) end break end end if !@isdefined(ret) # special case: stream is empty ret = UInt8[] end return ret end """ skip(stream::TranscodingStream, offset) Read bytes from `stream` until `offset` bytes have been read or `eof(stream)` is reached. Return `stream`, discarding read bytes. This function will not throw an `EOFError` if `eof(stream)` is reached before `offset` bytes can be read. """ function Base.skip(stream::TranscodingStream, offset::Integer) if offset < 0 # TODO support negative offset if stream is marked throw(ArgumentError("negative offset")) end ready_to_read!(stream) buffer1 = stream.buffer1 skipped = 0 while skipped < offset && !eof(stream) n = min(buffersize(buffer1), offset - skipped) skipbuffer!(buffer1, n) skipped += n end return stream end function Base.unsafe_read(stream::TranscodingStream, output::Ptr{UInt8}, nbytes::UInt) ready_to_read!(stream) buffer = stream.buffer1 p = output p_end = output + nbytes while p < p_end && !eof(stream) m = min(buffersize(buffer), p_end - p) copydata!(p, buffer, m) p += m GC.safepoint() end if p < p_end throw(EOFError()) end return end function Base.readbytes!(stream::TranscodingStream, b::DenseArray{UInt8}, nb=length(b)) ready_to_read!(stream) filled = 0 resized = false while filled < nb && !eof(stream) if length(b) == filled resize!(b, min(max(length(b) * 2, 8), nb)) resized = true end filled += GC.@preserve b unsafe_read(stream, pointer(b, filled+1), min(length(b), nb)-filled) end if resized resize!(b, filled) end return filled end function Base.bytesavailable(stream::TranscodingStream) ready_to_read!(stream) return buffersize(stream.buffer1) end function Base.readavailable(stream::TranscodingStream) n = bytesavailable(stream) data = Vector{UInt8}(undef, n) GC.@preserve data unsafe_read(stream, pointer(data), n) return data end """ unread(stream::TranscodingStream, data::AbstractVector{UInt8}) Insert `data` to the current reading position of `stream`. The next `read(stream, sizeof(data))` call will read data that are just inserted. `data` must not alias any internal buffers in `stream` """ function unread(stream::TranscodingStream, data::AbstractVector{UInt8}) insertdata!(stream.buffer1, data) return nothing end """ unsafe_unread(stream::TranscodingStream, data::Ptr, nbytes::Integer) Insert `nbytes` pointed by `data` to the current reading position of `stream`. The data are copied into the internal buffer and hence `data` can be safely used after the operation without interfering the stream. `data` must not alias any internal buffers in `stream` """ function unsafe_unread(stream::TranscodingStream, data::Ptr, nbytes::Integer) if nbytes < 0 throw(ArgumentError("negative nbytes")) end ready_to_read!(stream) insertdata!(stream.buffer1, Memory(convert(Ptr{UInt8}, data), UInt(nbytes))) return nothing end # Ready to read data from the stream. function ready_to_read!(stream::TranscodingStream) mode = stream.state.mode if !(mode == :read || mode == :stop) changemode!(stream, :read) end return end # Write Functions # --------------- # Write nothing. function Base.write(stream::TranscodingStream) changemode!(stream, :write) return 0 end function Base.write(stream::TranscodingStream, b::UInt8) changemode!(stream, :write) if marginsize(stream.buffer1) == 0 && flushbuffer(stream) == 0 return 0 end return writebyte!(stream.buffer1, b) end function Base.unsafe_write(stream::TranscodingStream, input::Ptr{UInt8}, nbytes::UInt) changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 p = input p_end = p + nbytes while p < p_end && (marginsize(buffer1) > 0 || flushbuffer(stream) > 0) m = min(marginsize(buffer1), p_end - p) copydata!(buffer1, p, m) p += m GC.safepoint() end return Int(p - input) end # A singleton type of end token. struct EndToken end """ A special token indicating the end of data. `TOKEN_END` may be written to a transcoding stream like `write(stream, TOKEN_END)`, which will terminate the current transcoding block. !!! note Call `flush(stream)` after `write(stream, TOKEN_END)` to make sure that all data are written to the underlying stream. """ const TOKEN_END = EndToken() function Base.write(stream::TranscodingStream, ::EndToken) changemode!(stream, :write) flushbufferall(stream) flushuntilend(stream) return 0 end function Base.flush(stream::TranscodingStream) checkmode(stream) if stream.state.mode == :write flushbufferall(stream) writedata!(stream.stream, stream.buffer2) end flush(stream.stream) end # Stats # ----- """ I/O statistics. Its object has four fields: - `in`: the number of bytes supplied into the stream - `out`: the number of bytes consumed out of the stream - `transcoded_in`: the number of bytes transcoded from the input buffer - `transcoded_out`: the number of bytes transcoded to the output buffer Note that, since the transcoding stream does buffering, `in` is `transcoded_in + {size of buffered data}` and `out` is `transcoded_out - {size of buffered data}`. """ struct Stats in::Int64 out::Int64 transcoded_in::Int64 transcoded_out::Int64 end function Base.show(io::IO, stats::Stats) println(io, summary(stats), ':') println(io, " in: ", stats.in) println(io, " out: ", stats.out) println(io, " transcoded_in: ", stats.transcoded_in) print(io, " transcoded_out: ", stats.transcoded_out) end """ stats(stream::TranscodingStream) Create an I/O statistics object of `stream`. """ function stats(stream::TranscodingStream) state = stream.state mode = state.mode buffer1 = stream.buffer1 buffer2 = stream.buffer2 if mode === :idle transcoded_in = transcoded_out = in = out = 0 elseif mode === :read || mode === :stop transcoded_in = buffer2.transcoded transcoded_out = buffer1.transcoded in = transcoded_in + buffersize(buffer2) out = transcoded_out - buffersize(buffer1) elseif mode === :write transcoded_in = buffer1.transcoded transcoded_out = buffer2.transcoded in = transcoded_in + buffersize(buffer1) out = transcoded_out - buffersize(buffer2) else throw_invalid_mode(mode) end return Stats(in, out, transcoded_in, transcoded_out) end # Buffering # --------- function fillbuffer(stream::TranscodingStream; eager::Bool = false) changemode!(stream, :read) buffer1 = stream.buffer1 buffer2 = stream.buffer2 nfilled::Int = 0 while ((!eager && buffersize(buffer1) == 0) || (eager && makemargin!(buffer1, 0, eager = true) > 0)) && stream.state.mode != :stop if stream.state.code == :end if buffersize(buffer2) == 0 && eof(stream.stream) break end callstartproc(stream, :read) end makemargin!(buffer2, 1) readdata!(stream.stream, buffer2) _, Δout = callprocess(stream, buffer2, buffer1) nfilled += Δout end return nfilled end function flushbuffer(stream::TranscodingStream, all::Bool=false) changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 nflushed::Int = 0 while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) if state.code == :end callstartproc(stream, :write) end writedata!(stream.stream, buffer2) Δin, _ = callprocess(stream, buffer1, buffer2) nflushed += Δin end return nflushed end function flushbufferall(stream::TranscodingStream) return flushbuffer(stream, true) end function flushuntilend(stream::TranscodingStream) changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 while state.code != :end writedata!(stream.stream, buffer2) callprocess(stream, buffer1, buffer2) end writedata!(stream.stream, buffer2) @assert buffersize(buffer1) == 0 return end # Interface to codec # ------------------ # Call `startproc` with epilogne. function callstartproc(stream::TranscodingStream, mode::Symbol) state = stream.state state.code = startproc(stream.codec, mode, state.error) if state.code == :error changemode!(stream, :panic) end return end # Call `process` with prologue and epilogue. function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) state = stream.state input = buffermem(inbuf) GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input)) Δin, Δout, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) @debug( "called process()", code = state.code, input_size = buffersize(inbuf), output_size = marginsize(outbuf), input_delta = Δin, output_delta = Δout, ) consumed!(inbuf, Δin, transcode = true) supplied!(outbuf, Δout, transcode = true) if state.code == :error changemode!(stream, :panic) elseif state.code == :ok && Δin == Δout == 0 # When no progress, expand the output buffer. makemargin!(outbuf, max(16, marginsize(outbuf) * 2)) elseif state.code == :end && state.stop_on_end if stream.state.mode == :read if stream.stream isa TranscodingStream && !has_sharedbuf(stream) && !iszero(buffersize(inbuf)) # unread data to match behavior if inbuf was shared. unread(stream.stream, view(inbuf.data, inbuf.bufferpos:inbuf.marginpos-1)) end changemode!(stream, :stop) end end return Δin, Δout end # I/O operations # -------------- # Read as much data as possbile from `input` to the margin of `output`. # This function will not block if `input` has buffered data. function readdata!(input::IO, output::Buffer)::Int if input isa TranscodingStream && input.buffer1 === output # Delegate the operation to the underlying stream for shared buffers. mode::Symbol = input.state.mode if mode === :idle || mode === :read return fillbuffer(input) else return 0 end end nread::Int = 0 navail = bytesavailable(input) if navail == 0 && marginsize(output) > 0 && !eof(input) nread += writebyte!(output, read(input, UInt8)) navail = bytesavailable(input) end n = min(navail, marginsize(output)) GC.@preserve output Base.unsafe_read(input, marginptr(output), n) supplied!(output, n) nread += n return nread end # Write all data to `output` from the buffer of `input`. function writedata!(output::IO, input::Buffer) if output isa TranscodingStream && output.buffer1 === input # Delegate the operation to the underlying stream for shared buffers. return flushbufferall(output) end nwritten::Int = 0 while buffersize(input) > 0 n = GC.@preserve input Base.unsafe_write(output, bufferptr(input), buffersize(input)) consumed!(input, n) nwritten += n GC.safepoint() end GC.safepoint() return nwritten end # Mode Transition # --------------- # Change the current mode. function changemode!(stream::TranscodingStream, newmode::Symbol) state = stream.state mode = state.mode if mode == newmode # mode does not change return elseif newmode == :panic if !haserror(state.error) set_default_error!(state.error) end state.mode = newmode finalize_codec(stream.codec, state.error) throw(state.error[]) elseif mode == :idle if newmode == :read || newmode == :write state.code = startproc(stream.codec, newmode, state.error) if state.code == :error changemode!(stream, :panic) end state.mode = newmode return elseif newmode == :close state.mode = newmode finalize_codec(stream.codec, state.error) return end elseif mode == :read if newmode == :close || newmode == :stop state.mode = newmode finalize_codec(stream.codec, state.error) return end elseif mode == :write if newmode == :close flushbufferall(stream) flushuntilend(stream) state.mode = newmode finalize_codec(stream.codec, state.error) return end elseif mode == :stop if newmode == :close state.mode = newmode return end elseif mode == :panic throw_panic_error() end throw(ArgumentError("cannot change the mode from $(mode) to $(newmode)")) end # Check the current mode and throw an exception if needed. function checkmode(stream::TranscodingStream) if stream.state.mode == :panic throw_panic_error() end end # Throw an argument error (must be called only when the mode is panic). function throw_panic_error() throw(ArgumentError("stream is in unrecoverable error; only isopen and close are callable")) end # Set a defualt error. function set_default_error!(error::Error) error[] = ErrorException("unknown error happened while processing data") end # Call the finalize method of the codec. function finalize_codec(codec::Codec, error::Error) try finalize(codec) catch if haserror(error) throw(error[]) else rethrow() end end end 6/opt/julia/packages/TranscodingStreams/o7roJ/src/io.jlh# IO Functions # ------------ """ unsafe_read(input::IO, output::Ptr{UInt8}, nbytes::Int)::Int Copy at most `nbytes` from `input` into `output`. This function is similar to `Base.unsafe_read` but is different in some points: - It does not throw `EOFError` when it fails to read `nbytes` from `input`. - It returns the number of bytes written to `output`. - It does not block if there are buffered data in `input`. """ function unsafe_read(input::IO, output::Ptr{UInt8}, nbytes::Integer) nbytes = convert(UInt, nbytes) p = output navail = bytesavailable(input) if navail == 0 && nbytes > 0 && !eof(input) b = read(input, UInt8) unsafe_store!(p, b) p += 1 nbytes -= 1 navail = bytesavailable(input) end n = min(navail, nbytes) Base.unsafe_read(input, p, n) p += n return Int(p - output) end 8/opt/julia/packages/TranscodingStreams/o7roJ/src/noop.jl# Noop Codec # ========== """ Noop() Create a noop codec. Noop (no operation) is a codec that does nothing. The data read from or written to the stream are kept as-is without any modification. This is often useful as a buffered stream or an identity element of a composition of streams. The implementations are specialized for this codec. For example, a `Noop` stream uses only one buffer rather than a pair of buffers, which avoids copying data between two buffers and the throughput will be larger than a naive implementation. """ struct Noop <: Codec end const NoopStream{S} = TranscodingStream{Noop,S} where S<:IO """ NoopStream(stream::IO) Create a noop stream. """ function NoopStream(stream::IO; kwargs...) return TranscodingStream(Noop(), stream; kwargs...) end function TranscodingStream(codec::Noop, stream::IO; bufsize::Integer=DEFAULT_BUFFER_SIZE, stop_on_end::Bool=false, sharedbuf::Bool=(stream isa TranscodingStream)) checkbufsize(bufsize) checksharedbuf(sharedbuf, stream) if sharedbuf buffer = stream.buffer1 else buffer = Buffer(bufsize) end state = State(buffer, buffer) state.stop_on_end = stop_on_end return TranscodingStream(codec, stream, state) end """ position(stream::NoopStream) Get the current poition of `stream`. Note that this method may return a wrong position when - some data have been inserted by `TranscodingStreams.unread`, or - the position of the wrapped stream has been changed outside of this package. """ function Base.position(stream::NoopStream)::Int64 mode = stream.state.mode if !isopen(stream) throw_invalid_mode(mode) elseif mode === :idle return Int64(0) elseif has_sharedbuf(stream) return position(stream.stream) elseif mode === :write return position(stream.stream) + buffersize(stream.buffer1) else # read return position(stream.stream) - buffersize(stream.buffer1) end @assert false "unreachable" end function Base.seek(stream::NoopStream, pos::Integer) mode = stream.state.mode if mode === :write flushbuffer(stream) end seek(stream.stream, pos) initbuffer!(stream.buffer1) return stream end function Base.seekstart(stream::NoopStream) mode = stream.state.mode if mode === :write flushbuffer(stream) end seekstart(stream.stream) initbuffer!(stream.buffer1) return stream end function Base.seekend(stream::NoopStream) mode = stream.state.mode if mode === :write flushbuffer(stream) end seekend(stream.stream) initbuffer!(stream.buffer1) return stream end function Base.write(stream::NoopStream, b::UInt8)::Int changemode!(stream, :write) if has_sharedbuf(stream) # directly write data to the underlying stream n = Int(write(stream.stream, b)) return n end buffer1 = stream.buffer1 marginsize(buffer1) > 0 || flushbuffer(stream) return writebyte!(buffer1, b) end function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int changemode!(stream, :write) if has_sharedbuf(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) return n end buffer = stream.buffer1 if marginsize(buffer) ≥ nbytes copydata!(buffer, input, Int(nbytes)) return Int(nbytes) else flushbuffer(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) return n end end initial_output_size(codec::Noop, input::Memory) = length(input) function process(codec::Noop, input::Memory, output::Memory, error::Error) iszero(length(input)) && return (0, 0, :end) n::Int = min(length(input), length(output)) unsafe_copyto!(output.ptr, input.ptr, n) (n, n, :ok) end # Stats # ----- function stats(stream::NoopStream) state = stream.state mode = state.mode buffer = stream.buffer1 @assert buffer === stream.buffer2 if mode === :idle consumed = supplied = 0 elseif mode === :read supplied = buffer.transcoded consumed = supplied - buffersize(buffer) elseif mode === :write supplied = buffer.transcoded + buffersize(buffer) consumed = buffer.transcoded else throw_invalid_mode(mode) end return Stats(consumed, supplied, supplied, supplied) end # Buffering # --------- # # These methods are overloaded for the `Noop` codec because it has only one # buffer for efficiency. function fillbuffer(stream::NoopStream; eager::Bool = false)::Int changemode!(stream, :read) buffer = stream.buffer1 @assert buffer === stream.buffer2 if has_sharedbuf(stream) # Delegate the operation when buffers are shared. underlying_mode::Symbol = stream.stream.state.mode if underlying_mode === :idle || underlying_mode === :read return fillbuffer(stream.stream, eager = eager) else return 0 end end nfilled::Int = 0 while ((!eager && buffersize(buffer) == 0) || (eager && makemargin!(buffer, 0, eager = true) > 0)) && !eof(stream.stream) makemargin!(buffer, 1) nfilled += readdata!(stream.stream, buffer) end buffer.transcoded += nfilled return nfilled end function flushbuffer(stream::NoopStream, all::Bool=false) changemode!(stream, :write) buffer = stream.buffer1 @assert buffer === stream.buffer2 nflushed::Int = 0 if all while buffersize(buffer) > 0 nflushed += writedata!(stream.stream, buffer) end else nflushed += writedata!(stream.stream, buffer) makemargin!(buffer, 0) end buffer.transcoded += nflushed return nflushed end function flushuntilend(stream::NoopStream) stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1) return end =/opt/julia/packages/TranscodingStreams/o7roJ/src/transcode.jl# Transcode # ========= """ transcode( ::Type{C}, data::Union{Vector{UInt8},Base.CodeUnits{UInt8}}, )::Vector{UInt8} where {C<:Codec} Transcode `data` by applying a codec `C()`. Note that this method does allocation and deallocation of `C()` in every call, which is handy but less efficient when transcoding a number of objects. `transcode(codec, data)` is a recommended method in terms of performance. Examples -------- ```julia julia> using CodecZlib julia> data = b"abracadabra"; julia> compressed = transcode(ZlibCompressor, data); julia> decompressed = transcode(ZlibDecompressor, compressed); julia> String(decompressed) "abracadabra" ``` """ function Base.transcode(::Type{C}, args...) where {C<:Codec} codec = C() initialize(codec) try return transcode(codec, args...) finally finalize(codec) end end # Disambiguate `Base.transcode(::Type{C}, args...)` above from # `Base.transcode(T, ::String)` in Julia `Base` function Base.transcode(codec::Type{C}, src::String) where {C<:Codec} return invoke(transcode, Tuple{Any, String}, codec, src) end _default_output_buffer(codec, input) = Buffer( initial_output_size( codec, buffermem(input) ) ) """ transcode( codec::Codec, data::Union{Vector{UInt8},Base.CodeUnits{UInt8},Buffer}, [output::Union{Vector{UInt8},Base.CodeUnits{UInt8},Buffer}], )::Vector{UInt8} Transcode `data` by applying `codec`. If `output` is unspecified, then this method will allocate it. Note that this method does not initialize or finalize `codec`. This is efficient when you transcode a number of pieces of data, but you need to call [`TranscodingStreams.initialize`](@ref) and [`TranscodingStreams.finalize`](@ref) explicitly. Examples -------- ```julia julia> using CodecZlib julia> data = b"abracadabra"; julia> codec = ZlibCompressor(); julia> TranscodingStreams.initialize(codec) julia> compressed = Vector{UInt8}() julia> transcode(codec, data, compressed); julia> TranscodingStreams.finalize(codec) julia> codec = ZlibDecompressor(); julia> TranscodingStreams.initialize(codec) julia> decompressed = transcode(codec, compressed); julia> TranscodingStreams.finalize(codec) julia> String(decompressed) "abracadabra" ``` """ function Base.transcode( codec::Codec, input::Buffer, output::Union{Buffer,Nothing} = nothing, ) output = (output === nothing ? _default_output_buffer(codec, input) : initbuffer!(output)) transcode!(output, codec, input) end """ transcode!(output::Buffer, codec::Codec, input::Buffer) Transcode `input` by applying `codec` and storing the results in `output` with validation of input and output. Note that this method does not initialize or finalize `codec`. This is efficient when you transcode a number of pieces of data, but you need to call [`TranscodingStreams.initialize`](@ref) and [`TranscodingStreams.finalize`](@ref) explicitly. """ function transcode!( output::Buffer, codec::Codec, input::Buffer, ) Base.mightalias(input.data, output.data) && error( "input and outbut buffers must be independent" ) unsafe_transcode!(output, codec, input) end """ unsafe_transcode!(output::Buffer, codec::Codec, input::Buffer) Transcode `input` by applying `codec` and storing the results in `output` without validation of input or output. Note that this method does not initialize or finalize `codec`. This is efficient when you transcode a number of pieces of data, but you need to call [`TranscodingStreams.initialize`](@ref) and [`TranscodingStreams.finalize`](@ref) explicitly. """ function unsafe_transcode!( output::Buffer, codec::Codec, input::Buffer, ) error = Error() code = startproc(codec, :write, error) if code === :error @goto error end n = minoutsize(codec, buffermem(input)) @label process makemargin!(output, n) Δin, Δout, code = process(codec, buffermem(input), marginmem(output), error) @debug( "called process()", code = code, input_size = buffersize(input), output_size = marginsize(output), input_delta = Δin, output_delta = Δout, ) consumed!(input, Δin) supplied!(output, Δout) if code === :error @goto error elseif code === :end if buffersize(input) > 0 if startproc(codec, :write, error) === :error @goto error end n = minoutsize(codec, buffermem(input)) @goto process end resize!(output.data, output.marginpos - 1) return output.data else n = max(Δout, minoutsize(codec, buffermem(input))) @goto process end @label error if !haserror(error) set_default_error!(error) end throw(error[]) end Base.transcode(codec::Codec, data::Buffer, output::ByteData) = transcode(codec, data, Buffer(output)) Base.transcode(codec::Codec, data::ByteData, args...) = transcode(codec, Buffer(data), args...) unsafe_transcode!(codec::Codec, data::Buffer, output::ByteData) = unsafe_transcode!(Buffer(output), codec, data) unsafe_transcode!(codec::Codec, data::ByteData, args...) = unsafe_transcode!(codec, Buffer(data), args...) # Return the initial output buffer size. function initial_output_size(codec::Codec, input::Memory) return max( minoutsize(codec, input), expectedsize(codec, input), 8, # just in case where both minoutsize and expectedsize are foolish ) end p퇟J