/** File I/O of Compressed Files.
 *
 * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
 */
module nxt.zio;

import nxt.path : Path, FilePath;

version = benchmark_zio;

@safe:

/** Input range over the decompressed bytes of a compressed file.
 *
 * The file is read in blocks of `chunkSize` compressed bytes via
 * `File.byChunk`; each block is inflated with `std.zlib.UnCompress` and the
 * resulting bytes are served one at a time through `front`/`popFront`.
 */
struct GzipFileInputRange
{
	import std.stdio : File;
	import std.traits : ReturnType;

	enum chunkSize = 0x4000; /+ TODO: find optimal value via benchmark +/

	enum defaultExtension = `.gz`;

	/** Open the file at `path` and decode its first block.
	 *
	 * The file is opened in binary mode (`rb`) because the contents are
	 * compressed binary data, not text.
	 */
	this(in FilePath path) @trusted
	{
		_f = File(path.str, `rb`);
		_chunkRange = _f.byChunk(chunkSize);
		_uncompress = new UnCompress;
		loadNextChunk();
	}

	/** Refill `_uncompressedBuf` with the next non-empty run of decompressed
	 * bytes and reset the read index.
	 *
	 * Compressed chunks that inflate to zero bytes are skipped, so an empty
	 * buffer is only ever observed once all input has been consumed and the
	 * decompressor has been flushed. Without this, such a chunk would make
	 * `empty` report end-of-data while compressed input still remained.
	 */
	void loadNextChunk() @trusted
	{
		_bufIx = 0;

		// Keep feeding compressed chunks until one of them yields output.
		while (!_chunkRange.empty)
		{
			_uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
			_chunkRange.popFront();
			if (_uncompressedBuf.length != 0)
				return;
		}

		// No compressed input left: flush exactly once, then stay empty.
		if (!_exhausted)
		{
			_uncompressedBuf = cast(ubyte[])_uncompress.flush();
			_exhausted = true;
		}
		else
		{
			_uncompressedBuf.length = 0;
		}
	}

	/// Advance to the next decompressed byte, decoding more input on demand.
	void popFront()
	{
		_bufIx += 1;
		if (_bufIx >= _uncompressedBuf.length)
		{
			loadNextChunk();
		}
	}

pragma(inline, true):
pure nothrow @safe @nogc:

	/// Current decompressed byte.
	@property ubyte front() const
	{
		return _uncompressedBuf[_bufIx];
	}

	/// True when the current decompressed buffer holds no bytes.
	bool empty() const @property
	{
		return _uncompressedBuf.length == 0;
	}

private:
	import std.zlib : UnCompress;
	UnCompress _uncompress;
	File _f;
	ReturnType!(_f.byChunk) _chunkRange;
	bool _exhausted;			///< True if exhausted.
	ubyte[] _uncompressedBuf;	///< Uncompressed buffer.
	size_t _bufIx;				///< Current byte index into `_uncompressedBuf`.
}
/** Is `true` iff `R` is a block input range.
	TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
	import std.range.primitives : isInputRange;
	enum isBlockInputRange = (isInputRange!R &&
							  __traits(hasMember, R, `bufferFrontChunk`) && /+ TODO: ask dlang for better naming +/
							  __traits(hasMember, R, `loadNextChunk`));	/+ TODO: ask dlang for better naming +/
}

/** Decompress `BlockInputRange` linewise.
 *
 * Each element is one line with the separator stripped. Empty lines are
 * yielded as empty slices; iteration ends only when the underlying range has
 * no more bytes. A separator that terminates the very last line does not
 * produce an additional trailing empty line.
 */
class DecompressByLine(BlockInputRange)
{
	private alias E = char;

