/** File I/O of Compressed Files.
 *
 * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
 *
 * TODO: Add to dub a package having the dub.sdl line: libs "z" "bz2" "zstd"
 */
module nxt.zio;

version = benchmark_zio;

@safe:

/** Input range of the decompressed bytes of the file at a given path.
 *
 * The file is read in blocks of `chunkSize` bytes via `File.byChunk` and each
 * block is fed through `std.zlib.UnCompress`. One decompressed block at a time
 * is kept in memory.
 */
struct GzipFileInputRange
{
    import std.stdio : File;
    import std.traits : ReturnType;

    enum chunkSize = 0x4000; // TODO: find optimal value via benchmark

    enum defaultExtension = `.gz`;

    /** Open the file at `path` and decode its first block.
     *
     * Params:
     *   path = path of the compressed file to read
     */
    this(in char[] path) @trusted
    {
        _f = File(path, `r`);
        _chunkRange = _f.byChunk(chunkSize);
        _uncompress = new UnCompress;
        loadNextChunk();
    }

    /** Replace the current decompressed buffer with the next non-empty one.
     *
     * A compressed chunk may decode to zero bytes. Because `empty` is defined
     * as "the buffer is empty", such a chunk must not be stored as-is, or the
     * range would report exhaustion while input remains. Therefore keep
     * feeding chunks (and finally flush the decompressor once) until some
     * output is available or the stream is fully consumed.
     */
    void loadNextChunk() @trusted
    {
        _bufIx = 0;
        _uncompressedBuf = null;
        while (_uncompressedBuf.length == 0 && !_exhausted)
        {
            if (!_chunkRange.empty)
            {
                _uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
                _chunkRange.popFront();
            }
            else
            {
                // all compressed input consumed: collect what remains, once
                _uncompressedBuf = cast(ubyte[])_uncompress.flush();
                _exhausted = true;
            }
        }
    }

    /// Advance by one decompressed byte, loading the next block when needed.
    void popFront()
    {
        _bufIx += 1;
        if (_bufIx >= _uncompressedBuf.length)
        {
            loadNextChunk();
        }
    }

pragma(inline, true):
@safe pure nothrow @nogc:

    /// Current decompressed byte. Must not be called when `empty`.
    @property ubyte front() const
    {
        return _uncompressedBuf[_bufIx];
    }

    /// True when no decompressed bytes remain.
    bool empty() const @property
    {
        return _uncompressedBuf.length == 0;
    }

private:
    import std.zlib : UnCompress;
    UnCompress _uncompress;
    File _f;
    ReturnType!(_f.byChunk) _chunkRange;
    bool _exhausted; ///< True if exhausted.
    ubyte[] _uncompressedBuf; ///< Uncompressed buffer.
    size_t _bufIx; ///< Current byte index into `_uncompressedBuf`.
}

/** Is `true` iff `R` is a block input range.
TODO: Move to std.range
*/
private template isBlockInputRange(R)
{
    import std.range.primitives : isInputRange;
    // An input range that additionally exposes its current block
    // (`bufferFrontChunk`) and a way to fetch the next one (`loadNextChunk`).
    enum isBlockInputRange = (isInputRange!R &&
                              __traits(hasMember, R, `bufferFrontChunk`) && // TODO: ask dlang for better naming
                              __traits(hasMember, R, `loadNextChunk`)); // TODO: ask dlang for better naming
}

/** Decompress `BlockInputRange` linewise.
 *
 * Input range of lines. `front` is a slice of an internal buffer that is
 * overwritten by the next `popFront`; copy it if it must outlive the
 * iteration step. Empty lines are yielded as empty slices.
 */
class DecompressByLine(BlockInputRange)
{
    private alias E = char;

    /** If `range` is of type `isBlockInputRange` decoding compressed files will
     * be much faster.
     *
     * Params:
     *   range = path passed on to the constructor of `BlockInputRange`
     *   separator = line terminator
     *   initialCapacity = initial line buffer capacity; only used when the
     *     buffer type provides `withCapacity`
     */
    this(in const(char)[] range,
         E separator = '\n',
         in size_t initialCapacity = 80)
    {
        this._range = typeof(_range)(range);
        this._separator = separator;
        static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
            this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
        popFront();
    }

