|
16 | 16 | BufferedInputStream
|
17 | 17 | InputStream)))
|
18 | 18 |
|
| 19 | +(def ^:private ^:const STREAM_MARK_LIMIT 512) |
| 20 | +(def ^:private ^:const DEFAULT_BUFFER_SIZE 8192) |
| 21 | + |
| 22 | +(def ^:private ^ThreadLocal buffer-pool |
| 23 | + (proxy [ThreadLocal] [] |
| 24 | + (initialValue [] |
| 25 | + (byte-array DEFAULT_BUFFER_SIZE)))) |
| 26 | + |
| 27 | +(defn- get-buffer |
| 28 | + "Gets a reusable buffer from the thread-local pool" |
| 29 | + [] |
| 30 | + (.get buffer-pool)) |
| 31 | + |
19 | 32 | (defn str->bytes
|
20 | 33 | "Returns the encoding's bytes corresponding to the given string. If no
|
21 | 34 | encoding is specified, UTF-8 is used."
|
22 | 35 | [^String s & [^String encoding]]
|
23 |
| - (.getBytes s (or encoding "UTF-8"))) |
| 36 | + (.getBytes s ^String (or encoding "UTF-8"))) |
24 | 37 |
|
25 | 38 | (defn bytes->str
|
26 | 39 | "Returns the String corresponding to the given encoding's decoding of the
|
27 | 40 | given bytes. If no encoding is specified, UTF-8 is used."
|
28 | 41 | [^bytes b & [^String encoding]]
|
29 |
| - (String. b (or encoding "UTF-8"))) |
| 42 | + (String. b ^String (or encoding "UTF-8"))) |
30 | 43 |
|
31 | 44 | (defn gunzip
|
32 | 45 | "Returns a gunzip'd version of the given byte array."
|
|
43 | 56 | [b]
|
44 | 57 | (when b
|
45 | 58 | (let [baos (ByteArrayOutputStream.)
|
46 |
| - gos (GZIPOutputStream. baos)] |
47 |
| - (IOUtils/copy (ByteArrayInputStream. b) gos) |
| 59 | + gos (GZIPOutputStream. baos ^int DEFAULT_BUFFER_SIZE) |
| 60 | + buffer (get-buffer) |
| 61 | + bis (ByteArrayInputStream. b)] |
| 62 | + (loop [] |
| 63 | + (let [n (.read bis buffer 0 DEFAULT_BUFFER_SIZE)] |
| 64 | + (when (pos? n) |
| 65 | + (.write gos buffer 0 n) |
| 66 | + (recur)))) |
48 | 67 | (.close gos)
|
49 | 68 | (.toByteArray baos))))
|
50 | 69 |
|
|
66 | 85 | (let [stream (BufferedInputStream. (if (instance? InputStream b)
|
67 | 86 | b
|
68 | 87 | (ByteArrayInputStream. b)))
|
69 |
| - _ (.mark stream 512) |
| 88 | + _ (.mark stream STREAM_MARK_LIMIT) |
70 | 89 | iis (InflaterInputStream. stream)
|
71 | 90 | readable? (try (.read iis) true
|
72 | 91 | (catch ZipException _ false))]
|
|
138 | 157 | (defn sha-512
|
139 | 158 | ^bytes [^bytes b]
|
140 | 159 | (wrap-digest "SHA-512" b))
|
| 160 | + |
| 161 | +;; Streaming API for large data |
| 162 | + |
| 163 | +(defn deflate-stream |
| 164 | + "Returns a DeflaterInputStream for streaming deflation. |
| 165 | + Useful for large files that shouldn't be loaded entirely into memory." |
| 166 | + ([^InputStream input-stream] |
| 167 | + (DeflaterInputStream. input-stream)) |
| 168 | + ([^InputStream input-stream level] |
| 169 | + (DeflaterInputStream. input-stream (Deflater. level)))) |
| 170 | + |
| 171 | +(defn inflate-stream |
| 172 | + "Returns an InflaterInputStream for streaming inflation. |
| 173 | + Useful for large files that shouldn't be loaded entirely into memory." |
| 174 | + [^InputStream input-stream] |
| 175 | + (let [stream (BufferedInputStream. input-stream) |
| 176 | + _ (.mark stream STREAM_MARK_LIMIT) |
| 177 | + iis (InflaterInputStream. stream) |
| 178 | + readable? (try (.read iis) true |
| 179 | + (catch ZipException _ false))] |
| 180 | + (.reset stream) |
| 181 | + (if readable? |
| 182 | + (InflaterInputStream. stream) |
| 183 | + (InflaterInputStream. stream (Inflater. true))))) |
| 184 | + |
| 185 | +(defn gzip-stream |
| 186 | + "Returns a GZIPOutputStream for streaming gzip compression. |
| 187 | + Useful for large files that shouldn't be loaded entirely into memory." |
| 188 | + ^GZIPOutputStream |
| 189 | + ([^java.io.OutputStream output-stream] |
| 190 | + (GZIPOutputStream. output-stream ^int DEFAULT_BUFFER_SIZE)) |
| 191 | + ([^java.io.OutputStream output-stream ^Integer buffer-size] |
| 192 | + (GZIPOutputStream. output-stream ^int buffer-size))) |
| 193 | + |
| 194 | +(defn gunzip-stream |
| 195 | + "Returns a GZIPInputStream for streaming gzip decompression. |
| 196 | + Useful for large files that shouldn't be loaded entirely into memory." |
| 197 | + ([^InputStream input-stream] |
| 198 | + (GZIPInputStream. input-stream ^int DEFAULT_BUFFER_SIZE)) |
| 199 | + ([^InputStream input-stream buffer-size] |
| 200 | + (GZIPInputStream. input-stream ^int buffer-size))) |
| 201 | + |
| 202 | +(defn copy-compress |
| 203 | + "Copies data from input-stream to output-stream with compression. |
| 204 | + Returns the number of bytes written." |
| 205 | + ^long [^InputStream input-stream ^java.io.OutputStream output-stream compress-fn] |
| 206 | + (let [^java.io.OutputStream compressed-stream (compress-fn output-stream) |
| 207 | + ^bytes buffer (get-buffer)] |
| 208 | + (try |
| 209 | + (loop [total (long 0)] |
| 210 | + (let [n (.read input-stream buffer 0 DEFAULT_BUFFER_SIZE)] |
| 211 | + (if (pos? n) |
| 212 | + (do |
| 213 | + (.write compressed-stream buffer 0 n) |
| 214 | + (recur (+ total n))) |
| 215 | + total))) |
| 216 | + (finally |
| 217 | + (.close compressed-stream))))) |
| 218 | + |
| 219 | +(defn copy-decompress |
| 220 | + "Copies data from input-stream to output-stream with decompression. |
| 221 | + Returns the number of bytes written." |
| 222 | + ^long [^InputStream input-stream ^java.io.OutputStream output-stream decompress-fn] |
| 223 | + (let [^InputStream decompressed-stream (decompress-fn input-stream) |
| 224 | + ^bytes buffer (get-buffer)] |
| 225 | + (try |
| 226 | + (loop [total (long 0)] |
| 227 | + (let [n (.read decompressed-stream buffer 0 DEFAULT_BUFFER_SIZE)] |
| 228 | + (if (pos? n) |
| 229 | + (do |
| 230 | + (.write output-stream buffer 0 n) |
| 231 | + (recur (+ total n))) |
| 232 | + total))) |
| 233 | + (finally |
| 234 | + (.close decompressed-stream))))) |
0 commit comments