1 /** File I/O of Compressed Files.
2  *
3  * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
4  */
5 module nxt.zio;
6 
7 import nxt.path : Path, FilePath;
8 
9 version = benchmark_zio;
10 
11 @safe:
12 
/** Byte-wise input range over the decompressed contents of a compressed file.
 *
 * The file is read in blocks of `chunkSize` bytes via `File.byChunk` and each
 * block is fed to a `std.zlib.UnCompress` constructed with its default
 * settings. Decompressed bytes are served one at a time from
 * `_uncompressedBuf`.
 */
struct GzipFileInputRange
{
	import std.stdio : File;
	import std.traits : ReturnType;

	static immutable chunkSize = 0x4000;	/+ TODO: find optimal value via benchmark +/
	static immutable defaultExtension = `.gz`;

	/// Open the file at `path` and decompress up to the first non-empty output.
	this(in FilePath path) @trusted
	{
		_f = File(path.str, `r`);
		_chunkRange = _f.byChunk(chunkSize);
		_uncompress = new UnCompress;
		loadNextChunk();
	}

	/** Replace `_uncompressedBuf` with the next non-empty block of decompressed
	 * bytes and rewind `_bufIx`.
	 *
	 * `UnCompress.uncompress` may return no bytes for a given input chunk.
	 * Because `empty` is defined as "buffer is empty", such a result must not
	 * be exposed while input chunks remain. Keep feeding chunks until output
	 * appears. Once all chunks are consumed, the decompressor is flushed
	 * exactly once, and after that the buffer stays empty.
	 */
	void loadNextChunk() @trusted
	{
		_bufIx = 0;
		while (!_chunkRange.empty)
		{
			_uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
			_chunkRange.popFront();
			if (_uncompressedBuf.length != 0)
				return;
		}
		if (!_exhausted)
		{
			_uncompressedBuf = cast(ubyte[])_uncompress.flush();
			_exhausted = true;
		}
		else
		{
			_uncompressedBuf.length = 0;
		}
	}

	/// Advance one byte, decompressing more input when the buffer is used up.
	void popFront()
	{
		_bufIx += 1;
		if (_bufIx >= _uncompressedBuf.length)
		{
			loadNextChunk();
		}
	}

pragma(inline, true):
pure nothrow @safe @nogc:

	/// Current decompressed byte.
	@property ubyte front() const
	{
		return _uncompressedBuf[_bufIx];
	}

	/// `true` once no decompressed bytes remain.
	bool empty() const @property
	{
		return _uncompressedBuf.length == 0;
	}

private:
	import std.zlib : UnCompress;
	UnCompress _uncompress;
	File _f;
	ReturnType!(_f.byChunk) _chunkRange;
	bool _exhausted;			///< True if exhausted.
	ubyte[] _uncompressedBuf;   ///< Uncompressed buffer.
	size_t _bufIx;			  ///< Current byte index into `_uncompressedBuf`.
}
82 
/** Is `true` iff `R` is a block input range.
 *
 * A block input range is an input range that additionally has members named
 * `bufferFrontChunk` and `loadNextChunk`, as `ZlibFileInputRange` and
 * `Bz2libFileInputRange` do. Only the presence of the member names is checked.
 *
 * TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
	import std.range.primitives : isInputRange;
	static if (!isInputRange!R)
		enum isBlockInputRange = false;
	else
		enum isBlockInputRange = __traits(hasMember, R, `loadNextChunk`) && /+ TODO: ask dlang for better naming +/
								 __traits(hasMember, R, `bufferFrontChunk`); /+ TODO: ask dlang for better naming +/
}
93 
/** Decompress `BlockInputRange` linewise.
 *
 * An input range over the lines of a file decompressed by `BlockInputRange`.
 * Lines are split on `separator`, which is not part of the yielded lines.
 * A separator at the very end of the input does not produce an extra empty
 * line. Empty lines elsewhere are yielded as empty slices instead of ending
 * the iteration.
 *
 * `front` is a view into an internal buffer that the next `popFront`
 * overwrites; copy it (e.g. with `.idup`) to keep it.
 */