    /** Read the next line into the line buffer.
     *
     * The range becomes `empty` only when this is called while the underlying
     * byte range is already exhausted; an empty line does not end iteration.
     */
    void popFront() @trusted
    {
        _lbuf.shrinkTo(0);

        if (_range.empty)
        {
            _done = true; // nothing left to start a new line from
            return;
        }

        static if (isBlockInputRange!(typeof(_range)))
        {
            // TODO: functionize
            while (!_range.empty)
            {
                ubyte[] currentFronts = _range.bufferFrontChunk;
                // `_range` is mutable so sentinel-based search can kick

                enum useCountUntil = false;
                static if (useCountUntil)
                {
                    import std.algorithm.searching : countUntil;
                    // TODO
                }
                else
                {
                    import std.algorithm.searching : find;
                    const hit = currentFronts.find(_separator); // or use `indexOf`
                }

                if (hit.length)
                {
                    const lineLength = hit.ptr - currentFronts.ptr;
                    _lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
                    _range._bufIx += lineLength + _separator.sizeof; // advancement + separator
                    if (_range.empty)
                        _range.loadNextChunk();
                    break; // done
                }
                else // no separator yet
                {
                    _lbuf.put(currentFronts); // so just add everything
                    _range.loadNextChunk();
                }
            }
        }
        else
        {
            // TODO: sentinel-based search for `_separator` in `_range`
            while (!_range.empty &&
                   _range.front != _separator)
            {
                _lbuf.put(_range.front);
                _range.popFront();
            }

            if (!_range.empty &&
                _range.front == _separator)
            {
                _range.popFront(); // pop separator
            }
        }
    }

pragma(inline):
@safe pure nothrow @nogc:

    /// True when all lines have been consumed.
    bool empty() const @property
    {
        return _done;
    }

    /// Current line without its separator; invalidated by `popFront`.
    const(E)[] front() const return scope
    {
        return _lbuf.data;
    }

private:
    BlockInputRange _range;

    import std.array : Appender;
    Appender!(E[]) _lbuf; // line buffer

    // NOTE this is slower for ldc:
    // import nxt.container.dynamic_array : Array;
    // Array!E _lbuf;

    E _separator;

    bool _done; // set once `popFront` finds no more input
}

/** Writes gzip-compressed data to a `File`.
 *
 * Call `compress` any number of times and then `finish` exactly once;
 * `finish` flushes the compressor and closes the file.
 */
class GzipOut
{
    import std.zlib: Compress, HeaderFormat;
    import std.stdio: File;

    /// Wrap `file`, which must be open for writing.
    this(File file) @trusted
    {
        _f = file;
        _compress = new Compress(HeaderFormat.gzip);
    }

    /// Compress `s` and write the produced bytes to the file.
    void compress(const string s) @trusted
    {
        auto compressed = _compress.compress(s);
        _f.rawWrite(compressed);
    }

    /// Flush pending compressed bytes to the file and close it.
    void finish() @trusted
    {
        auto compressed = _compress.flush;
        _f.rawWrite(compressed);
        _f.close;
    }

private:
    Compress _compress;
    File _f;
}

/** Block input range of the decompressed bytes of a file, read via zlib's
 * `gzopen`/`gzread`.
 */
struct ZlibFileInputRange
{
    import std.file : FileException;

    /* Zlib docs:
       CHUNK is simply the buffer size for feeding data to and pulling data from
       the zlib routines. Larger buffer sizes would be more efficient,
       especially for inflate(). If the memory is available, buffers sizes on
       the order of 128K or 256K bytes should be used.
    */
    enum chunkSize = 128 * 1024; // 128K

    enum defaultExtension = `.gz`;

@safe:

    /** Open the file at `path` and read its first block.
     *
     * Throws: `FileException` if the file cannot be opened, `Exception` if
     * the first read fails.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = gzopen(path.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.idup);
        // don't leak the handle if the first read below throws
        scope(failure)
        {
            gzclose(_f);
            _f = null;
        }
        _buf = new ubyte[chunkSize];
        loadNextChunk();
    }

    ~this() nothrow @trusted @nogc
    {
        if (_f is null)
            return; // never opened (e.g. `.init`) or already released
        const int ret = gzclose(_f);
        _f = null;
        assert(ret >= 0, `Couldn't close file`); // TODO: replace with non-GC-allocated exception
    }

    @disable this(this);

    /** Read the next block of decompressed bytes into the buffer.
     *
     * Throws: `Exception` if `gzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = gzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk(); // also restarts `_bufIx`
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current byte.
    @property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];

    /// True when the last read returned no bytes or all of them are consumed.
    bool empty() const @property => _bufIx == _bufReadLength;

    /** Get current bufferFrontChunk, the unread part of the current block.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
    import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

    gzFile _f;

    ubyte[] _buf; // block read buffer

    // number of bytes in `_buf` recently read by `gzread`, normally equal to `_buf.length` except after last read where it's normally less than `_buf.length`
    size_t _bufReadLength;

    size_t _bufIx; // current stream read index in `_buf`

    // TODO: make this work:
    // extern (C) nothrow @nogc:
    // pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
    // pragma(mangle, `gzclose`) int gzclose(gzFile file);
    // pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}

/** Block input range of the decompressed bytes of a file, read via libbz2's
 * `BZ2_bzopen`/`BZ2_bzread`.
 */
struct Bz2libFileInputRange
{
    import std.file : FileException;