	/** Open `path` with `BlockInputRange` and read the first line.
	 *
	 * If `BlockInputRange` satisfies `isBlockInputRange` decoding compressed
	 * files will be much faster.
	 *
	 * Params:
	 *   path = file to read
	 *   separator = line separator
	 *   initialCapacity = number of elements to pre-allocate in the line buffer
	 */
	this(in FilePath path,
		 E separator = '\n',
		 in size_t initialCapacity = 80)
	{
		this._range = typeof(_range)(path);
		this._separator = separator;
		static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
			this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
		else static if (__traits(hasMember, typeof(_lbuf), `reserve`))
			this._lbuf.reserve(initialCapacity); // otherwise `initialCapacity` would be ignored
		popFront();
	}

	/** Read the next line into the line buffer.
	 *
	 * Sets the end-of-input flag instead when the underlying range is already
	 * empty, so that a line of length zero is distinguishable from the end of
	 * the data.
	 */
	void popFront() @trusted
	{
		_lbuf.shrinkTo(0);

		if (_range.empty)
		{
			_empty = true;		// nothing left to form another line from
			return;
		}

		static if (isBlockInputRange!(typeof(_range)))
		{
			import std.algorithm.searching : find;

			/+ TODO: functionize +/
			while (!_range.empty)
			{
				ubyte[] currentFronts = _range.bufferFrontChunk;
				// `_range` is mutable so sentinel-based search can kick

				const hit = currentFronts.find(_separator); // or use `indexOf`

				if (hit.length)
				{
					const lineLength = hit.ptr - currentFronts.ptr;
					_lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
					_range._bufIx += lineLength + _separator.sizeof; // advancement + separator
					if (_range.empty)
						_range.loadNextChunk();
					break;		// done
				}
				else			// no separator yet
				{
					_lbuf.put(currentFronts); // so just add everything
					_range.loadNextChunk();
				}
			}
		}
		else
		{
			/+ TODO: sentinel-based search for `_separator` in `_range` +/
			while (!_range.empty &&
				   _range.front != _separator)
			{
				_lbuf.put(_range.front);
				_range.popFront();
			}

			if (!_range.empty &&
				_range.front == _separator)
			{
				_range.popFront();	// pop separator
			}
		}
	}

pragma(inline):
pure nothrow @safe @nogc:

	/// True once `popFront` has been called with no input remaining.
	bool empty() const @property
	{
		return _empty;
	}

	/// Current line, without separator. Only valid until the next `popFront`.
	const(E)[] front() const return scope
	{
		return _lbuf.data;
	}

private:
	BlockInputRange _range;

	import std.array : Appender;
	Appender!(E[]) _lbuf;		// line buffer

	// NOTE this is slower for ldc:
	// import nxt.container.dynamic_array : Array;
	// Array!E _lbuf;

	E _separator;

	bool _empty;				// true when all lines have been consumed
}

/** Writer that compresses strings with `std.zlib.Compress` in gzip header
 * format and writes the result to a `File`.
 */
class GzipOut
{
	import std.zlib: Compress, HeaderFormat;
	import std.stdio: File;

	/// Wrap `file`, which must be open for writing.
	this(File file) @trusted
	{
		_f = file;
		_compress = new Compress(HeaderFormat.gzip);
	}

	/// Compress `s` and write whatever output the compressor produces.
	void compress(const string s) @trusted
	{
		auto compressed = _compress.compress(s);
		_f.rawWrite(compressed);
	}

	/// Flush the compressor, write the remaining output and close the file.
	void finish() @trusted
	{
		auto compressed = _compress.flush;
		_f.rawWrite(compressed);
		_f.close;
	}

private:
	Compress _compress;
	File _f;
}
/** Input range over the decompressed bytes of a file read through zlib's
 * `gzopen`/`gzread`/`gzclose`.
 *
 * Also exposes `bufferFrontChunk` and `loadNextChunk` so that consumers can
 * process the current block as a slice instead of byte by byte.
 */
struct ZlibFileInputRange
{
	import std.file : FileException;

	/* Zlib docs:
	   CHUNK is simply the buffer size for feeding data to and pulling data from
	   the zlib routines. Larger buffer sizes would be more efficient,
	   especially for inflate(). If the memory is available, buffers sizes on
	   the order of 128K or 256K bytes should be used.
	*/
	enum chunkSize = 128 * 1024; // 128K