class DecompressByLine(BlockInputRange)
{
	private alias E = char;

	/** Open `path` with `BlockInputRange` and read the first line.
	 *
	 * If `range` is of type `isBlockInputRange` decoding compressed files will
	 * be much faster.
	 *
	 * Params:
	 *   path = file passed to `BlockInputRange`'s constructor
	 *   separator = line separator
	 *   initialCapacity = number of elements reserved up front in the line buffer
	 */
	this(in FilePath path,
		 E separator = '\n',
		 in size_t initialCapacity = 80)
	{
		this._range = typeof(_range)(path);
		this._separator = separator;
		static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
			this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
		else static if (__traits(hasMember, typeof(_lbuf), `reserve`))
			this._lbuf.reserve(initialCapacity); // e.g. `std.array.Appender`
		popFront();
	}

	/// Advance to the next line; the range becomes empty when no input is left.
	void popFront() @trusted
	{
		_lbuf.shrinkTo(0);

		// End of input is tracked explicitly: an empty `_lbuf` cannot signal
		// it, because empty lines are valid elements.
		if (_range.empty)
		{
			_done = true;
			return;
		}

		static if (isBlockInputRange!(typeof(_range)))
		{
			/+ TODO: functionize +/
			while (!_range.empty)
			{
				ubyte[] currentFronts = _range.bufferFrontChunk;
				// `_range` is mutable so sentinel-based search can kick

				static immutable useCountUntil = false;
				static if (useCountUntil)
				{
					import std.algorithm.searching : countUntil;
					// TODO
				}
				else
				{
					import std.algorithm.searching : find;
					const hit = currentFronts.find(_separator); // or use `indexOf`
				}

				if (hit.length)
				{
					const lineLength = hit.ptr - currentFronts.ptr;
					_lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
					_range._bufIx += lineLength + _separator.sizeof; // advancement + separator
					// The separator was the last byte of the chunk: refill so that
					// `_range.empty` reflects whether any input is left.
					if (_range.empty)
						_range.loadNextChunk();
					break;	  // done
				}
				else			// no separator yet
				{
					_lbuf.put(currentFronts); // so just add everything
					_range.loadNextChunk();
				}
			}
		}
		else
		{
			/+ TODO: sentinel-based search for `_separator` in `_range` +/
			while (!_range.empty &&
				   _range.front != _separator)
			{
				_lbuf.put(_range.front);
				_range.popFront();
			}

			if (!_range.empty &&
				_range.front == _separator)
			{
				_range.popFront();  // pop separator
			}
		}
	}

	pragma(inline):
	pure nothrow @safe @nogc:

	/// `true` once `popFront` has been called with no input left.
	bool empty() const @property
	{
		return _done;
	}

	/// Current line without its separator; valid until the next `popFront`.
	const(E)[] front() const return scope
	{
		return _lbuf.data;
	}

private:
	BlockInputRange _range;

	import std.array : Appender;
	Appender!(E[]) _lbuf;	   // line buffer

	// NOTE this is slower for ldc:
	// import nxt.container.dynamic_array : Array;
	// Array!E _lbuf;

	E _separator;

	bool _done;			// set by `popFront` when it finds no input left
}
197 
/** Writes gzip-compressed data to a `File`.
 *
 * Pass text to `compress` and call `finish` at the end. `finish` writes the
 * remaining compressed bytes and closes the file.
 */
class GzipOut
{
	import std.zlib: Compress, HeaderFormat;
	import std.stdio: File;

	/// Wrap `file`; everything written is compressed with a gzip header.
	this(File file) @trusted
	{
		_f = file;
		_compress = new Compress(HeaderFormat.gzip);
	}

	/// Compress `s` and append whatever compressed bytes are ready to the file.
	void compress(const string s) @trusted
	{
		_f.rawWrite(_compress.compress(s));
	}

