1 /** File I/O of Compressed Files.
2  *
3  * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
4  */
5 module nxt.zio;
6 
7 import nxt.path : Path, FilePath;
8 
9 version = benchmark_zio;
10 
11 @safe:
12 
/** Input range over the decompressed bytes of a compressed file.
 *
 * The file is read in compressed chunks of `chunkSize` bytes through
 * `File.byChunk`; each chunk is passed to `std.zlib.UnCompress` and the
 * resulting bytes are kept in a single buffer that `front` indexes into.
 */
struct GzipFileInputRange
{
	import std.stdio : File;
	import std.traits : ReturnType;

	enum chunkSize = 0x4000;	/+ TODO: find optimal value via benchmark +/

	enum defaultExtension = `.gz`;

	/** Open the file at `path` and decompress its first chunk. */
	this(in FilePath path) @trusted
	{
		_f = File(path.str, `r`);
		_chunkRange = _f.byChunk(chunkSize);
		_uncompress = new UnCompress;
		loadNextChunk();
	}

	/** Replace the decompressed buffer with the next batch of bytes.
	 *
	 * Compressed chunks are consumed until one of them yields at least one
	 * decompressed byte. Once the compressed input is used up the
	 * decompressor is flushed exactly once; every later call leaves the
	 * buffer empty, which is what `empty` reports as end of stream.
	 */
	void loadNextChunk() @trusted
	{
		_bufIx = 0;

		// A compressed chunk may decompress to zero bytes. Stopping on such a
		// chunk would make `empty` report end-of-stream although compressed
		// input remains, so keep feeding chunks until output appears.
		while (!_chunkRange.empty)
		{
			_uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
			_chunkRange.popFront();
			if (_uncompressedBuf.length != 0)
				return;
		}

		if (!_exhausted)
		{
			// compressed input is used up: fetch what the decompressor still holds
			_uncompressedBuf = cast(ubyte[])_uncompress.flush();
			_exhausted = true;
		}
		else
		{
			_uncompressedBuf.length = 0;
		}
	}

	/** Advance by one byte, loading the next buffer when the current one is consumed. */
	void popFront()
	{
		_bufIx += 1;
		if (_bufIx >= _uncompressedBuf.length)
		{
			loadNextChunk();
		}
	}

pragma(inline, true):
pure nothrow @safe @nogc:

	/// Current decompressed byte.
	@property ubyte front() const
	{
		return _uncompressedBuf[_bufIx];
	}

	/// True when the current decompressed buffer holds no bytes.
	bool empty() const @property
	{
		return _uncompressedBuf.length == 0;
	}

private:
	import std.zlib : UnCompress;
	UnCompress _uncompress;
	File _f;
	ReturnType!(_f.byChunk) _chunkRange;
	bool _exhausted;			///< True once `_uncompress` has been flushed.
	ubyte[] _uncompressedBuf;   ///< Uncompressed buffer.
	size_t _bufIx;			  ///< Current byte index into `_uncompressedBuf`.
}
83 
/** Evaluates to `true` iff `R` is an input range that additionally has the
	members `bufferFrontChunk` and `loadNextChunk`.

	TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
	import std.range.primitives : isInputRange;

	static if (!isInputRange!R)
		enum isBlockInputRange = false;
	else static if (!__traits(hasMember, R, `bufferFrontChunk`)) /+ TODO: ask dlang for better naming +/
		enum isBlockInputRange = false;
	else
		enum isBlockInputRange = __traits(hasMember, R, `loadNextChunk`); /+ TODO: ask dlang for better naming +/
}
94 
/** Decompress `BlockInputRange` linewise.
 *
 * Wraps a byte range of type `BlockInputRange`, constructed from a file path,
 * and presents it as an input range of lines split at a separator character.
 * The separator itself is not part of the lines. A trailing separator at the
 * end of the input does not produce an extra empty line, but empty lines
 * inside the input are yielded as empty slices.
 *
 * `front` returns a slice of an internal line buffer that is reset by every
 * `popFront`, so copy the slice if it has to outlive the current iteration
 * step.
 */
class DecompressByLine(BlockInputRange)
{
	private alias E = char;

