1 /** File I/O of Compressed Files.
2  *
3  * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
4  */
5 module nxt.zio;
6 
7 import std.range.primitives : isInputRange;
8 
9 @safe:
10 
/** Input range over the decompressed bytes of a compressed file.
 *
 * The file is read in blocks of `chunkSize` bytes through `File.byChunk`. Each
 * block is handed to a default-constructed `std.zlib.UnCompress` and the
 * resulting bytes are served one `ubyte` at a time.
 */
struct GzipFileInputRange
{
    import std.stdio : File;
    import std.traits : ReturnType;

    enum chunkSize = 0x4000;    // TODO find optimal value via benchmark

    enum defaultExtension = `.gz`;

    /** Open the file at `path` and decompress its first block.
     *
     * The file is opened in binary mode because its contents are compressed
     * data, not text.
     */
    this(in char[] path) @trusted
    {
        _f = File(path, `rb`);
        _chunkRange = _f.byChunk(chunkSize);
        _uncompress = new UnCompress;
        loadNextChunk();
    }

    /** Refill the uncompressed buffer and rewind the read index.
     *
     * Compressed blocks are consumed until one of them yields at least one
     * uncompressed byte. `empty` is defined as "the buffer has no bytes", so
     * stopping at a block that produced no output would make the range look
     * finished while compressed input is still pending.
     *
     * Once the compressed input is used up the decompressor is flushed exactly
     * once; every later call leaves the buffer empty.
     */
    void loadNextChunk() @trusted
    {
        _bufIx = 0;
        while (!_chunkRange.empty)
        {
            _uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
            _chunkRange.popFront();
            if (_uncompressedBuf.length != 0)
            {
                return;         // got data to serve
            }
        }
        if (!_exhausted)
        {
            _uncompressedBuf = cast(ubyte[])_uncompress.flush();
            _exhausted = true;
        }
        else
        {
            _uncompressedBuf.length = 0;
        }
    }

    /// Advance by one byte, refilling the buffer when its end is reached.
    void popFront()
    {
        _bufIx += 1;
        if (_bufIx >= _uncompressedBuf.length)
        {
            loadNextChunk();
        }
    }

pragma(inline, true):
@safe pure nothrow @nogc:

    /// Current uncompressed byte.
    @property ubyte front() const
    {
        return _uncompressedBuf[_bufIx];
    }

    /// True when the current uncompressed buffer holds no bytes.
    @property bool empty() const
    {
        return _uncompressedBuf.length == 0;
    }

private:
    import std.zlib : UnCompress;
    UnCompress _uncompress;
    File _f;
    ReturnType!(_f.byChunk) _chunkRange;
    bool _exhausted;            ///< True once the decompressor has been flushed.
    ubyte[] _uncompressedBuf;   ///< Uncompressed buffer.
    size_t _bufIx;              ///< Current byte index into `_uncompressedBuf`.
}
81 
/** Evaluates to `true` iff `R` is an input range that additionally has members
    named `bufferFrontChunk` and `loadNextChunk`.
    TODO Move to std.range
 */
private template isBlockInputRange(R)
{
    import std.range.primitives : isInputRange;
    static if (isInputRange!R)
    {
        // TODO ask dlang for better naming of both members
        enum isBlockInputRange = (__traits(hasMember, R, `bufferFrontChunk`) &&
                                  __traits(hasMember, R, `loadNextChunk`));
    }
    else
    {
        enum isBlockInputRange = false;
    }
}
92 
/** Iterate the decompressed contents of a file line by line.
 *
 * `BlockInputRange` is the range type that does the decompression; it must be
 * constructible from a path. Each element is one line without its separator.
 * Empty lines are yielded as empty slices; iteration ends only when the
 * underlying range has no more data.
 */
class DecompressByLine(BlockInputRange)
{
    private alias E = char;