	/// Flush the compressor, write its remaining output and close the file.
	void finish() @trusted
	{
		_f.rawWrite(_compress.flush());
		_f.close();
	}

private:
	Compress _compress;
	File _f;
}
226 
/** Byte-wise input range over a gzip/zlib-compressed file.
 *
 * The file is decompressed by zlib's `gzread` in blocks of `chunkSize` bytes.
 * It is also a block input range (see `isBlockInputRange`):
 * `bufferFrontChunk` exposes the unread part of the current block and
 * `loadNextChunk` refills it. Copying is disabled because the range owns the
 * `gzFile` handle.
 */
struct ZlibFileInputRange
{
	import std.file : FileException;

	/* Zlib docs:
	   CHUNK is simply the buffer size for feeding data to and pulling data from
	   the zlib routines. Larger buffer sizes would be more efficient,
	   especially for inflate(). If the memory is available, buffers sizes on
	   the order of 128K or 256K bytes should be used.
	*/
	static immutable chunkSize = 128 * 1024; // 128K

	static immutable defaultExtension = `.gz`;

	@safe:

	/** Open `path` for reading and load the first block.
	 *
	 * Throws: `FileException` if `gzopen` fails, and `Exception` if the first
	 * `gzread` reports an error.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = gzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);
		// If anything below throws, close the handle here rather than relying
		// on the destructor of a partially constructed object. `_f` is reset
		// so that any later close attempt is skipped.
		scope(failure)
		{
			gzclose(_f);
			_f = null;
		}
		_buf = new ubyte[chunkSize];
		loadNextChunk();
	}

	/** Close the handle, if one is open.
	 *
	 * `_f` is null for a default-initialized range or after a failed
	 * constructor. `gzclose` reports an error for an invalid handle, which
	 * would trip the assertion below, so that case is skipped.
	 */
	~this() nothrow @trusted @nogc
	{
		if (_f is null)
			return;
		const int ret = gzclose(_f);
		if (ret < 0)
			assert(0, `Couldn't close file`); /+ TODO: replace with non-GC-allocated exception +/
	}

	this(this) @disable;

	/** Read the next block into `_buf` and rewind `_bufIx`.
	 *
	 * A read of 0 bytes (end of input) makes the range `empty`.
	 * Throws: `Exception` if `gzread` reports an error (negative return value).
	 */
	void loadNextChunk() @trusted
	{
		const int count = gzread(_f, _buf.ptr, chunkSize);
		if (count < 0)
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/// Advance one byte, loading the next block when the current one is used up.
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk(); // also rewinds `_bufIx`
	}

pragma(inline, true):
pure nothrow @nogc:

	/// Current decompressed byte.
	@property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];
	/// `true` when no unread bytes remain in the current block.
	bool empty() const @property => _bufIx == _bufReadLength;

	/** Get current bufferFrontChunk: the unread bytes of the current block.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
	import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

	gzFile _f;

	ubyte[] _buf;			   // block read buffer

	// number of bytes in `_buf` recently read by `gzread`, normally equal to `_buf.length` except after the last read where it is normally less than `_buf.length`
	size_t _bufReadLength;

	size_t _bufIx;			  // current stream read index in `_buf`

	/+ TODO: make this work: +/
	// extern (C) nothrow @nogc:
	// pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
	// pragma(mangle, `gzclose`) int gzclose(gzFile file);
	// pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
310 
/** Byte-wise input range over a bzip2-compressed file.
 *
 * The file is decompressed by libbzip2's `BZ2_bzread` in blocks of
 * `chunkSize` bytes. It is also a block input range (see
 * `isBlockInputRange`). Copying is disabled because the range owns the
 * `BZFILE` handle and, unless `useGC`, the malloc-allocated read buffer.
 */
struct Bz2libFileInputRange
{
	import std.file : FileException;

	static immutable chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
	static immutable defaultExtension = `.bz2`;
	static immutable useGC = false;		 /+ TODO: generalize to allocator parameter +/

@safe:

	/** Open `path` for reading, allocate the read buffer and load the first block.
	 *
	 * Throws: `FileException` if `BZ2_bzopen` fails, `OutOfMemoryError` if the
	 * buffer cannot be allocated, and `Exception` if the first `BZ2_bzread`
	 * reports an error.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = BZ2_bzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);

		// If anything below throws, release what has been acquired so far
		// rather than relying on the destructor of a partially constructed
		// object. Fields are reset so that a later release is a no-op.
		scope(failure)
		{
			BZ2_bzclose(_f);
			_f = null;
			static if (!useGC)
			{
				import core.memory : pureFree;
				pureFree(_buf.ptr);
			}
			_buf = null;
		}

		static if (useGC)
			_buf = new ubyte[chunkSize];
		else
		{
			import core.exception : onOutOfMemoryError;
			import core.memory : pureMalloc;
			auto ptr = cast(ubyte*)pureMalloc(chunkSize);
			if (ptr is null)
				onOutOfMemoryError();
			_buf = ptr[0 .. chunkSize];
		}

		loadNextChunk();
	}

	/// Close the handle and, unless `useGC`, free the read buffer.
	~this() nothrow @trusted @nogc
	{
		BZ2_bzclose(_f);	   /+ TODO: error handling? +/

		static if (!useGC)
		{
			import core.memory : pureFree;
			pureFree(_buf.ptr);
		}
	}

	this(this) @disable;

	/** Read the next block into `_buf` and rewind `_bufIx`.
	 *
	 * A read of 0 bytes (end of input) makes the range `empty`.
	 * Throws: `Exception` if `BZ2_bzread` reports an error (negative return value).
	 */
	void loadNextChunk() @trusted
	{
		const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
		if (count < 0)
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/// Advance one byte, loading the next block when the current one is used up.
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk(); // also rewinds `_bufIx`
	}

	pragma(inline, true):
	pure nothrow @nogc:

	/// Current decompressed byte.
	@property ubyte front() const @trusted in(!empty)
		=> _buf.ptr[_bufIx];
	/// `true` when no unread bytes remain in the current block.
	bool empty() const @property
		=> _bufIx == _bufReadLength;