	enum defaultExtension = `.gz`;

@safe:

	/** Open `path` for reading and load the first block.
	 *
	 * Throws: `FileException` if the file cannot be opened, `Exception` if
	 * the first block cannot be decoded. In the latter case the handle is
	 * closed again before the exception propagates.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = gzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);
		scope(failure)
		{
			// Don't leak the handle; nulling it keeps a later destructor run harmless.
			gzclose(_f);
			_f = null;
		}
		_buf = new ubyte[chunkSize];
		loadNextChunk();
	}

	/** Close the file, if one is open.
	 *
	 * An instance without a handle (default-initialized, or left behind by a
	 * constructor that threw) is skipped: `gzclose` reports an error for a
	 * null handle, which would otherwise trip the `assert(0)` below.
	 */
	~this() nothrow @trusted @nogc
	{
		if (_f is null)
			return;
		const int ret = gzclose(_f);
		_f = null;
		if (ret < 0)
			assert(0, `Couldn't close file`); /+ TODO: replace with non-GC-allocated exception +/
	}

	this(this) @disable;

	/** Read the next block of up to `chunkSize` decompressed bytes into `_buf`
	 * and reset the read index.
	 *
	 * Throws: `Exception` if `gzread` reports an error (negative result).
	 */
	void loadNextChunk() @trusted
	{
		const int count = gzread(_f, _buf.ptr, chunkSize);
		if (count < 0)
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/// Advance one byte, loading the next block when the current one is consumed.
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk();	// also restarts `_bufIx` at 0
	}

pragma(inline, true):
pure nothrow @nogc:

	/// Current byte.
	@property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];

	/// True when the read index has reached the end of the bytes last read.
	bool empty() const @property => _bufIx == _bufReadLength;

	/** Get current bufferFrontChunk, i.e. the not yet consumed part of the
		current block.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
	import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

	gzFile _f;

	ubyte[] _buf;				// block read buffer

	// number of bytes in `_buf` most recently read by `gzread`; normally equal
	// to `_buf.length` except after the last read, where it is normally less
	// than `_buf.length`
	size_t _bufReadLength;

	size_t _bufIx;				// current stream read index in `_buf`

	/+ TODO: make this work: +/
	// extern (C) nothrow @nogc:
	// pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
	// pragma(mangle, `gzclose`) int gzclose(gzFile file);
	// pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
/** Input range over the decompressed bytes of a file read through libbz2's
 * `BZ2_bzopen`/`BZ2_bzread`/`BZ2_bzclose`.
 *
 * Also exposes `bufferFrontChunk` and `loadNextChunk` so that consumers can
 * process the current block as a slice instead of byte by byte.
 */
struct Bz2libFileInputRange
{
	import std.file : FileException;

	enum chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
	enum defaultExtension = `.bz2`;
	enum useGC = false;			/+ TODO: generalize to allocator parameter +/

@safe:

	/** Open `path` for reading, allocate the block buffer and load the first
	 * block.
	 *
	 * Throws: `FileException` if the file cannot be opened, `Exception` if
	 * the first block cannot be decoded. Raises an out-of-memory error if the
	 * buffer cannot be allocated. On any failure after the file was opened,
	 * the handle and the buffer are released before the error propagates.
	 */
	this(in FilePath path) @trusted
	{
		import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
		_f = BZ2_bzopen(path.str.toStringz, `rb`);
		if (!_f)
			throw new FileException(`Couldn't open file ` ~ path.str.idup);

		scope(failure)
		{
			// Release everything acquired so far; nulling keeps a later
			// destructor run from releasing it a second time.
			BZ2_bzclose(_f);
			_f = null;
			static if (!useGC)
			{
				import core.memory : pureFree;
				pureFree(_buf.ptr); // no-op while `_buf` is still null
				_buf = null;
			}
		}

		static if (useGC)
			_buf = new ubyte[chunkSize];
		else
		{
			import core.exception : onOutOfMemoryError;
			import core.memory : pureMalloc;
			auto mem = cast(ubyte*)pureMalloc(chunkSize);
			if (mem is null)
				onOutOfMemoryError(); // never slice (and later write through) a null pointer
			_buf = mem[0 .. chunkSize];
		}

		loadNextChunk();
	}