    /** Open `range` (a file path, forwarded to the constructor of
     * `BlockInputRange`) and read the first line.
     *
     * If `BlockInputRange` satisfies `isBlockInputRange`, lines are extracted a
     * buffer at a time instead of byte by byte, which is much faster.
     *
     * Params:
     *   range = path of the compressed file
     *   separator = line separator
     *   initialCapacity = number of characters to preallocate in the line buffer
     */
    this(in const(char)[] range,
         E separator = '\n',
         in size_t initialCapacity = 80)
    {
        this._range = typeof(_range)(range);
        this._separator = separator;
        static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
        {
            this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
        }
        else static if (__traits(hasMember, typeof(_lbuf), `reserve`))
        {
            // `Appender` has no `withCapacity`; honor the request via `reserve`
            this._lbuf.reserve(initialCapacity);
        }
        popFront();
    }

    /** Read the next line into the line buffer.
     *
     * The line buffer is reset first, so a slice previously obtained from
     * `front` must be copied if it is needed after this call.
     */
    void popFront() @trusted
    {
        _lbuf.shrinkTo(0);

        if (_range.empty)
        {
            // End of input is tracked explicitly; an empty line buffer alone
            // cannot signal it because the input may contain empty lines.
            _exhausted = true;
            return;
        }

        static if (isBlockInputRange!(typeof(_range)))
        {
            // TODO functionize
            // TODO sentinel-based search or `countUntil`/`indexOf` instead of `find`
            import std.algorithm.searching : find;
            while (!_range.empty)
            {
                ubyte[] currentFronts = _range.bufferFrontChunk;
                const hit = currentFronts.find(_separator);

                if (hit.length)
                {
                    const lineLength = hit.ptr - currentFronts.ptr;
                    _lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
                    _range._bufIx += lineLength + _separator.sizeof; // advancement + separator
                    if (_range.empty)
                    {
                        // buffer fully consumed: refill so that `_range.empty`
                        // afterwards tells whether more input exists
                        _range.loadNextChunk();
                    }
                    break;      // done
                }
                else            // no separator yet
                {
                    _lbuf.put(currentFronts); // so just add everything
                    _range.loadNextChunk();
                }
            }
        }
        else
        {
            // TODO sentinel-based search for `_separator` in `_range`
            while (!_range.empty &&
                   _range.front != _separator)
            {
                _lbuf.put(_range.front);
                _range.popFront();
            }

            if (!_range.empty &&
                _range.front == _separator)
            {
                _range.popFront();  // pop separator
            }
        }
    }

    pragma(inline):
    @safe pure nothrow @nogc:

    /// True once `popFront` has been called with no input left to read.
    @property bool empty() const
    {
        return _exhausted;
    }

    /// Current line without separator; refers to the internal line buffer.
    const(E)[] front() const return scope
    {
        return _lbuf.data;
    }

private:
    BlockInputRange _range;

    import std.array : Appender;
    Appender!(E[]) _lbuf;       // line buffer

    // NOTE this is slower for ldc:
    // import nxt.dynamic_array : Array;
    // Array!E _lbuf;

    E _separator;

    bool _exhausted;            // set by `popFront` when `_range` had nothing left
}
200 
/** Writes gzip-compressed data to a `std.stdio.File`.
 *
 * Call `compress` any number of times, then `finish` once to write the
 * remaining compressed data and close the file.
 */
class GzipOut
{
    import std.stdio : File;
    import std.zlib : Compress, HeaderFormat;

    /// Use `file` as output and create a compressor with the gzip header format.
    this(File file) @trusted
    {
        _compress = new Compress(HeaderFormat.gzip);
        _f = file;
    }

    /// Compress `s` and write whatever compressed bytes that produces.
    void compress(const string s) @trusted
    {
        _f.rawWrite(_compress.compress(s));
    }

    /// Flush the compressor, write its final bytes and close the file.
    void finish() @trusted
    {
        _f.rawWrite(_compress.flush());
        _f.close();
    }

private:
    File _f;
    Compress _compress;
}
229 
/** Input range over the decompressed bytes of a file opened with zlib's
 * `gzopen`.
 *
 * Data is pulled with `gzread` in blocks of `chunkSize` bytes. Besides the
 * plain input range primitives the type offers `bufferFrontChunk` and
 * `loadNextChunk` for block-wise consumption. Copying is disabled because the
 * instance owns the `gzFile` handle.
 */
