1 /** File I/O of Compressed Files.
2  *
3  * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
4  *
5  * TODO: Add to dub a package having the dub.sdl line: libs "z" "bz2" "zstd"
6  */
7 module nxt.zio;
8 
9 version = benchmark_zio;
10 
11 @safe:
12 
/** Input range over the decompressed bytes of a compressed file.
 *
 * The file is read in blocks of `chunkSize` compressed bytes through
 * `File.byChunk`. Each block is inflated with `std.zlib.UnCompress` and the
 * inflated bytes are served one `ubyte` at a time.
 */
struct GzipFileInputRange
{
    import std.stdio : File;
    import std.traits : ReturnType;

    enum chunkSize = 0x4000;    // TODO: find optimal value via benchmark

    enum defaultExtension = `.gz`;

    /** Open the file at `path` and load its first decompressed bytes.
     *
     * Params:
     *   path = path of the compressed file to read
     */
    this(in char[] path) @trusted
    {
        _f = File(path, `r`);
        _chunkRange = _f.byChunk(chunkSize);
        _uncompress = new UnCompress;
        loadNextChunk();
    }

    /** Replace the current buffer with the next non-empty decompressed block.
     *
     * A compressed chunk may inflate to zero bytes (for instance when it only
     * holds header bytes). Such a chunk must not be mistaken for end of data,
     * so chunks are consumed until one produces output or the file runs out.
     * The decompressor is flushed exactly once after the last chunk; after
     * that the buffer stays empty, which is what `empty` reports.
     */
    void loadNextChunk() @trusted
    {
        _bufIx = 0;
        _uncompressedBuf = null;

        while (_uncompressedBuf.length == 0 && !_chunkRange.empty)
        {
            _uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
            _chunkRange.popFront();
        }

        if (_uncompressedBuf.length == 0 && !_exhausted)
        {
            // all input consumed: collect whatever the decompressor still holds
            _uncompressedBuf = cast(ubyte[])_uncompress.flush();
            _exhausted = true;
        }
    }

    /// Advance by one byte, loading the next block when the current one is used up.
    void popFront()
    {
        _bufIx += 1;
        if (_bufIx >= _uncompressedBuf.length)
        {
            loadNextChunk();
        }
    }

pragma(inline, true):
@safe pure nothrow @nogc:

    /// Current decompressed byte.
    @property ubyte front() const
    {
        return _uncompressedBuf[_bufIx];
    }

    /// True when no decompressed bytes are left.
    bool empty() const @property
    {
        return _uncompressedBuf.length == 0;
    }

private:
    import std.zlib : UnCompress;
    UnCompress _uncompress;
    File _f;
    ReturnType!(_f.byChunk) _chunkRange;
    bool _exhausted;            ///< True once the decompressor has been flushed.
    ubyte[] _uncompressedBuf;   ///< Uncompressed buffer.
    size_t _bufIx;              ///< Current byte index into `_uncompressedBuf`.
}
83 
/** Evaluates to `true` iff `R` is an input range that additionally exposes
    the members `bufferFrontChunk` and `loadNextChunk`.

    Only the presence of the two members is checked, not their signatures.

    TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
    import std.range.primitives : isInputRange;

    static if (isInputRange!R)
    {
        // TODO: ask dlang for better naming of both members
        enum isBlockInputRange = __traits(hasMember, R, `bufferFrontChunk`) &&
                                 __traits(hasMember, R, `loadNextChunk`);
    }
    else
    {
        enum isBlockInputRange = false;
    }
}
94 
/** Decompress `BlockInputRange` linewise.
 *
 * Input range of lines, each a `const(char)[]` without its trailing
 * separator. `front` returns a slice of an internal buffer that is
 * overwritten by the next `popFront`, so copy a line to keep it.
 *
 * Blank lines (two adjacent separators) are yielded as empty slices; the
 * range ends only when the underlying byte range is exhausted.
 */
class DecompressByLine(BlockInputRange)
{
    private alias E = char;