	/** Open `path` with `BlockInputRange` and read the first line.
	 *
	 * When `BlockInputRange` satisfies `isBlockInputRange`, lines are
	 * searched for in whole buffered chunks instead of byte by byte.
	 *
	 * Params:
	 *   path = file to read, forwarded to the constructor of `BlockInputRange`
	 *   separator = line separator
	 *   initialCapacity = initial line buffer capacity; only used when the
	 *     line buffer type provides `withCapacity`
	 */
	this(in FilePath path,
		 E separator = '\n',
		 in size_t initialCapacity = 80)
	{
		this._range = typeof(_range)(path);
		this._separator = separator;
		static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
			this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
		popFront();
	}

	/** Read the next line into the line buffer, or mark the range as empty
	 * when the underlying byte range has no more data.
	 */
	void popFront() @trusted
	{
		_lbuf.shrinkTo(0);

		// End of input is detected on the byte range itself and not on the
		// length of the line just read: an empty line is a valid element and
		// must not terminate iteration.
		if (_range.empty)
		{
			_done = true;
			return;
		}

		static if (isBlockInputRange!(typeof(_range)))
		{
			/+ TODO: functionize +/
			while (!_range.empty)
			{
				ubyte[] currentFronts = _range.bufferFrontChunk;
				// `_range` is mutable so sentinel-based search can kick

				enum useCountUntil = false;
				static if (useCountUntil)
				{
					import std.algorithm.searching : countUntil;
					// TODO
				}
				else
				{
					import std.algorithm.searching : find;
					const hit = currentFronts.find(_separator); // or use `indexOf`
				}

				if (hit.length)
				{
					const lineLength = hit.ptr - currentFronts.ptr;
					_lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
					_range._bufIx += lineLength + _separator.sizeof; // advancement + separator
					// refill right away so that `_range.empty` reflects real
					// end of input on the next call
					if (_range.empty)
						_range.loadNextChunk();
					break;	  // done
				}
				else			// no separator yet
				{
					_lbuf.put(currentFronts); // so just add everything
					_range.loadNextChunk();
				}
			}
		}
		else
		{
			/+ TODO: sentinel-based search for `_separator` in `_range` +/
			while (!_range.empty &&
				   _range.front != _separator)
			{
				_lbuf.put(_range.front);
				_range.popFront();
			}

			if (!_range.empty &&
				_range.front == _separator)
			{
				_range.popFront();  // pop separator
			}
		}
	}

	pragma(inline):
	pure nothrow @safe @nogc:

	/// True once the underlying byte range has been consumed completely.
	bool empty() const @property
	{
		return _done;
	}

	/// Current line without its separator; a slice of the internal line buffer.
	const(E)[] front() const return scope
	{
		return _lbuf.data;
	}

private:
	BlockInputRange _range;

	import std.array : Appender;
	Appender!(E[]) _lbuf;	   // line buffer

	// NOTE this is slower for ldc:
	// import nxt.container.dynamic_array : Array;
	// Array!E _lbuf;

	E _separator;

	bool _done;				 // set by `popFront` when no further line exists
}
198 
/** Writes gzip-compressed data to a `File`.
 *
 * Call `compress` any number of times and then `finish` to write the
 * remaining compressed data and close the file.
 */
class GzipOut
{
	import std.stdio : File;
	import std.zlib : Compress, HeaderFormat;

	/** Compress into `file` using the gzip header format. */
	this(File file) @trusted
	{
		_compress = new Compress(HeaderFormat.gzip);
		_f = file;
	}

	/** Compress `s` and write whatever compressed bytes are produced. */
	void compress(const string s) @trusted
	{
		_f.rawWrite(_compress.compress(s));
	}

	/** Flush the compressor, write the remaining bytes and close the file. */
	void finish() @trusted
	{
		_f.rawWrite(_compress.flush());
		_f.close();
	}

private:
	Compress _compress;
	File _f;
}
227 
/** Input range over the decompressed bytes of a file opened with zlib's
 * `gzopen`.
 *
 * Data is pulled with `gzread` into a buffer of `chunkSize` bytes.
 * `bufferFrontChunk` and `loadNextChunk` give chunk-wise access to the
 * buffer. The range owns the `gzFile` handle, closes it in its destructor and
 * is not copyable.
 */
struct ZlibFileInputRange
{
	import std.file : FileException;

	/* Zlib docs:
	   CHUNK is simply the buffer size for feeding data to and pulling data from
	   the zlib routines. Larger buffer sizes would be more efficient,
	   especially for inflate(). If the memory is available, buffers sizes on
	   the order of 128K or 256K bytes should be used.
	*/
	enum chunkSize = 128 * 1024; // 128K