    enum chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
    enum defaultExtension = `.bz2`;
    enum useGC = false; // TODO: generalize to allocator parameter

@safe:

    /** Open the file at `path`, allocate the block buffer and read the first
     * block.
     *
     * Throws: `FileException` if the file cannot be opened, `Exception` if
     * the first read fails.
     */
    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = BZ2_bzopen(path.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.idup);

        // release handle and buffer if anything below throws
        scope(failure)
        {
            BZ2_bzclose(_f);
            _f = null;
            static if (!useGC)
            {
                import core.memory : pureFree;
                pureFree(_buf.ptr); // no-op while `_buf` is still null
            }
            _buf = null;
        }

        static if (useGC)
            _buf = new ubyte[chunkSize];
        else
        {
            import core.exception : onOutOfMemoryError;
            import core.memory : pureMalloc;
            auto mem = cast(ubyte*)pureMalloc(chunkSize);
            if (mem is null)
                onOutOfMemoryError(); // never slice a null allocation
            _buf = mem[0 .. chunkSize];
        }

        loadNextChunk();
    }

    ~this() nothrow @trusted @nogc
    {
        if (_f !is null)
        {
            BZ2_bzclose(_f); // TODO: error handling?
            _f = null;
        }

        static if (!useGC)
        {
            import core.memory : pureFree;
            pureFree(_buf.ptr);
        }
        _buf = null;
    }

    @disable this(this);

    /** Read the next block of decompressed bytes into the buffer.
     *
     * Throws: `Exception` if `BZ2_bzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
        if (count < 0)
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk(); // also restarts `_bufIx`
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current byte.
    @property ubyte front() const @trusted in(!empty)
        => _buf.ptr[_bufIx];

    /// True when the last read returned no bytes or all of them are consumed.
    bool empty() const @property
        => _bufIx == _bufReadLength;

    /** Get current bufferFrontChunk, the unread part of the current block.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
        => _buf.ptr[_bufIx .. _bufReadLength];

private:
    /* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
    pragma(lib, `bz2`); // Ubuntu: sudo apt-get install libbz2-dev

    BZFILE* _f;

    ubyte[] _buf; // block read buffer

    // number of bytes in `_buf` recently read by `BZ2_bzread`, normally equal to `_buf.length` except after last read where it's normally less than `_buf.length`
    size_t _bufReadLength;

    size_t _bufIx; // current stream read index in `_buf`
}

/** Round-trip check of `FileInputRange` against data written with `GzipOut`.
 *
 * NOTE: the loop range `data.length .. data.length` is empty, so the loop body
 * currently never executes. The body always writes gzip data, regardless of
 * `FileInputRange.defaultExtension`.
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
    import std.stdio : File;

    enum path = `test` ~ FileInputRange.defaultExtension;

    const data = "abc\ndef\nghi"; // contents of source

    foreach (const n; data.length .. data.length) // TODO: from 0
    {
        const source = data[0 .. n]; // slice from the beginning

        scope of = new GzipOut(File(path, `w`));
        of.compress(source);
        of.finish();

        // byte-wise comparison
        size_t ix = 0;
        foreach (e; FileInputRange(path))
        {
            assert(cast(char)e == source[ix]);
            ++ix;
        }

        import std.algorithm.searching : count;
        import std.algorithm.iteration : splitter;
        alias R = DecompressByLine!ZlibFileInputRange;

        // line-wise comparison
        assert(new R(path).count == source.splitter('\n').count);
    }
}

@safe unittest
{
    testInputRange!(GzipFileInputRange);
    testInputRange!(ZlibFileInputRange);
    testInputRange!(Bz2libFileInputRange);
}

/** Read Age of Acquisition files and check their line counts.
 *
 * Params:
 *   rootDirPath = directory holding the data files; `~` is expanded
 */
static private void testReadAgeofAqcuisitions(const string rootDirPath = `~/Work/knet/knowledge/en/age-of-aqcuisition`) @safe
{
    import std.path: expandTilde;
    import nxt.zio : DecompressByLine, GzipFileInputRange;
    import std.path : buildNormalizedPath;

    // via std.zlib
    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words.csv.gz`);
        size_t count = 0;
        foreach (line; new DecompressByLine!GzipFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }

    // via zlib's gz* functions
    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words.csv.gz`);
        size_t count = 0;
        foreach (line; new DecompressByLine!ZlibFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }

    // via libbz2
    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words_copy.csv.bz2`);
        size_t count = 0;
        foreach (line; new DecompressByLine!Bz2libFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }
}

/** Read Concept 5 assertions.
*/
static private void testReadConcept5Assertions(const string path = `/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`) @safe
{
    import std.algorithm.searching: count;
    import std.range: take;
    import std.stdio: writeln;

    alias R = ZlibFileInputRange;

    // First pass: walk the whole file, printing one line per block of lines.
    enum reportInterval = 100_000;
    size_t lineIndex = 0;
    auto allLines = new DecompressByLine!R(path);
    foreach (const text; allLines)
    {
        const atBlockStart = (lineIndex % reportInterval) == 0;
        if (atBlockStart)
            writeln(`Line `, lineIndex, ` read containing:`, text);
        ++lineIndex;
    }

    // Second pass: reopen the file and print only its leading lines.
    enum headLength = 5;
    auto headLines = new DecompressByLine!R(path).take(headLength);
    foreach (const text; headLines)
    {
        writeln(text);
    }
}

/// Benchmark DBpedia parsing: time a line count of each matching archive below `rootPath`.
version(benchmark_zio)
static private void benchmarkDbpediaParsing(const string rootPath = `/home/per/Knowledge/DBpedia/latest`) @system
{
    import nxt.array_algorithm : startsWith, endsWith;
    import std.algorithm : filter;
    import std.datetime : MonoTime;
    import std.file : dirEntries, SpanMode;
    import std.path : baseName;
    import std.stdio : write, writeln, stdout;

    alias R = Bz2libFileInputRange;

    // Keep entries whose base name starts with `instance_types` and whose
    // name ends with `.ttl.bz2`.
    auto archives = dirEntries(rootPath, SpanMode.depth)
        .filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
                          file.name.endsWith(`.ttl.bz2`)));

    foreach (const archive; archives)
    {
        write(`Checking `, archive, ` ... `);
        stdout.flush();

        immutable started = MonoTime.currTime();

        size_t lineTotal = 0;
        foreach (const text; new DecompressByLine!R(archive))
        {
            ++lineTotal;
        }

        immutable finished = MonoTime.currTime();

        showStat(archive, started, finished, lineTotal);
    }
}

/// Show statistics.
/** Print the elapsed time between `before` and `after`, in total and per line.
 *
 * Params:
 *   tag = label printed first
 *   before = time stamp taken before the measured work
 *   after = time stamp taken after the measured work
 *   lineCount = number of lines processed in between
 */
static private void showStat(T)(in const(char[]) tag,
                                in T before,
                                in T after,
                                in size_t lineCount)
{
    import std.stdio : writefln;
    // `total!"msecs"` is an integer (whole milliseconds), so derive the
    // milliseconds from the microseconds to keep the fractional digit of
    // `%3.1f` meaningful.
    const double elapsedUsecs = cast(double)(after - before).total!`usecs`;
    // avoid printing inf/nan when nothing was read
    const double usecsPerLine = lineCount ? elapsedUsecs / lineCount : 0.0;
    writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
             tag,
             elapsedUsecs / 1000.0,
             usecsPerLine);
}

version(unittest)
{
    import std.range.primitives : isInputRange;
}

pragma(lib, "bz2");             // Ubuntu: sudo apt-get install libbz2-dev

// C bindings for libbz2.
// NOTE(review): the module-level `@safe:` label presumably also applies to
// these body-less prototypes — confirm, and consider marking them `@system`.
extern(C) nothrow @nogc:

enum BZ_RUN = 0;
enum BZ_FLUSH = 1;
enum BZ_FINISH = 2;

enum BZ_OK = 0;
enum BZ_RUN_OK = 1;
enum BZ_FLUSH_OK = 2;
enum BZ_FINISH_OK = 3;
enum BZ_STREAM_END = 4;
enum BZ_SEQUENCE_ERROR = -1;
enum BZ_PARAM_ERROR = -2;
enum BZ_MEM_ERROR = -3;
enum BZ_DATA_ERROR = -4;
enum BZ_DATA_ERROR_MAGIC = -5;
enum BZ_IO_ERROR = -6;
enum BZ_UNEXPECTED_EOF = -7;
enum BZ_OUTBUFF_FULL = -8;
enum BZ_CONFIG_ERROR = -9;

struct bz_stream
{
    ubyte* next_in;
    uint avail_in;
    uint total_in_lo32;
    uint total_in_hi32;

    ubyte* next_out;
    uint avail_out;
    uint total_out_lo32;
    uint total_out_hi32;

    void* state;

    void* function(void*, int, int) nothrow bzalloc;
    void function(void*, void*) nothrow bzfree;
    void* opaque;
}

/*-- Core (low-level) library functions --*/

int BZ2_bzCompressInit(bz_stream* strm,
                       int blockSize100k,
                       int verbosity,
                       int workFactor);

int BZ2_bzCompress(bz_stream* strm,
                   int action);

int BZ2_bzCompressEnd(bz_stream* strm);

int BZ2_bzDecompressInit(bz_stream* strm,
                         int verbosity,
                         int small);

int BZ2_bzDecompress(bz_stream* strm);

int BZ2_bzDecompressEnd(bz_stream *strm);

/*-- High(er) level library functions --*/

version(BZ_NO_STDIO) {}
else
{
    import core.stdc.stdio;

    enum BZ_MAX_UNUSED = 5000;

    struct BZFILE;

    BZFILE* BZ2_bzReadOpen(int* bzerror,
                           FILE* f,
                           int verbosity,
                           int small,
                           void* unused,
                           int nUnused);

    void BZ2_bzReadClose(int* bzerror,
                         BZFILE* b);

    void BZ2_bzReadGetUnused(int* bzerror,
                             BZFILE* b,
                             void** unused,
                             int* nUnused);

    int BZ2_bzRead(int* bzerror,
                   BZFILE* b,
                   void* buf,
                   int len);

    BZFILE* BZ2_bzWriteOpen(int* bzerror,
                            FILE* f,
                            int blockSize100k,
                            int verbosity,
                            int workFactor
        );

    void BZ2_bzWrite(int* bzerror,
                     BZFILE* b,
                     void* buf,
                     int len);

    void BZ2_bzWriteClose(int* bzerror,
                          BZFILE* b,
                          int abandon,
                          uint* nbytes_in,
                          uint* nbytes_out);

    void BZ2_bzWriteClose64(int* bzerror,
                            BZFILE* b,
                            int abandon,
                            uint* nbytes_in_lo32,
                            uint* nbytes_in_hi32,
                            uint* nbytes_out_lo32,
                            uint* nbytes_out_hi32);
}

/*-- Utility functions --*/

int BZ2_bzBuffToBuffCompress(ubyte* dest,
                             uint* destLen,
                             ubyte* source,
                             uint sourceLen,
                             int blockSize100k,
                             int verbosity,
                             int workFactor);

int BZ2_bzBuffToBuffDecompress(ubyte* dest,
                               uint* destLen,
                               ubyte* source,
                               uint sourceLen,
                               int small,
                               int verbosity);


/*--
  Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
  to support better zlib compatibility.
  This code is not _officially_ part of libbzip2 (yet);
  I haven't tested it, documented it, or considered the
  threading-safeness of it.
  If this code breaks, please contact both Yoshioka and me.
  --*/

const(char)* BZ2_bzlibVersion();

BZFILE* BZ2_bzopen(const scope const(char)* path,
                   const scope const(char)* mode);

BZFILE * BZ2_bzdopen(int fd,
                     const scope const(char)* mode);

int BZ2_bzread(scope BZFILE* b,
               scope void* buf,
               int len);

int BZ2_bzwrite(scope BZFILE* b,
                scope void* buf,
                int len);

int BZ2_bzflush(scope BZFILE* b);

void BZ2_bzclose(scope BZFILE* b);

const(char)* BZ2_bzerror(scope BZFILE *b,
                         int *errnum);