    /** Open the compressed file and read its first line.
     *
     * If `BlockInputRange` satisfies `isBlockInputRange`, whole buffered
     * blocks are searched for the separator instead of single bytes.
     *
     * Params:
     *   range = path of the compressed file; forwarded to the constructor of
     *           `BlockInputRange`
     *   separator = line separator
     *   initialCapacity = number of characters to pre-allocate in the line buffer
     */
    this(in const(char)[] range,
         E separator = '\n',
         in size_t initialCapacity = 80)
    {
        this._range = typeof(_range)(range);
        this._separator = separator;
        static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
            this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
        else static if (__traits(hasMember, typeof(_lbuf), `reserve`))
            this._lbuf.reserve(initialCapacity); // otherwise `initialCapacity` would be ignored
        popFront();
    }

    /** Read the next line into the line buffer.
     *
     * Marks the range as exhausted when the underlying byte range has no more
     * data at the time of the call.
     */
    void popFront() @trusted
    {
        _lbuf.shrinkTo(0);

        // End of input is tracked explicitly: the length of the line buffer
        // cannot tell a blank line from end of data.
        if (_range.empty)
        {
            _exhausted = true;
            return;
        }

        static if (isBlockInputRange!(typeof(_range)))
        {
            import std.algorithm.searching : find;

            // TODO: functionize
            while (!_range.empty)
            {
                ubyte[] currentFronts = _range.bufferFrontChunk;
                // `_range` is mutable so sentinel-based search can kick

                const hit = currentFronts.find(_separator); // or use `indexOf`

                if (hit.length)
                {
                    const lineLength = hit.ptr - currentFronts.ptr;
                    _lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
                    _range._bufIx += lineLength + _separator.sizeof; // advancement + separator
                    if (_range.empty)
                        _range.loadNextChunk();
                    break;      // done
                }
                else            // no separator yet
                {
                    _lbuf.put(currentFronts); // so just add everything
                    _range.loadNextChunk();
                }
            }
        }
        else
        {
            // TODO: sentinel-based search for `_separator` in `_range`
            while (!_range.empty &&
                   _range.front != _separator)
            {
                _lbuf.put(_range.front);
                _range.popFront();
            }

            if (!_range.empty &&
                _range.front == _separator)
            {
                _range.popFront();  // pop separator
            }
        }
    }

    pragma(inline):
    @safe pure nothrow @nogc:

    /// True once every line has been consumed.
    bool empty() const @property
    {
        return _exhausted;
    }

    /// Current line without separator; valid until the next `popFront`.
    const(E)[] front() const return scope
    {
        return _lbuf.data;
    }

private:
    BlockInputRange _range;

    import std.array : Appender;
    Appender!(E[]) _lbuf;       // line buffer

    // NOTE this is slower for ldc:
    // import nxt.container.dynamic_array : Array;
    // Array!E _lbuf;

    E _separator;

    bool _exhausted;            // set by `popFront` when no input was left to read
}
198 
/** Writes gzip-compressed data to a `File`.
 *
 * Feed data with `compress` and terminate the stream with `finish`, which
 * also closes the file.
 */
class GzipOut
{
    import std.zlib: Compress, HeaderFormat;
    import std.stdio: File;

    /// Wrap `file`, which receives the compressed output.
    this(File file) @trusted
    {
        _compress = new Compress(HeaderFormat.gzip);
        _f = file;
    }

    /// Compress `s` and write whatever output the compressor emits for it.
    void compress(const string s) @trusted
    {
        _f.rawWrite(_compress.compress(s));
    }

    /// Flush the compressor, write the remaining output and close the file.
    void finish() @trusted
    {
        _f.rawWrite(_compress.flush());
        _f.close();
    }

private:
    Compress _compress;
    File _f;
}
227 
/** Input range over the decompressed bytes of a gzip file, read through
 * zlib's `gzopen`/`gzread`/`gzclose`.
 *
 * Besides the byte-wise range primitives it exposes `bufferFrontChunk` and
 * `loadNextChunk`, so consumers can process a whole buffered block at once.
 * The range owns the file handle and is therefore not copyable.
 */
struct ZlibFileInputRange
{
    import std.file : FileException;

    /* Zlib docs:
       CHUNK is simply the buffer size for feeding data to and pulling data from
       the zlib routines. Larger buffer sizes would be more efficient,
       especially for inflate(). If the memory is available, buffers sizes on
       the order of 128K or 256K bytes should be used.
    */
    enum chunkSize = 128 * 1024; // 128K

    enum defaultExtension = `.gz`;

    @safe:

    /** Open the file at `path` and read its first block.
     *
     * Params:
     *   path = path of the gzip file to read
     * Throws: `FileException` if the file cannot be opened, `Exception` if
     *         the first block cannot be decoded.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = gzopen(path.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.idup);

        // Do not leak the handle when buffer allocation or the first read fails.
        scope(failure)
        {
            gzclose(_f);
            _f = null;
        }

        _buf = new ubyte[chunkSize];
        loadNextChunk();
    }

    /// Close the file, if one is open.
    ~this() nothrow @trusted @nogc
    {
        if (_f is null)
            return;             // never opened, or already released
        const int ret = gzclose(_f);
        _f = null;
        // A bare string literal is always true, so it must not be the asserted
        // condition; check the status and use the string as the message.
        assert(ret >= 0, `Couldn't close file`); // TODO: replace with non-GC-allocated exception
    }

    @disable this(this);

    /** Read the next block of up to `chunkSize` decompressed bytes.
     *
     * Throws: `Exception` if `gzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = gzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current one is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk();    // also resets `_bufIx`
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current decompressed byte.
    @property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];

    /// True when the current block holds no unread bytes.
    bool empty() const @property => _bufIx == _bufReadLength;

    /** Get the unread part of the current block.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
    import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

    gzFile _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `gzread`; normally equal
    // to `_buf.length`, except after the last read where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`

    // TODO: make this work:
    // extern (C) nothrow @nogc:
    // pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
    // pragma(mangle, `gzclose`) int gzclose(gzFile file);
    // pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
311 
/** Input range over the decompressed bytes of a bzip2 file, read through
 * `BZ2_bzopen`/`BZ2_bzread`/`BZ2_bzclose`.
 *
 * Besides the byte-wise range primitives it exposes `bufferFrontChunk` and
 * `loadNextChunk`, so consumers can process a whole buffered block at once.
 * The range owns the file handle and its read buffer and is therefore not
 * copyable.
 */
struct Bz2libFileInputRange
{
    import std.file : FileException;

    enum chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
    enum defaultExtension = `.bz2`;
    enum useGC = false;         // TODO: generalize to allocator parameter

@safe:

    /** Open the file at `path`, allocate the read buffer and read the first block.
     *
     * Params:
     *   path = path of the bzip2 file to read
     * Throws: `FileException` if the file cannot be opened, `Exception` if
     *         the first block cannot be decoded.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = BZ2_bzopen(path.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.idup);

        // Do not leak the handle or the buffer when anything below fails.
        scope(failure) release();

        static if (useGC)
            _buf = new ubyte[chunkSize];
        else
        {
            import core.exception : onOutOfMemoryError;
            import core.memory : pureMalloc;
            auto mem = cast(ubyte*)pureMalloc(chunkSize);
            if (mem is null)
                onOutOfMemoryError(); // never slice a null pointer into a buffer
            _buf = mem[0 .. chunkSize];
        }

        loadNextChunk();
    }

    /// Close the file and free the read buffer.
    ~this() nothrow @trusted @nogc
    {
        release();
    }

    @disable this(this);

    /** Read the next block of up to `chunkSize` decompressed bytes.
     *
     * Throws: `Exception` if `BZ2_bzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current one is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk();    // also resets `_bufIx`
    }

    /// Release file handle and buffer; safe to call more than once.
    private void release() nothrow @trusted @nogc
    {
        if (_f !is null)
        {
            BZ2_bzclose(_f);    // TODO: error handling?
            _f = null;
        }

        static if (!useGC)
        {
            import core.memory : pureFree;
            pureFree(_buf.ptr);
        }
        _buf = null;
    }

    pragma(inline, true):
    pure nothrow @nogc:

    /// Current decompressed byte.
    @property ubyte front() const @trusted in(!empty)
        => _buf.ptr[_bufIx];

    /// True when the current block holds no unread bytes.
    bool empty() const @property
        => _bufIx == _bufReadLength;

    /** Get the unread part of the current block.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
        => _buf.ptr[_bufIx .. _bufReadLength];

private:
    /* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
    pragma(lib, `bz2`);             // Ubuntu: sudo apt-get install libbz2-dev

    BZFILE* _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `BZ2_bzread`; normally
    // equal to `_buf.length`, except after the last read where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`
}
399 
/** Round-trip helper for the unittest: writes a gzip file with `GzipOut`,
 * reads it back bytewise through `FileInputRange` and linewise through
 * `DecompressByLine!ZlibFileInputRange`.
 *
 * NOTE(review): the loop range below has equal bounds, so the loop body is
 * currently never executed; only compilation of the body is exercised.
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
    import std.stdio : File;

    enum path = `test` ~ FileInputRange.defaultExtension;

    const data = "abc\ndef\nghi"; // contents of source

    foreach (const n; data.length .. data.length) // TODO: from 0
    {
        import std.algorithm.iteration : splitter;
        import std.algorithm.searching : count;

        alias R = DecompressByLine!ZlibFileInputRange;

        const source = data[0 .. n]; // prefix of `data` under test

        // write the prefix as a gzip file
        scope writer = new GzipOut(File(path, `w`));
        writer.compress(source);
        writer.finish();

        // bytewise read-back must reproduce the prefix
        size_t pos = 0;
        foreach (e; FileInputRange(path))
        {
            assert(cast(char)e == source[pos]);
            pos += 1;
        }

        // linewise read-back must yield as many lines as splitting the prefix
        assert(new R(path).count == source.splitter('\n').count);
    }
}
431 
/// Instantiate the round-trip helper for each file input range of this module.
@safe unittest
{
    testInputRange!GzipFileInputRange();
    testInputRange!ZlibFileInputRange();
    testInputRange!Bz2libFileInputRange();
}
438 
/** Count the lines of the Age-of-Acquisition data set with each of the three
 * file input ranges and check every count against the expected 51716.
 *
 * Params:
 *   rootDirPath = directory holding the data files; a leading `~` is expanded
 */
static private void testReadAgeofAqcuisitions(const string rootDirPath = `~/Work/knet/knowledge/en/age-of-aqcuisition`) @safe
{
    import std.path : buildNormalizedPath, expandTilde;
    import nxt.zio : DecompressByLine, GzipFileInputRange;

    // gzip file read through std.zlib
    {
        const gzPath = buildNormalizedPath(rootDirPath.expandTilde,
                                           `AoA_51715_words.csv.gz`);
        size_t nLines = 0;
        foreach (line; new DecompressByLine!GzipFileInputRange(gzPath))
            ++nLines;
        assert(nLines == 51716);
    }

    // same gzip file read through the C zlib API
    {
        const gzPath = buildNormalizedPath(rootDirPath.expandTilde,
                                           `AoA_51715_words.csv.gz`);
        size_t nLines = 0;
        foreach (line; new DecompressByLine!ZlibFileInputRange(gzPath))
            ++nLines;
        assert(nLines == 51716);
    }

    // bzip2 copy of the data
    {
        const bz2Path = buildNormalizedPath(rootDirPath.expandTilde,
                                            `AoA_51715_words_copy.csv.bz2`);
        size_t nLines = 0;
        foreach (line; new DecompressByLine!Bz2libFileInputRange(bz2Path))
            ++nLines;
        assert(nLines == 51716);
    }
}
474 
/** Read the ConceptNet 5 assertions file linewise, printing every
 * 100000th line, then print its first five lines.
 *
 * Params:
 *   path = path of the gzip-compressed assertions file
 */
static private void testReadConcept5Assertions(const string path = `/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`) @safe
{
    import std.algorithm.searching: count;
    import std.range: take;
    import std.stdio: writeln;

    alias R = ZlibFileInputRange;
    alias Lines = DecompressByLine!R;

    // first pass: whole file, with a progress line per block of lines
    const lineBlockCount = 100_000;
    size_t lineNr = 0;
    foreach (const line; new Lines(path))
    {
        const atBlockStart = (lineNr % lineBlockCount == 0);
        if (atBlockStart)
            writeln(`Line `, lineNr, ` read containing:`, line);
        ++lineNr;
    }

    // second pass: only the head of the file
    const lineCount = 5;
    auto head = new Lines(path).take(lineCount);
    foreach (const line; head)
        writeln(line);
}
498 
/** Benchmark linewise decompression of DBpedia dumps.
 *
 * Walks `rootPath` depth-first, and for every file whose base name starts
 * with `instance_types` and whose name ends with `.ttl.bz2` counts its lines
 * and prints the elapsed time through `showStat`.
 */
version(benchmark_zio)
static private void benchmarkDbpediaParsing(const string rootPath = `/home/per/Knowledge/DBpedia/latest`) @system
{
    alias R = Bz2libFileInputRange;

    import nxt.array_algorithm : startsWith, endsWith;
    import std.algorithm : filter;
    import std.datetime : MonoTime;
    import std.file : dirEntries, SpanMode;
    import std.path : baseName;
    import std.stdio : write, writeln, stdout;

    auto dumps = dirEntries(rootPath, SpanMode.depth)
        .filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
                          file.name.endsWith(`.ttl.bz2`)));