	enum defaultExtension = `.gz`;

	@safe:

	/** Open the file at `path` and read the first chunk.
	 *
	 * Throws: `FileException` if the file cannot be opened, `Exception` if
	 * the first chunk cannot be decoded.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = gzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);

		// Don't leak the handle if the allocation or the first read throws;
		// resetting `_f` keeps a later destructor run harmless.
		scope (failure)
		{
			gzclose(_f);
			_f = null;
		}

		_buf = new ubyte[chunkSize];
		loadNextChunk();
	}

	/** Close the file, if one is open. */
	~this() nothrow @trusted @nogc
	{
		// A default-initialized instance (for example an aggregate field
		// whose construction failed) has no handle. `gzclose` reports an
		// error for a null handle, which would trip the assert below.
		if (_f is null)
			return;
		const int ret = gzclose(_f);
		_f = null;
		if (ret < 0)
			assert(0, `Couldn't close file`); /+ TODO: replace with non-GC-allocated exception +/
	}

	this(this) @disable;

	/** Refill the buffer with the next chunk of decompressed bytes.
	 *
	 * Throws: `Exception` if `gzread` reports an error.
	 */
	void loadNextChunk() @trusted
	{
		int count = gzread(_f, _buf.ptr, chunkSize);
		if (count < 0)		  // never let a negative count wrap around in `_bufReadLength`
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/** Advance by one byte, refilling the buffer when it has been consumed. */
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk();	// also resets `_bufIx`
	}

pragma(inline, true):
pure nothrow @nogc:

	/// Current decompressed byte.
	@property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];

	/// True when all bytes of the most recently read chunk have been consumed.
	bool empty() const @property => _bufIx == _bufReadLength;

	/** Get the not yet consumed part of the current chunk.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
	import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

	gzFile _f;

	ubyte[] _buf;			   // block read buffer

	// number of bytes in `_buf` most recently read by `gzread`; normally equal to `_buf.length` except after the last read, where it is normally less
	size_t _bufReadLength;

	size_t _bufIx;			  // current stream read index in `_buf`

	/+ TODO: make this work: +/
	// extern (C) nothrow @nogc:
	// pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
	// pragma(mangle, `gzclose`) int gzclose(gzFile file);
	// pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
311 
/** Input range over the decompressed bytes of a file opened with libbz2's
 * `BZ2_bzopen`.
 *
 * Data is pulled with `BZ2_bzread` into a buffer of `chunkSize` bytes that is
 * either GC-allocated or `malloc`ed depending on `useGC`. `bufferFrontChunk`
 * and `loadNextChunk` give chunk-wise access to the buffer. The range owns
 * the file handle and the buffer, releases both in its destructor and is not
 * copyable.
 */
struct Bz2libFileInputRange
{
	import std.file : FileException;

	enum chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
	enum defaultExtension = `.bz2`;
	enum useGC = false;		 /+ TODO: generalize to allocator parameter +/

@safe:

	/** Open the file at `path`, allocate the read buffer and read the first chunk.
	 *
	 * Throws: `FileException` if the file cannot be opened, `Exception` if
	 * the first chunk cannot be decoded.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = BZ2_bzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);

		// Release handle and buffer if anything below throws. The fields are
		// reset so that a later destructor run finds nothing left to release.
		scope (failure)
		{
			BZ2_bzclose(_f);
			_f = null;
			static if (!useGC)
			{
				import core.memory : pureFree;
				pureFree(_buf.ptr);
			}
			_buf = null;
		}

		static if (useGC)
			_buf = new ubyte[chunkSize];
		else
		{
			import core.memory : pureMalloc;
			auto mem = cast(ubyte*)pureMalloc(chunkSize);
			if (mem is null)	// slicing a null pointer would hand an invalid buffer to `BZ2_bzread`
			{
				import core.exception : onOutOfMemoryError;
				onOutOfMemoryError();
			}
			_buf = mem[0 .. chunkSize];
		}

		loadNextChunk();
	}

	/** Close the file and release the buffer. */
	~this() nothrow @trusted @nogc
	{
		BZ2_bzclose(_f);	   /+ TODO: error handling? +/

		static if (!useGC)
		{
			import core.memory : pureFree;
			pureFree(_buf.ptr);
		}
	}

	this(this) @disable;