	/** Get current bufferFrontChunk: the unread bytes of the current block.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
		=> _buf.ptr[_bufIx .. _bufReadLength];

private:
	/* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
	pragma(lib, `bz2`);			 // Ubuntu: sudo apt-get install libbz2-dev

	BZFILE* _f;

	ubyte[] _buf;			   // block read buffer

	// number of bytes in `_buf` recently read by `BZ2_bzread`, normally equal to `_buf.length` except after the last read where it is normally less than `_buf.length`
	size_t _bufReadLength;

	size_t _bufIx;			  // current stream read index in `_buf`
}
398 
/** Round-trip check for a compressed-file input range `FileInputRange`.
 *
 * For each prefix `source` of `data`:
 * - write it with `GzipOut` to `test` ~ `FileInputRange.defaultExtension`;
 * - check that iterating `FileInputRange` yields the bytes of `source`;
 * - check that `DecompressByLine!ZlibFileInputRange` yields as many lines as
 *   `source.splitter('\n')`.
 *
 * NOTE(review): the loop range `data.length .. data.length` is empty, so the
 * body currently never executes (see the TODO). The file is always written in
 * gzip format by `GzipOut`, regardless of the extension. Widening the range
 * as-is would therefore feed gzip data to `Bz2libFileInputRange`. Confirm
 * before enabling.
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
	import std.stdio : File;

	const path = FilePath(`test` ~ FileInputRange.defaultExtension);

	const data = "abc\ndef\nghi"; // contents of source

	// NOTE(review): empty range -- no iterations happen.
	foreach (const n; data.length .. data.length) /+ TODO: from 0 +/
	{
		const source = data[0 .. n]; // slice from the beginning

		// Write `source` gzip-compressed; `finish` also closes the file.
		scope of = new GzipOut(File(path.str, `w`));
		of.compress(source);
		of.finish();

		// Byte-wise comparison against the original text.
		size_t ix = 0;
		foreach (e; FileInputRange(path))
		{
			assert(cast(char)e == source[ix]);
			++ix;
		}

		import std.algorithm.searching : count;
		import std.algorithm.iteration : splitter;
		// Line splitting always uses the zlib range, independent of `FileInputRange`.
		alias R = DecompressByLine!ZlibFileInputRange;

		assert(new R(path).count == source.splitter('\n').count);
	}
}
430 
431 ///
432 @safe unittest {
433 	testInputRange!(GzipFileInputRange);
434 	testInputRange!(ZlibFileInputRange);
435 	testInputRange!(Bz2libFileInputRange);
436 }
437 
/** Read the Age of Acquisition word list.
 *
 * Counts the lines of the compressed word list below `rootDirPath` with
 * `DecompressByLine` over `GzipFileInputRange`, `ZlibFileInputRange` (both on
 * the `.gz` file) and `Bz2libFileInputRange` (on the `.bz2` copy). Asserts the
 * same line count each time.
 */
static private void testReadAgeofAqcuisitions(in Path rootDirPath = Path(`~/Work/knet/knowledge/en/age-of-aqcuisition`)) @safe
{
	import std.path : buildNormalizedPath, expandTilde;
	import nxt.zio : DecompressByLine, GzipFileInputRange;

	enum expectedLineCount = 51716;

	{
		const gzPath = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
		size_t lines;
		foreach (_; new DecompressByLine!GzipFileInputRange(gzPath))
			++lines;
		assert(lines == expectedLineCount);
	}

	{
		const gzPath = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
		size_t lines;
		foreach (_; new DecompressByLine!ZlibFileInputRange(gzPath))
			++lines;
		assert(lines == expectedLineCount);
	}

	{
		const bz2Path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words_copy.csv.bz2`));
		size_t lines;
		foreach (_; new DecompressByLine!Bz2libFileInputRange(bz2Path))
			++lines;
		assert(lines == expectedLineCount);
	}
}
470 
/** Read Concept 5 assertions.
 *
 * Decompresses `path` line by line with `ZlibFileInputRange` and prints every
 * 100 000th line. Then reads the file again and prints its first 5 lines.
 */
static private void testReadConcept5Assertions(in FilePath path = FilePath(`/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`)) @safe
{
	import std.range: take;
	import std.stdio: writeln;

	alias R = ZlibFileInputRange;

	enum lineBlockCount = 100_000;
	size_t lineNr;
	foreach (const line; new DecompressByLine!R(path))
	{
		const startsBlock = lineNr % lineBlockCount == 0;
		if (startsBlock)
			writeln(`Line `, lineNr, ` read containing:`, line);
		++lineNr;
	}

	enum lineCount = 5;
	auto head = new DecompressByLine!R(path).take(lineCount);
	foreach (const line; head)
		writeln(line);
}
494 
/** Benchmark DBpedia parsing.
 *
 * Walks `rootPath` recursively. For each `instance_types*.ttl.bz2` file it
 * counts the lines with `DecompressByLine!Bz2libFileInputRange` and reports
 * the timing via `showStat`.
 */
version (benchmark_zio)
static private void benchmarkDbpediaParsing(in Path rootPath = Path(`/home/per/Knowledge/DBpedia/latest`)) @system
{
	import nxt.algorithm.searching : startsWith, endsWith;
	import std.algorithm : filter;
	import std.datetime : MonoTime;
	import std.file : dirEntries, SpanMode;
	import std.path : baseName;
	import std.stdio : write, writeln, stdout;

	alias R = Bz2libFileInputRange;

	auto dumps = dirEntries(rootPath.str, SpanMode.depth)
		.filter!(entry => entry.name.baseName.startsWith(`instance_types`) &&
						  entry.name.endsWith(`.ttl.bz2`));

	foreach (const pathStr; dumps)
	{
		write(`Checking `, pathStr, ` ... `);
		stdout.flush();

		immutable start = MonoTime.currTime();
		size_t lineCount = 0;
		foreach (const _; new DecompressByLine!R(FilePath(pathStr)))
			++lineCount;
		immutable stop = MonoTime.currTime();

		showStat(pathStr, start, stop, lineCount);
	}
}
524 
/** Show statistics.
 *
 * Prints `tag`, the elapsed time between `before` and `after` in milliseconds,
 * and the average time per line in microseconds.
 *
 * Params:
 *   tag = label printed first (e.g. the path that was read)
 *   before = time point taken before the run
 *   after = time point taken after the run; `after - before` must support `total!"usecs"`
 *   lineCount = number of lines processed; for 0 the per-line figure is not finite
 */
static private void showStat(T)(in const(char[]) tag,
								in T before,
								in T after,
								in size_t lineCount)
{
	import std.stdio : writefln;
	// Derive milliseconds from microseconds so the printed fraction is real;
	// `total!"msecs"` would truncate to whole milliseconds before formatting.
	const usecs = cast(double)(after - before).total!`usecs`;
	writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
			 tag,
			 usecs / 1_000,
			 usecs / lineCount);
}
537 
538 version (unittest)
539 {
540 	import std.range.primitives : isInputRange;
541 }
542 
pragma(lib, "bz2");			 // Ubuntu: sudo apt-get install libbz2-dev

/* D bindings for the libbzip2 C API (`bzlib.h`).
 * The label below gives every remaining declaration of this module C linkage
 * and the `nothrow @nogc` attributes. The values and signatures must match
 * the C header exactly.
 */
extern(C) nothrow @nogc:

/// Values for the `action` parameter of `BZ2_bzCompress`.
enum BZ_RUN			   = 0;
enum BZ_FLUSH			 = 1;
enum BZ_FINISH			= 2;

/** Status codes used by the library.
 * The `BZ_*_OK`/`BZ_STREAM_END` codes are non-negative; the error codes are negative.
 */
enum BZ_OK				= 0;
enum BZ_RUN_OK			= 1;
enum BZ_FLUSH_OK		  = 2;
enum BZ_FINISH_OK		 = 3;
enum BZ_STREAM_END		= 4;
enum BZ_SEQUENCE_ERROR	= -1;
enum BZ_PARAM_ERROR	   = -2;
enum BZ_MEM_ERROR		 = -3;
enum BZ_DATA_ERROR		= -4;
enum BZ_DATA_ERROR_MAGIC  = -5;
enum BZ_IO_ERROR		  = -6;
enum BZ_UNEXPECTED_EOF	= -7;
enum BZ_OUTBUFF_FULL	  = -8;
enum BZ_CONFIG_ERROR	  = -9;
565 
/** Stream state passed by pointer to the low-level `BZ2_bzCompress*` and
 * `BZ2_bzDecompress*` functions.
 *
 * Field order and types must match `bz_stream` in `bzlib.h`, since C code
 * reads and writes this struct directly.
 */
struct bz_stream
{
	// input cursor and remaining input length
	ubyte* next_in;
	uint   avail_in;
	// total input consumed, split into low/high 32-bit halves
	uint   total_in_lo32;
	uint   total_in_hi32;

	// output cursor and remaining output capacity
	ubyte* next_out;
	uint   avail_out;
	// total output produced, split into low/high 32-bit halves
	uint   total_out_lo32;
	uint   total_out_hi32;

	// library-private state
	void*  state;

	// Custom allocator hooks and their context pointer.
	// NOTE(review): presumably null selects the library's default allocator -- confirm in the libbzip2 manual.
	void* function(void*, int, int) nothrow bzalloc;
	void  function(void*, void*) nothrow	bzfree;
	void* opaque;
}
584 
/*-- Core (low-level) library functions --*/
// These operate on a caller-owned `bz_stream` and return an `int` status
// (presumably one of the `BZ_*` codes declared above -- confirm against bzlib.h).

// Prepare `strm` for compression.
int BZ2_bzCompressInit(bz_stream* strm,
					   int		blockSize100k,
					   int		verbosity,
					   int		workFactor);

// Compress from `strm.next_in` to `strm.next_out`; `action` is BZ_RUN, BZ_FLUSH or BZ_FINISH.
int BZ2_bzCompress(bz_stream* strm,
				   int action);

// Release the compression state held by `strm`.
int BZ2_bzCompressEnd(bz_stream* strm);

// Prepare `strm` for decompression.
int BZ2_bzDecompressInit(bz_stream* strm,
						 int		verbosity,
						 int		small);

// Decompress from `strm.next_in` to `strm.next_out`.
int BZ2_bzDecompress(bz_stream* strm);

// Release the decompression state held by `strm`.
int BZ2_bzDecompressEnd(bz_stream *strm);
604 
/*-- High(er) level library functions --*/
// Stream API on top of C `FILE*` handles. It is left out when
// `version (BZ_NO_STDIO)` is set. These functions report their status
// through the `bzerror` out-parameter (see the libbzip2 manual).

version (BZ_NO_STDIO) {}
else
{
	import core.stdc.stdio;

	enum BZ_MAX_UNUSED = 5000;

	// Opaque handle; only ever used through pointers.
	struct BZFILE;

	BZFILE* BZ2_bzReadOpen(int*  bzerror,
						   FILE* f,
						   int   verbosity,
						   int   small,
						   void* unused,
						   int   nUnused);

	void BZ2_bzReadClose(int*	bzerror,
						 BZFILE* b);

	void BZ2_bzReadGetUnused(int*	bzerror,
							 BZFILE* b,
							 void**  unused,
							 int*	nUnused);

	int BZ2_bzRead(int*	bzerror,
				   BZFILE* b,
				   void*   buf,
				   int	 len);

	BZFILE* BZ2_bzWriteOpen(int*  bzerror,
							FILE* f,
							int   blockSize100k,
							int   verbosity,
							int   workFactor
		);

	void BZ2_bzWrite(int*	bzerror,
					 BZFILE* b,
					 void*   buf,
					 int	 len);

	void BZ2_bzWriteClose(int*		  bzerror,
						  BZFILE*	   b,
						  int		   abandon,
						  uint*		 nbytes_in,
						  uint*		 nbytes_out);

	// Like `BZ2_bzWriteClose`, but reports byte counts as low/high 32-bit halves.
	void BZ2_bzWriteClose64(int*		  bzerror,
							BZFILE*	   b,
							int		   abandon,
							uint*		 nbytes_in_lo32,
							uint*		 nbytes_in_hi32,
							uint*		 nbytes_out_lo32,
							uint*		 nbytes_out_hi32);
}
662 
/*-- Utility functions --*/
// Single-call (de)compression from one memory buffer into another.
// NOTE(review): `destLen` is presumably in/out (capacity on entry, bytes
// written on return) -- confirm in the libbzip2 manual.

int BZ2_bzBuffToBuffCompress(ubyte*		dest,
							 uint*		 destLen,
							 ubyte*		source,
							 uint		  sourceLen,
							 int		   blockSize100k,
							 int		   verbosity,
							 int		   workFactor);

int BZ2_bzBuffToBuffDecompress(ubyte*		dest,
							   uint*		 destLen,
							   ubyte*		source,
							   uint		  sourceLen,
							   int		   small,
							   int		   verbosity);
679 
680 
681 /*--
682   Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
683   to support better zlib compatibility.
684   This code is not _officially_ part of libbzip2 (yet);
685   I haven't tested it, documented it, or considered the
686   threading-safeness of it.
687   If this code breaks, please contact both Yoshioka and me.
688   --*/
689 
690 const(char)* BZ2_bzlibVersion();
691 
692 BZFILE* BZ2_bzopen(const scope const(char)* path,
693 				   const scope const(char)* mode);
694 
695 BZFILE * BZ2_bzdopen(int		  fd,
696 					 const scope const(char)* mode);
697 
698 int BZ2_bzread(scope BZFILE* b,
699 			   scope void*   buf,
700 			   int	 len);
701 
702 int BZ2_bzwrite(scope BZFILE* b,
703 				scope void*   buf,
704 				int	 len);
705 
706 int BZ2_bzflush(scope BZFILE* b);
707 
708 void BZ2_bzclose(scope BZFILE* b);
709 
710 const(char)* BZ2_bzerror(scope BZFILE *b,
711 						 int	*errnum);