    foreach (const path; dumps)
    {
        write(`Checking `, path, ` ... `);
        stdout.flush();

        immutable started = MonoTime.currTime();

        size_t nLines = 0;
        foreach (const line; new DecompressByLine!R(path))
            ++nLines;

        immutable finished = MonoTime.currTime();

        showStat(path, started, finished, nLines);
    }
}
528 
/** Print timing statistics for one processed input.
 *
 * Params:
 *   tag = label printed in front of the numbers
 *   before = time stamp taken before the work
 *   after = time stamp taken after the work
 *   lineCount = number of lines processed, used for the per-line figure
 */
static private void showStat(T)(in const(char[]) tag,
                                in T before,
                                in T after,
                                in size_t lineCount)
{
    import std.stdio : writefln;

    const elapsed = after - before;
    const double totalMsecs = cast(double)elapsed.total!`msecs`;
    const double usecsPerLine = cast(double)elapsed.total!`usecs` / lineCount;

    writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
             tag,
             totalMsecs,
             usecsPerLine);
}
541 
542 version(unittest)
543 {
544 	import std.range.primitives : isInputRange;
545 }
546 
547 pragma(lib, "bz2");             // Ubuntu: sudo apt-get install libbz2-dev
548 
549 extern(C) nothrow @nogc:
550 
// Constants for the libbzip2 bindings below.
// NOTE(review): values are presumably copied from bzlib.h — confirm against the installed header.

/// Action codes.
enum
{
    BZ_RUN    = 0,
    BZ_FLUSH  = 1,
    BZ_FINISH = 2,
}

/// Non-negative status codes.
enum
{
    BZ_OK         = 0,
    BZ_RUN_OK     = 1,
    BZ_FLUSH_OK   = 2,
    BZ_FINISH_OK  = 3,
    BZ_STREAM_END = 4,
}

/// Negative (error) status codes.
enum
{
    BZ_SEQUENCE_ERROR   = -1,
    BZ_PARAM_ERROR      = -2,
    BZ_MEM_ERROR        = -3,
    BZ_DATA_ERROR       = -4,
    BZ_DATA_ERROR_MAGIC = -5,
    BZ_IO_ERROR         = -6,
    BZ_UNEXPECTED_EOF   = -7,
    BZ_OUTBUFF_FULL     = -8,
    BZ_CONFIG_ERROR     = -9,
}
569 
/** Stream state passed to the low-level `BZ2_bz*` functions declared below.
 *
 * NOTE(review): the field order and types presumably mirror `bz_stream` in
 * bzlib.h and must stay in sync with it — confirm against the installed header.
 */
struct bz_stream
{
    // input side
    ubyte* next_in;
    uint avail_in;
    uint total_in_lo32;
    uint total_in_hi32;

    // output side
    ubyte* next_out;
    uint avail_out;
    uint total_out_lo32;
    uint total_out_hi32;

    void* state;

    // allocation hooks and the user pointer stored alongside them
    void* function(void*, int, int) nothrow bzalloc;
    void function(void*, void*) nothrow bzfree;
    void* opaque;
}
588 
/*-- Core (low-level) library functions --*/
// Prototypes only; the implementations come from the linked `bz2` library.

/// Binding for the C function `BZ2_bzCompressInit`.
int BZ2_bzCompressInit(bz_stream* strm, int blockSize100k, int verbosity, int workFactor);

/// Binding for the C function `BZ2_bzCompress`.
int BZ2_bzCompress(bz_stream* strm, int action);

/// Binding for the C function `BZ2_bzCompressEnd`.
int BZ2_bzCompressEnd(bz_stream* strm);

/// Binding for the C function `BZ2_bzDecompressInit`.
int BZ2_bzDecompressInit(bz_stream* strm, int verbosity, int small);