	/** Refill the buffer with the next chunk of decompressed bytes.
	 *
	 * Throws: `Exception` if `BZ2_bzread` reports an error.
	 */
	void loadNextChunk() @trusted
	{
		int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
		if (count < 0)		  // never let a negative count wrap around in `_bufReadLength`
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/** Advance by one byte, refilling the buffer when it has been consumed. */
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk();	// also resets `_bufIx`
	}

	pragma(inline, true):
	pure nothrow @nogc:

	/// Current decompressed byte.
	@property ubyte front() const @trusted in(!empty)
		=> _buf.ptr[_bufIx];

	/// True when all bytes of the most recently read chunk have been consumed.
	bool empty() const @property
		=> _bufIx == _bufReadLength;

	/** Get the not yet consumed part of the current chunk.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
		=> _buf.ptr[_bufIx .. _bufReadLength];

private:
	/* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
	pragma(lib, `bz2`);			 // Ubuntu: sudo apt-get install libbz2-dev

	BZFILE* _f;

	ubyte[] _buf;			   // block read buffer

	// number of bytes in `_buf` most recently read by `BZ2_bzread`; normally equal to `_buf.length` except after the last read, where it is normally less
	size_t _bufReadLength;

	size_t _bufIx;			  // current stream read index in `_buf`
}
399 
/** Round-trip check of the byte input range `FileInputRange`.
 *
 * Each loop iteration writes a prefix of a fixed sample text through
 * `GzipOut` to the file `test` ~ `FileInputRange.defaultExtension`, reads it
 * back byte by byte with `FileInputRange`, and compares the number of lines
 * seen by `DecompressByLine!ZlibFileInputRange` with the number of pieces
 * `splitter` produces.
 *
 * NOTE(review): the loop range `data.length .. data.length` is empty, so the
 * loop body is compiled but currently never executed (see the TODO).
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
	import std.algorithm.iteration : splitter;
	import std.algorithm.searching : count;
	import std.stdio : File;

	const data = "abc\ndef\nghi"; // contents of source
	const path = FilePath(`test` ~ FileInputRange.defaultExtension);

	foreach (const n; data.length .. data.length) /+ TODO: from 0 +/
	{
		const source = data[0 .. n]; // leading `n` characters of `data`

		// write the compressed file
		scope writer = new GzipOut(File(path.str, `w`));
		writer.compress(source);
		writer.finish();

		// bytes read back must equal the bytes written
		size_t pos = 0;
		foreach (e; FileInputRange(path))
		{
			assert(cast(char)e == source[pos]);
			pos += 1;
		}

		// line count must agree with `splitter`
		alias LineRange = DecompressByLine!ZlibFileInputRange;
		const expectedLineCount = source.splitter('\n').count;
		assert(new LineRange(path).count == expectedLineCount);
	}
}
431 
/// Instantiate and run `testInputRange` for each file input range type of this module.
@safe unittest
{
	testInputRange!GzipFileInputRange();
	testInputRange!ZlibFileInputRange();
	testInputRange!Bz2libFileInputRange();
}
438 
/** Read the Age-of-Acquisition word lists found below `rootDirPath` line by
 * line, once with each of the three file input ranges, and assert that every
 * pass yields 51716 lines.
 */
static private void testReadAgeofAqcuisitions(in Path rootDirPath = Path(`~/Work/knet/knowledge/en/age-of-aqcuisition`)) @safe
{
	import nxt.zio : DecompressByLine, GzipFileInputRange;
	import std.path : buildNormalizedPath, expandTilde;

	// gzip file read through `GzipFileInputRange`
	{
		const path = FilePath(buildNormalizedPath(expandTilde(rootDirPath.str), `AoA_51715_words.csv.gz`));
		size_t lineCount = 0;
		foreach (line; new DecompressByLine!GzipFileInputRange(path))
			++lineCount;
		assert(lineCount == 51716);
	}

	// same gzip file read through `ZlibFileInputRange`
	{
		const path = FilePath(buildNormalizedPath(expandTilde(rootDirPath.str), `AoA_51715_words.csv.gz`));
		size_t lineCount = 0;
		foreach (line; new DecompressByLine!ZlibFileInputRange(path))
			++lineCount;
		assert(lineCount == 51716);
	}

	// bzip2 copy read through `Bz2libFileInputRange`
	{
		const path = FilePath(buildNormalizedPath(expandTilde(rootDirPath.str), `AoA_51715_words_copy.csv.bz2`));
		size_t lineCount = 0;
		foreach (line; new DecompressByLine!Bz2libFileInputRange(path))
			++lineCount;
		assert(lineCount == 51716);
	}
}
471 
/** Read the ConceptNet 5 assertions file at `path` line by line.
 *
 * The first pass iterates over all lines and prints every 100_000th one
 * together with its line number; the second pass prints the first five lines.
 */
static private void testReadConcept5Assertions(in FilePath path = FilePath(`/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`)) @safe
{
	import std.algorithm.searching: count;
	import std.range: take;
	import std.stdio: writeln;

	alias R = ZlibFileInputRange;

	// pass 1: whole file with periodic progress output
	const lineBlockCount = 100_000;
	size_t lineNr = 0;
	foreach (const line; new DecompressByLine!R(path))
	{
		const atBlockStart = (lineNr % lineBlockCount) == 0;
		if (atBlockStart)
			writeln(`Line `, lineNr, ` read containing:`, line);
		++lineNr;
	}

	// pass 2: leading lines only
	const lineCount = 5;
	foreach (const line; new DecompressByLine!R(path).take(lineCount))
	{
		writeln(line);
	}
}
495 
/// Benchmark line-wise reading of the DBpedia `instance_types*.ttl.bz2` files found below `rootPath`.
version (benchmark_zio)
static private void benchmarkDbpediaParsing(in Path rootPath = Path(`/home/per/Knowledge/DBpedia/latest`)) @system
{
	import nxt.algorithm.searching : startsWith, endsWith;
	import std.algorithm : filter;
	import std.datetime : MonoTime;
	import std.file : dirEntries, SpanMode;
	import std.path : baseName;
	import std.stdio : write, writeln, stdout;

	alias R = Bz2libFileInputRange;

	// directory entries whose base name starts with `instance_types` and
	// whose name ends with `.ttl.bz2`
	auto candidates = dirEntries(rootPath.str, SpanMode.depth)
		.filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
						  file.name.endsWith(`.ttl.bz2`)));

	foreach (const pathStr; candidates)
	{
		write(`Checking `, pathStr, ` ... `);
		stdout.flush();

		immutable before = MonoTime.currTime();

		// time a full line-wise pass over the file
		size_t lineCounter = 0;
		foreach (const line; new DecompressByLine!R(FilePath(pathStr)))
			++lineCounter;

		immutable after = MonoTime.currTime();

		showStat(pathStr, before, after, lineCounter);
	}
}
525 
/** Print the time elapsed between `before` and `after` for `tag`, both as a
 * total in milliseconds and as microseconds per line.
 */