struct ZlibFileInputRange
{
    import std.file : FileException;

    /* Zlib docs:
       CHUNK is simply the buffer size for feeding data to and pulling data from
       the zlib routines. Larger buffer sizes would be more efficient,
       especially for inflate(). If the memory is available, buffers sizes on
       the order of 128K or 256K bytes should be used.
    */
    enum chunkSize = 128 * 1024; // 128K

    enum defaultExtension = `.gz`;

    @safe:

    /** Open the file at `path` and read its first block.
     *
     * Throws: `FileException` if the file cannot be opened, `Exception` if the
     * first block cannot be decoded.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO avoid GC allocation by looking at how gmp-d z.d solves it
        _f = gzopen(path.toStringz, `rb`);
        if (!_f)
        {
            throw new FileException(`Couldn't open file ` ~ path.idup);
        }
        scope (failure)
        {
            // don't leak the handle if anything below throws; clearing it keeps
            // the destructor from closing it a second time
            gzclose(_f);
            _f = null;
        }
        _buf = new ubyte[chunkSize];
        loadNextChunk();
    }

    /// Close the file if it is open.
    ~this() @trusted @nogc
    {
        if (_f is null)
        {
            return;             // never opened (e.g. `.init`) or already closed
        }
        const int ret = gzclose(_f);
        _f = null;
        assert(ret >= 0, `Couldn't close file`); // TODO replace with non-GC-allocated exception
    }

    @disable this(this);

    /** Read the next block of uncompressed bytes into the buffer and rewind the
     * read index.
     *
     * Throws: `Exception` if `gzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = gzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
        {
            throw new Exception(`Error decoding file`);
        }
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the buffer is used up.
    void popFront()
    {
        assert(!empty);
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
        {
            loadNextChunk();    // also rewinds `_bufIx`
        }
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current uncompressed byte.
    @property ubyte front() const @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx];
    }

    /// True when the read index has reached the end of the bytes last read.
    @property bool empty() const
    {
        return _bufIx == _bufReadLength;
    }

    /** Get the not yet consumed part of the current buffer.
        TODO need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx .. _bufReadLength];
    }

private:
    import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

    gzFile _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `gzread`; normally equal
    // to `_buf.length` except after the last read, where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`

    // TODO make this work:
    // extern (C) nothrow @nogc:
    // pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
    // pragma(mangle, `gzclose`) int gzclose(gzFile file);
    // pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
332 
/** Input range over the decompressed bytes of a file opened with
 * `BZ2_bzopen`.
 *
 * Data is pulled with `BZ2_bzread` in blocks of `chunkSize` bytes. Besides the
 * plain input range primitives the type offers `bufferFrontChunk` and
 * `loadNextChunk` for block-wise consumption. Copying is disabled because the
 * instance owns the file handle and, when `useGC` is false, the buffer.
 */
struct Bz2libFileInputRange
{
    import std.file : FileException;

    enum chunkSize = 128 * 1024; // 128K. TODO find optimal value via benchmark
    enum defaultExtension = `.bz2`;
    enum useGC = false;         // TODO generalize to allocator parameter

@safe:

    /** Open the file at `path`, allocate the block buffer and read the first
     * block.
     *
     * Throws: `FileException` if the file cannot be opened, `Exception` if the
     * first block cannot be decoded.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO avoid GC allocation by looking at how gmp-d z.d solves it
        _f = BZ2_bzopen(path.toStringz, `rb`);
        if (!_f)
        {
            throw new FileException(`Couldn't open file ` ~ path.idup);
        }
        scope (failure)
        {
            // release everything acquired so far if anything below throws;
            // clearing the fields keeps the destructor from releasing twice
            BZ2_bzclose(_f);
            _f = null;
            static if (!useGC)
            {
                import core.memory : pureFree;
                pureFree(_buf.ptr);
            }
            _buf = null;
        }

        static if (useGC)
        {
            _buf = new ubyte[chunkSize];
        }
        else
        {
            import core.memory : pureMalloc;
            auto raw = cast(ubyte*)pureMalloc(chunkSize);
            if (raw is null)
            {
                // slicing a null pointer would yield a buffer that crashes on first use
                import core.exception : onOutOfMemoryError;
                onOutOfMemoryError();
            }
            _buf = raw[0 .. chunkSize];
        }

        loadNextChunk();
    }

    /// Close the file if it is open and free the buffer if it was `malloc`ed.
    ~this() @trusted @nogc
    {
        if (_f !is null)
        {
            BZ2_bzclose(_f);   // TODO error handling?
            _f = null;
        }

        static if (!useGC)
        {
            import core.memory : pureFree;
            pureFree(_buf.ptr);
            _buf = null;
        }
    }

    @disable this(this);

    /** Read the next block of uncompressed bytes into the buffer and rewind the
     * read index.
     *
     * Throws: `Exception` if `BZ2_bzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
        {
            throw new Exception(`Error decoding file`);
        }
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the buffer is used up.
    void popFront()
    {
        assert(!empty);
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
        {
            loadNextChunk();    // also rewinds `_bufIx`
        }
    }

    pragma(inline, true):
    pure nothrow @nogc:

    /// Current uncompressed byte.
    @property ubyte front() const @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx];
    }

    /// True when the read index has reached the end of the bytes last read.
    @property bool empty() const
    {
        return _bufIx == _bufReadLength;
    }

    /** Get the not yet consumed part of the current buffer.
        TODO need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx .. _bufReadLength];
    }

private:
    import nxt.bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose;
    pragma(lib, `bz2`);             // Ubuntu: sudo apt-get install libbz2-dev

    BZFILE* _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `BZ2_bzread`; normally
    // equal to `_buf.length` except after the last read, where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`
}
436 
/* Round-trip check for `FileInputRange`: write a prefix of a fixed text through
 * `GzipOut` to `test` ~ `FileInputRange.defaultExtension`, read it back byte by
 * byte and compare, then compare the line count seen by
 * `DecompressByLine!ZlibFileInputRange` with that of `splitter`.
 *
 * NOTE the prefix-length interval below is empty, so the loop body currently
 * never executes.
 */
void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
    import std.stdio : File;
    import std.algorithm.searching : count;
    import std.algorithm.iteration : splitter;

    alias R = DecompressByLine!ZlibFileInputRange;

    enum path = `test` ~ FileInputRange.defaultExtension;

    const wholeSource = "abc\ndef\nghi"; // contents of source