	/// Close the file, if open, and release the block buffer.
	~this() nothrow @trusted @nogc
	{
		if (_f !is null)
		{
			BZ2_bzclose(_f);	/+ TODO: error handling? +/
			_f = null;
		}

		static if (!useGC)
		{
			import core.memory : pureFree;
			pureFree(_buf.ptr);
			_buf = null;
		}
	}

	this(this) @disable;

	/** Read the next block of up to `chunkSize` decompressed bytes into `_buf`
	 * and reset the read index.
	 *
	 * Throws: `Exception` if `BZ2_bzread` reports an error (negative result).
	 */
	void loadNextChunk() @trusted
	{
		const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
		if (count < 0)
			throw new Exception(`Error decoding file`);
		_bufIx = 0;
		_bufReadLength = count;
	}

	/// Advance one byte, loading the next block when the current one is consumed.
	void popFront() in(!empty)
	{
		_bufIx += 1;
		if (_bufIx >= _bufReadLength)
			loadNextChunk();	// also restarts `_bufIx` at 0
	}

pragma(inline, true):
pure nothrow @nogc:

	/// Current byte.
	@property ubyte front() const @trusted in(!empty)
		=> _buf.ptr[_bufIx];

	/// True when the read index has reached the end of the bytes last read.
	bool empty() const @property
		=> _bufIx == _bufReadLength;