static private void showStat(T)(in const(char[]) tag,
								in T before,
								in T after,
								in size_t lineCount)
{
	import std.stdio : writefln;

	const elapsed = after - before;
	const double totalMsecs = elapsed.total!`msecs`;
	const double totalUsecs = elapsed.total!`usecs`;

	writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
			 tag,
			 totalMsecs,
			 totalUsecs / lineCount);
}
538 
539 version (unittest)
540 {
541 	import std.range.primitives : isInputRange;
542 }
543 
544 pragma(lib, "bz2");			 // Ubuntu: sudo apt-get install libbz2-dev
545 
546 extern(C) nothrow @nogc:
547 
/// Action codes (presumably the values of the `action` argument of `BZ2_bzCompress`; keep in sync with bzlib.h).
enum : int
{
	BZ_RUN = 0,
	BZ_FLUSH = 1,
	BZ_FINISH = 2,
}

/// Status codes; the error codes are the negative ones.
enum : int
{
	BZ_OK = 0,
	BZ_RUN_OK = 1,
	BZ_FLUSH_OK = 2,
	BZ_FINISH_OK = 3,
	BZ_STREAM_END = 4,
	BZ_SEQUENCE_ERROR = -1,
	BZ_PARAM_ERROR = -2,
	BZ_MEM_ERROR = -3,
	BZ_DATA_ERROR = -4,
	BZ_DATA_ERROR_MAGIC = -5,
	BZ_IO_ERROR = -6,
	BZ_UNEXPECTED_EOF = -7,
	BZ_OUTBUFF_FULL = -8,
	BZ_CONFIG_ERROR = -9,
}
566 
/** Stream record passed by pointer to the low-level `BZ2_bz*` functions.
 *
 * NOTE(review): field order and types presumably mirror `bz_stream` in
 * bzlib.h and must stay in sync with it.
 */