/// Binding for the C function `BZ2_bzDecompress`.
int BZ2_bzDecompress(bz_stream* strm);

/// Binding for the C function `BZ2_bzDecompressEnd`.
int BZ2_bzDecompressEnd(bz_stream* strm);
608 
/*-- High(er) level library functions --*/
// Declared unless the version identifier `BZ_NO_STDIO` is set, because the
// prototypes below use C's `FILE`.

version(BZ_NO_STDIO) {}
else
{
    import core.stdc.stdio;

    enum BZ_MAX_UNUSED = 5000;

    /// Opaque handle type used by the file-level functions.
    struct BZFILE;

    /// Binding for the C function `BZ2_bzReadOpen`.
    BZFILE* BZ2_bzReadOpen(int* bzerror, FILE* f, int verbosity, int small,
                           void* unused, int nUnused);

    /// Binding for the C function `BZ2_bzReadClose`.
    void BZ2_bzReadClose(int* bzerror, BZFILE* b);

    /// Binding for the C function `BZ2_bzReadGetUnused`.
    void BZ2_bzReadGetUnused(int* bzerror, BZFILE* b, void** unused, int* nUnused);

    /// Binding for the C function `BZ2_bzRead`.
    int BZ2_bzRead(int* bzerror, BZFILE* b, void* buf, int len);

    /// Binding for the C function `BZ2_bzWriteOpen`.
    BZFILE* BZ2_bzWriteOpen(int* bzerror, FILE* f, int blockSize100k,
                            int verbosity, int workFactor);

    /// Binding for the C function `BZ2_bzWrite`.
    void BZ2_bzWrite(int* bzerror, BZFILE* b, void* buf, int len);

    /// Binding for the C function `BZ2_bzWriteClose`.
    void BZ2_bzWriteClose(int* bzerror, BZFILE* b, int abandon,
                          uint* nbytes_in, uint* nbytes_out);

    /// Binding for the C function `BZ2_bzWriteClose64`.
    void BZ2_bzWriteClose64(int* bzerror, BZFILE* b, int abandon,
                            uint* nbytes_in_lo32, uint* nbytes_in_hi32,
                            uint* nbytes_out_lo32, uint* nbytes_out_hi32);
}
666 
/*-- Utility functions --*/
// Prototypes only; the implementations come from the linked `bz2` library.

/// Binding for the C function `BZ2_bzBuffToBuffCompress`.
int BZ2_bzBuffToBuffCompress(ubyte* dest, uint* destLen,
                             ubyte* source, uint sourceLen,
                             int blockSize100k, int verbosity, int workFactor);

/// Binding for the C function `BZ2_bzBuffToBuffDecompress`.
int BZ2_bzBuffToBuffDecompress(ubyte* dest, uint* destLen,
                               ubyte* source, uint sourceLen,
                               int small, int verbosity);
683 
684 
685 /*--
686   Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
687   to support better zlib compatibility.
688   This code is not _officially_ part of libbzip2 (yet);
689   I haven't tested it, documented it, or considered the
690   threading-safeness of it.
691   If this code breaks, please contact both Yoshioka and me.
692   --*/
693 
// Prototypes for the zlib-style convenience API; the implementations come
// from the linked `bz2` library.

/// Binding for the C function `BZ2_bzlibVersion`.
const(char)* BZ2_bzlibVersion();

/// Binding for the C function `BZ2_bzopen`.
BZFILE* BZ2_bzopen(const scope const(char)* path, const scope const(char)* mode);

/// Binding for the C function `BZ2_bzdopen`.
BZFILE* BZ2_bzdopen(int fd, const scope const(char)* mode);

/// Binding for the C function `BZ2_bzread`.
int BZ2_bzread(scope BZFILE* b, scope void* buf, int len);

/// Binding for the C function `BZ2_bzwrite`.
int BZ2_bzwrite(scope BZFILE* b, scope void* buf, int len);

/// Binding for the C function `BZ2_bzflush`.
int BZ2_bzflush(scope BZFILE* b);

/// Binding for the C function `BZ2_bzclose`.
void BZ2_bzclose(scope BZFILE* b);

/// Binding for the C function `BZ2_bzerror`.
const(char)* BZ2_bzerror(scope BZFILE* b, int* errnum);