    foreach (const prefixLength; wholeSource.length .. wholeSource.length) // TODO from 0
    {
        const source = wholeSource[0 .. prefixLength]; // slice from the beginning

        // write the compressed file
        File file = File(path, `w`); // TODO `scope`
        auto gzOut = new GzipOut(file); // TODO `scope`
        gzOut.compress(source);
        gzOut.finish();

        // read it back one byte at a time
        size_t byteIndex = 0;
        foreach (e; FileInputRange(path))
        {
            assert(cast(char)e == source[byteIndex]);
            byteIndex += 1;
        }

        // read it back one line at a time
        const lineCount = new R(path).count;
        assert(lineCount == source.splitter('\n').count);
    }
}
469 
// Run `testInputRange` once for each of the three file input ranges.
@safe unittest
{
    testInputRange!GzipFileInputRange();
    testInputRange!ZlibFileInputRange();
    testInputRange!Bz2libFileInputRange();
}
476 
/** Read the Age-of-Acquisition word list through each of the three file input
 * ranges and check that every one of them yields 51716 lines.
 */
static private void testReadAgeofAqcuisitions(const string rootDirPath = `~/Work/knet/knowledge/en/age-of-aqcuisition`) @safe
{
    import std.path: expandTilde;
    import nxt.zio : DecompressByLine, GzipFileInputRange;
    import std.path : buildNormalizedPath;

    // gzip file via std.zlib
    {
        const gzPath = buildNormalizedPath(rootDirPath.expandTilde,
                                           `AoA_51715_words.csv.gz`);
        size_t lineTotal = 0;
        foreach (line; new DecompressByLine!GzipFileInputRange(gzPath))
        {
            ++lineTotal;
        }
        assert(lineTotal == 51716);
    }

    // same gzip file via the zlib C API
    {
        const gzPath = buildNormalizedPath(rootDirPath.expandTilde,
                                           `AoA_51715_words.csv.gz`);
        size_t lineTotal = 0;
        foreach (line; new DecompressByLine!ZlibFileInputRange(gzPath))
        {
            ++lineTotal;
        }
        assert(lineTotal == 51716);
    }

    // bzip2 copy
    {
        const bz2Path = buildNormalizedPath(rootDirPath.expandTilde,
                                            `AoA_51715_words_copy.csv.bz2`);
        size_t lineTotal = 0;
        foreach (line; new DecompressByLine!Bz2libFileInputRange(bz2Path))
        {
            ++lineTotal;
        }
        assert(lineTotal == 51716);
    }
}
518 
/** Read the ConceptNet 5 assertions file at `path` line by line, printing every
 * 100_000th line, and then print the first five lines once more.
 */
static private void testReadConcept5Assertions(const string path = `/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`) @safe
{
    import std.stdio: writeln;
    import std.range: take;
    import std.algorithm.searching: count;

    alias R = ZlibFileInputRange;

    // first pass: whole file, with a progress line every `lineBlockCount` lines
    const lineBlockCount = 100_000;
    size_t lineNr = 0;
    foreach (const line; new DecompressByLine!R(path))
    {
        const atBlockStart = (lineNr % lineBlockCount == 0);
        if (atBlockStart)
        {
            writeln(`Line `, lineNr, ` read containing:`, line);
        }
        ++lineNr;
    }

    // second pass: only the head of the file
    const lineCount = 5;
    auto allLines = new DecompressByLine!R(path);
    foreach (const line; allLines.take(lineCount))
    {
        writeln(line);
    }
}
546 
/** Benchmark line-wise reading of DBpedia dumps.
 *
 * Visits every entry below `rootPath` whose base name starts with
 * `instance_types` and whose name ends with `.ttl.bz2`, counts its lines via
 * `DecompressByLine!Bz2libFileInputRange` and prints the timing with
 * `showStat`.
 */
static private void benchmarkDbpediaParsing(const string rootPath = `/home/per/Knowledge/DBpedia/latest`) @system
{
    import nxt.array_algorithm : startsWith, endsWith;
    import std.algorithm : filter;
    import std.file : dirEntries, SpanMode;
    import std.path : baseName;
    import std.stdio : write, writeln, stdout;
    import std.datetime : MonoTime;

    alias R = Bz2libFileInputRange;

    // selects the compressed `instance_types*` Turtle dumps
    alias isInstanceTypesDump = file => (file.name.baseName.startsWith(`instance_types`) &&
                                         file.name.endsWith(`.ttl.bz2`));

    foreach (const path; dirEntries(rootPath, SpanMode.depth).filter!isInstanceTypesDump)
    {
        write(`Checking `, path, ` ... `);
        stdout.flush();

        immutable before = MonoTime.currTime();

        size_t lineCounter = 0;
        foreach (const line; new DecompressByLine!R(path))
        {
            ++lineCounter;
        }

        immutable after = MonoTime.currTime();

        showStat(path, before, after, lineCounter);
    }
}
577 
/** Print the time elapsed between `before` and `after` for `tag`: the total in
 * milliseconds and the average per line in microseconds.
 */
static private void showStat(T)(in const(char[]) tag,
                                in T before,
                                in T after,
                                in size_t lineCount)
{
    import std.stdio : writefln;

    const elapsed = after - before;
    const double totalMsecs = cast(double)elapsed.total!`msecs`;
    const double usecsPerLine = cast(double)elapsed.total!`usecs` / lineCount;

    writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
             tag,
             totalMsecs,
             usecsPerLine);
}