struct bz_stream
{
	// input side
	ubyte* next_in;
	uint avail_in;
	uint total_in_lo32;
	uint total_in_hi32;

	// output side
	ubyte* next_out;
	uint avail_out;
	uint total_out_lo32;
	uint total_out_hi32;

	void* state;

	// allocation hooks and the opaque pointer that accompanies them
	void* function(void*, int, int) nothrow bzalloc;
	void function(void*, void*) nothrow bzfree;
	void* opaque;
}
585 
586 /*-- Core (low-level) library functions --*/
587 
// Compression on a `bz_stream`: initialize, run with an action code, release.
int BZ2_bzCompressInit(bz_stream* strm, int blockSize100k, int verbosity, int workFactor);
int BZ2_bzCompress(bz_stream* strm, int action);
int BZ2_bzCompressEnd(bz_stream* strm);

// Decompression on a `bz_stream`: initialize, run, release.
int BZ2_bzDecompressInit(bz_stream* strm, int verbosity, int small);
int BZ2_bzDecompress(bz_stream* strm);
int BZ2_bzDecompressEnd(bz_stream* strm);
605 
606 /*-- High(er) level library functions --*/
607 
// Declarations that operate on C `FILE*` streams; left out when the version
// identifier `BZ_NO_STDIO` is set.
version (BZ_NO_STDIO) {}
else
{
	import core.stdc.stdio;

	enum BZ_MAX_UNUSED = 5000;

	/// Opaque handle type; only ever used through pointers.
	struct BZFILE;

	// reading
	BZFILE* BZ2_bzReadOpen(int* bzerror, FILE* f, int verbosity, int small, void* unused, int nUnused);
	void BZ2_bzReadClose(int* bzerror, BZFILE* b);
	void BZ2_bzReadGetUnused(int* bzerror, BZFILE* b, void** unused, int* nUnused);
	int BZ2_bzRead(int* bzerror, BZFILE* b, void* buf, int len);

	// writing
	BZFILE* BZ2_bzWriteOpen(int* bzerror, FILE* f, int blockSize100k, int verbosity, int workFactor);
	void BZ2_bzWrite(int* bzerror, BZFILE* b, void* buf, int len);
	void BZ2_bzWriteClose(int* bzerror, BZFILE* b, int abandon, uint* nbytes_in, uint* nbytes_out);
	void BZ2_bzWriteClose64(int* bzerror, BZFILE* b, int abandon,
							uint* nbytes_in_lo32, uint* nbytes_in_hi32,
							uint* nbytes_out_lo32, uint* nbytes_out_hi32);
}
663 
664 /*-- Utility functions --*/
665 
// One-shot compression from the buffer `source` into the buffer `dest`.
int BZ2_bzBuffToBuffCompress(ubyte* dest, uint* destLen,
							 ubyte* source, uint sourceLen,
							 int blockSize100k, int verbosity, int workFactor);

// One-shot decompression from the buffer `source` into the buffer `dest`.
int BZ2_bzBuffToBuffDecompress(ubyte* dest, uint* destLen,
							   ubyte* source, uint sourceLen,
							   int small, int verbosity);
680 
681 
682 /*--
683   Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
684   to support better zlib compatibility.
685   This code is not _officially_ part of libbzip2 (yet);
686   I haven't tested it, documented it, or considered the
687   threading-safeness of it.
688   If this code breaks, please contact both Yoshioka and me.
689   --*/
690 
// Library version string.
const(char)* BZ2_bzlibVersion();

// Open by path or by file descriptor, with a mode string.
BZFILE* BZ2_bzopen(const scope const(char)* path, const scope const(char)* mode);
BZFILE* BZ2_bzdopen(int fd, const scope const(char)* mode);

// Transfer up to `len` bytes between `buf` and the opened file.
int BZ2_bzread(scope BZFILE* b, scope void* buf, int len);
int BZ2_bzwrite(scope BZFILE* b, scope void* buf, int len);

int BZ2_bzflush(scope BZFILE* b);
void BZ2_bzclose(scope BZFILE* b);

// Error message for `b`; the numeric code is stored through `errnum`.
const(char)* BZ2_bzerror(scope BZFILE* b, int* errnum);