	/** Get current bufferFrontChunk, i.e. the not yet consumed part of the
		current block.
		TODO: need better name for this
	 */
	inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
		=> _buf.ptr[_bufIx .. _bufReadLength];

private:
	/* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
	pragma(lib, `bz2`);			// Ubuntu: sudo apt-get install libbz2-dev

	BZFILE* _f;

	ubyte[] _buf;				// block read buffer

	// number of bytes in `_buf` most recently read by `BZ2_bzread`; normally
	// equal to `_buf.length` except after the last read, where it is normally
	// less than `_buf.length`
	size_t _bufReadLength;

	size_t _bufIx;				// current stream read index in `_buf`
}
/** Round-trip check for `FileInputRange`: write a prefix of some sample text
 * with `GzipOut`, read it back byte-wise through `FileInputRange` and
 * line-wise through `DecompressByLine!ZlibFileInputRange`.
 *
 * NOTE(review): the prefix-length loop runs over `data.length .. data.length`,
 * which is an empty interval, so none of the checks below currently execute.
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
	import std.algorithm.iteration : splitter;
	import std.algorithm.searching : count;
	import std.stdio : File;

	alias R = DecompressByLine!ZlibFileInputRange;

	const data = "abc\ndef\nghi"; // contents of source
	const path = FilePath(`test` ~ FileInputRange.defaultExtension);

	foreach (const n; data.length .. data.length) /+ TODO: from 0 +/
	{
		const source = data[0 .. n]; // slice from the beginning

		// write the compressed fixture
		scope of = new GzipOut(File(path.str, `w`));
		of.compress(source);
		of.finish();

		// byte-wise comparison against the uncompressed source
		size_t ix = 0;
		foreach (e; FileInputRange(path))
		{
			assert(cast(char)e == source[ix]);
			ix += 1;
		}

		// line-wise comparison of the number of lines
		assert(new R(path).count == source.splitter('\n').count);
	}
}

///
@safe unittest {
	testInputRange!(GzipFileInputRange);
	testInputRange!(ZlibFileInputRange);
	testInputRange!(Bz2libFileInputRange);
}

/** Read Age of Aqcuisitions.
 *
 * Counts the lines of the same data set through each of the three file input
 * ranges and checks the count against the expected number.
 */
static private void testReadAgeofAqcuisitions(in Path rootDirPath = Path(`~/Work/knet/knowledge/en/age-of-aqcuisition`)) @safe
{
	import std.path : buildNormalizedPath, expandTilde;
	import nxt.zio : DecompressByLine, GzipFileInputRange;

	// gzip file via `std.zlib`
	{
		const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
		size_t count;
		foreach (line; new DecompressByLine!GzipFileInputRange(path))
			++count;
		assert(count == 51716);
	}

	// gzip file via the zlib C API
	{
		const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
		size_t count;
		foreach (line; new DecompressByLine!ZlibFileInputRange(path))
			++count;
		assert(count == 51716);
	}

	// bzip2 file via the libbz2 C API
	{
		const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words_copy.csv.bz2`));
		size_t count;
		foreach (line; new DecompressByLine!Bz2libFileInputRange(path))
			++count;
		assert(count == 51716);
	}
}
/** Read Concept 5 assertions.
 *
 * Walks all lines of the file at `path`, printing every 100_000th line, and
 * then prints the first five lines from a fresh reader.
 */
static private void testReadConcept5Assertions(in FilePath path = FilePath(`/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`)) @safe
{
	import std.algorithm.searching: count;
	import std.range: take;
	import std.stdio: writeln;

	alias R = ZlibFileInputRange;

	// pass 1: full scan with periodic progress output
	const lineBlockCount = 100_000;
	size_t lineNr;
	foreach (const line; new DecompressByLine!R(path))
	{
		const atBlockStart = (lineNr % lineBlockCount == 0);
		if (atBlockStart)
			writeln(`Line `, lineNr, ` read containing:`, line);
		++lineNr;
	}

	// pass 2: only the head of the file
	const lineCount = 5;
	auto head = new DecompressByLine!R(path).take(lineCount);
	foreach (const line; head)
		writeln(line);
}

/// benchmark DBpedia parsing
version (benchmark_zio)
static private void benchmarkDbpediaParsing(in Path rootPath = Path(`/home/per/Knowledge/DBpedia/latest`)) @system
{
	import nxt.algorithm.searching : startsWith, endsWith;
	import std.algorithm : filter;
	import std.datetime : MonoTime;
	import std.file : dirEntries, SpanMode;
	import std.path : baseName;
	import std.stdio : write, writeln, stdout;

	alias R = Bz2libFileInputRange;

	// all `instance_types*.ttl.bz2` files below `rootPath`
	auto dumps = dirEntries(rootPath.str, SpanMode.depth)
		.filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
						  file.name.endsWith(`.ttl.bz2`)));

	foreach (const pathStr; dumps)
	{
		write(`Checking `, pathStr, ` ... `);
		stdout.flush();

		immutable before = MonoTime.currTime();

		// time a plain line count over the whole file
		size_t lineCounter;
		foreach (const line; new DecompressByLine!R(FilePath(pathStr)))
			++lineCounter;

		immutable after = MonoTime.currTime();

		showStat(pathStr, before, after, lineCounter);
	}
}
/** Show statistics.
 *
 * Prints the time elapsed between `before` and `after` in milliseconds,
 * together with the average number of microseconds per line, prefixed by `tag`.
 */
static private void showStat(T)(in const(char[]) tag,
								in T before,
								in T after,
								in size_t lineCount)
{
	import std.stdio : writefln;

	const elapsed = after - before;
	const totalMsecs = cast(double)elapsed.total!`msecs`;
	const usecsPerLine = cast(double)elapsed.total!`usecs` / lineCount;

	writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
			 tag,
			 totalMsecs,
			 usecsPerLine);
}

version (unittest)
{
	import std.range.primitives : isInputRange;
}

pragma(lib, "bz2");				// Ubuntu: sudo apt-get install libbz2-dev

// Everything from here on is declared with C linkage.
extern(C) nothrow @nogc:

/// Action codes.
enum
{
	BZ_RUN = 0,
	BZ_FLUSH = 1,
	BZ_FINISH = 2,
}

/// Status and error codes.
enum
{
	BZ_OK = 0,
	BZ_RUN_OK = 1,
	BZ_FLUSH_OK = 2,
	BZ_FINISH_OK = 3,
	BZ_STREAM_END = 4,
	BZ_SEQUENCE_ERROR = -1,
	BZ_PARAM_ERROR = -2,
	BZ_MEM_ERROR = -3,
	BZ_DATA_ERROR = -4,
	BZ_DATA_ERROR_MAGIC = -5,
	BZ_IO_ERROR = -6,
	BZ_UNEXPECTED_EOF = -7,
	BZ_OUTBUFF_FULL = -8,
	BZ_CONFIG_ERROR = -9,
}

/// Stream state passed to the core (low-level) library functions.
struct bz_stream
{
	// input side
	ubyte* next_in;
	uint avail_in;
	uint total_in_lo32;
	uint total_in_hi32;

	// output side
	ubyte* next_out;
	uint avail_out;
	uint total_out_lo32;
	uint total_out_hi32;

	void* state;

	// allocation hooks
	void* function(void*, int, int) nothrow bzalloc;
	void function(void*, void*) nothrow bzfree;
	void* opaque;
}

/*-- Core (low-level) library functions --*/

int BZ2_bzCompressInit(bz_stream* strm, int blockSize100k, int verbosity, int workFactor);

int BZ2_bzCompress(bz_stream* strm, int action);

int BZ2_bzCompressEnd(bz_stream* strm);

int BZ2_bzDecompressInit(bz_stream* strm, int verbosity, int small);

int BZ2_bzDecompress(bz_stream* strm);

int BZ2_bzDecompressEnd(bz_stream* strm);
// The declarations below have C linkage.
extern(C) nothrow @nogc:

/*-- High(er) level library functions --*/

version (BZ_NO_STDIO) {}
else
{
	import core.stdc.stdio;

	enum BZ_MAX_UNUSED = 5000;

	/// Opaque handle used by the stdio-based functions.
	struct BZFILE;

	// reading

	BZFILE* BZ2_bzReadOpen(int* bzerror, FILE* f, int verbosity, int small, void* unused, int nUnused);

	void BZ2_bzReadClose(int* bzerror, BZFILE* b);

	void BZ2_bzReadGetUnused(int* bzerror, BZFILE* b, void** unused, int* nUnused);

	int BZ2_bzRead(int* bzerror, BZFILE* b, void* buf, int len);

	// writing

	BZFILE* BZ2_bzWriteOpen(int* bzerror, FILE* f, int blockSize100k, int verbosity, int workFactor);

	void BZ2_bzWrite(int* bzerror, BZFILE* b, void* buf, int len);

	void BZ2_bzWriteClose(int* bzerror, BZFILE* b, int abandon, uint* nbytes_in, uint* nbytes_out);

	void BZ2_bzWriteClose64(int* bzerror, BZFILE* b, int abandon,
							uint* nbytes_in_lo32, uint* nbytes_in_hi32,
							uint* nbytes_out_lo32, uint* nbytes_out_hi32);
}

/*-- Utility functions --*/

int BZ2_bzBuffToBuffCompress(ubyte* dest, uint* destLen,
							 ubyte* source, uint sourceLen,
							 int blockSize100k, int verbosity, int workFactor);

int BZ2_bzBuffToBuffDecompress(ubyte* dest, uint* destLen,
							   ubyte* source, uint sourceLen,
							   int small, int verbosity);


/*--
  Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
  to support better zlib compatibility.
  This code is not _officially_ part of libbzip2 (yet);
  I haven't tested it, documented it, or considered the
  threading-safeness of it.
  If this code breaks, please contact both Yoshioka and me.
  --*/

const(char)* BZ2_bzlibVersion();

BZFILE* BZ2_bzopen(const scope const(char)* path, const scope const(char)* mode);

BZFILE* BZ2_bzdopen(int fd, const scope const(char)* mode);

int BZ2_bzread(scope BZFILE* b, scope void* buf, int len);

int BZ2_bzwrite(scope BZFILE* b, scope void* buf, int len);

int BZ2_bzflush(scope BZFILE* b);

void BZ2_bzclose(scope BZFILE* b);

const(char)* BZ2_bzerror(scope BZFILE* b, int* errnum);