/** File I/O of Compressed Files.
 *
 * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
 */
module nxt.zio;

import nxt.path : Path, FilePath;

version = benchmark_zio;

@safe:

/** Input range over the decompressed bytes of a compressed file.
 *
 * The file is read in chunks of `chunkSize` bytes via `File.byChunk` and each
 * chunk is decoded with `std.zlib.UnCompress`.
 */
struct GzipFileInputRange
{
    import std.stdio : File;
    import std.traits : ReturnType;

    static immutable chunkSize = 0x4000; /+ TODO: find optimal value via benchmark +/
    static immutable defaultExtension = `.gz`;

    /** Open the file at `path` and decode its first block of data. */
    this(in FilePath path) @trusted
    {
        _f = File(path.str, `r`);
        _chunkRange = _f.byChunk(chunkSize);
        _uncompress = new UnCompress;
        loadNextChunk();
    }

    /** Replace the current buffer with the next block of decompressed bytes.
     *
     * Keeps feeding compressed chunks to the decoder until it produces at
     * least one byte or the input is used up, in which case the decoder is
     * flushed once. A compressed chunk that yields no output therefore no
     * longer makes the range look `empty` while input remains.
     */
    void loadNextChunk() @trusted
    {
        _bufIx = 0;
        _uncompressedBuf = null;
        while (_uncompressedBuf.length == 0 && !_exhausted)
        {
            if (!_chunkRange.empty)
            {
                _uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
                _chunkRange.popFront();
            }
            else
            {
                // all compressed input consumed: collect what the decoder still holds
                _uncompressedBuf = cast(ubyte[])_uncompress.flush();
                _exhausted = true;
            }
        }
    }

    /** Advance by one byte, decoding more input when the buffer is used up. */
    void popFront()
    {
        _bufIx += 1;
        if (_bufIx >= _uncompressedBuf.length)
        {
            loadNextChunk();
        }
    }

pragma(inline, true):
pure nothrow @safe @nogc:

    /// Current decompressed byte.
    @property ubyte front() const
    {
        return _uncompressedBuf[_bufIx];
    }

    /// True when no decompressed bytes are available.
    bool empty() const @property
    {
        return _uncompressedBuf.length == 0;
    }

private:
    import std.zlib : UnCompress;
    UnCompress _uncompress;
    File _f;
    ReturnType!(_f.byChunk) _chunkRange;
    bool _exhausted;            ///< True if exhausted.
    ubyte[] _uncompressedBuf;   ///< Uncompressed buffer.
    size_t _bufIx;              ///< Current byte index into `_uncompressedBuf`.
}

/** Is `true` iff `R` is a block input range.
TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
    import std.range.primitives : isInputRange;
    enum isBlockInputRange = (isInputRange!R &&
                              __traits(hasMember, R, `bufferFrontChunk`) && /+ TODO: ask dlang for better naming +/
                              __traits(hasMember, R, `loadNextChunk`)); /+ TODO: ask dlang for better naming +/
}

/** Decompress `BlockInputRange` linewise.
 *
 * The file is decoded by an instance of `BlockInputRange` and split at every
 * occurrence of the separator, which is not part of the yielded lines. Empty
 * lines are yielded as empty slices; iteration ends once the underlying range
 * has no more data.
 */
class DecompressByLine(BlockInputRange)
{
    private alias E = char;

    /** Open `path` and read the first line.
     *
     * If `BlockInputRange` satisfies `isBlockInputRange`, lines are extracted
     * a buffer at a time instead of a byte at a time.
     *
     * Params:
     *   path = file to decode
     *   separator = line separator
     *   initialCapacity = number of characters initially reserved for the line buffer
     */
    this(in FilePath path,
         E separator = '\n',
         in size_t initialCapacity = 80)
    {
        this._range = typeof(_range)(path);
        this._separator = separator;
        static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
            this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
        else static if (__traits(hasMember, typeof(_lbuf), `reserve`))
            this._lbuf.reserve(initialCapacity); // honor the requested capacity
        popFront();
    }

    /** Read the next line into the line buffer, or mark the range as done. */
    void popFront() @trusted
    {
        _lbuf.shrinkTo(0);

        // End of iteration is signalled by exhausted input, not by an empty
        // line, so that empty lines inside the file are yielded as well.
        if (_range.empty)
        {
            _done = true;
            return;
        }

        static if (isBlockInputRange!(typeof(_range)))
        {
            /+ TODO: functionize +/
            /+ TODO: evaluate `countUntil` or a sentinel-based search +/
            import std.algorithm.searching : find;
            while (!_range.empty)
            {
                ubyte[] currentFronts = _range.bufferFrontChunk;
                const hit = currentFronts.find(_separator);
                if (hit.length)
                {
                    const lineLength = hit.ptr - currentFronts.ptr;
                    _lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
                    _range._bufIx += lineLength + _separator.sizeof; // advancement + separator
                    if (_range.empty)
                        _range.loadNextChunk();
                    break; // done
                }
                // no separator yet, so add everything and continue in next block
                _lbuf.put(currentFronts);
                _range.loadNextChunk();
            }
        }
        else
        {
            /+ TODO: sentinel-based search for `_separator` in `_range` +/
            while (!_range.empty &&
                   _range.front != _separator)
            {
                _lbuf.put(_range.front);
                _range.popFront();
            }

            if (!_range.empty &&
                _range.front == _separator)
            {
                _range.popFront(); // pop separator
            }
        }
    }

pragma(inline):
pure nothrow @safe @nogc:

    /// True when all lines have been read.
    bool empty() const @property
    {
        return _done;
    }

    /// Current line excluding separator. The buffer is reused by `popFront`.
    const(E)[] front() const return scope
    {
        return _lbuf.data;
    }

private:
    BlockInputRange _range;

    import std.array : Appender;
    Appender!(E[]) _lbuf;       // line buffer

    // NOTE this is slower for ldc:
    // import nxt.container.dynamic_array : Array;
    // Array!E _lbuf;

    E _separator;
    bool _done;                 // true when `_range` was empty at the last `popFront`
}

/** Writes gzip-compressed data to a `File`. */
class GzipOut
{
    import std.zlib: Compress, HeaderFormat;
    import std.stdio: File;

    /// Wrap `file` with a compressor using the gzip header format.
    this(File file) @trusted
    {
        _f = file;
        _compress = new Compress(HeaderFormat.gzip);
    }

    /// Compress `s` and write the output produced so far to the file.
    void compress(const string s) @trusted
    {
        auto compressed = _compress.compress(s);
        _f.rawWrite(compressed);
    }

    /// Flush the compressor, write the remaining output and close the file.
    void finish() @trusted
    {
        auto compressed = _compress.flush;
        _f.rawWrite(compressed);
        _f.close;
    }

private:
    Compress _compress;
    File _f;
}

/** Block input range over the decompressed bytes of a file read with zlib's
 * `gzopen`/`gzread`.
 */
struct ZlibFileInputRange
{
    import std.file : FileException;

    /* Zlib docs:
       CHUNK is simply the buffer size for feeding data to and pulling data from
       the zlib routines. Larger buffer sizes would be more efficient,
       especially for inflate(). If the memory is available, buffers sizes on
       the order of 128K or 256K bytes should be used.
    */
    static immutable chunkSize = 128 * 1024; // 128K

    static immutable defaultExtension = `.gz`;

@safe:

    /** Open `path` for reading and load the first block.
     *
     * Throws: `FileException` if the file cannot be opened.
     */
    this(in FilePath path) @trusted
    {
        import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
        _f = gzopen(path.str.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.str.idup);
        _buf = new ubyte[chunkSize];
        loadNextChunk();
    }

    ~this() nothrow @trusted @nogc
    {
        // Nothing to close if the file was never opened, e.g. for `.init` or
        // when an owner is finalized after the constructor threw.
        if (!_f)
            return;
        const int ret = gzclose(_f);
        if (ret < 0)
            assert(0, `Couldn't close file`); /+ TODO: replace with non-GC-allocated exception +/
    }

    this(this) @disable;

    /** Read the next block of decompressed bytes into the buffer.
     *
     * Throws: `Exception` if `gzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = gzread(_f, _buf.ptr, chunkSize);
        if (count < 0)          // any negative value signals an error
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current one is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk();    // also restarts `_bufIx` at 0
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current decompressed byte.
    @property ubyte front() const @trusted in(!empty) => _buf.ptr[_bufIx];

    /// True when the last read returned no more bytes.
    bool empty() const @property => _bufIx == _bufReadLength;

    /** Get current bufferFrontChunk.
        TODO: need better name for this
    */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty) => _buf.ptr[_bufIx .. _bufReadLength];

private:
    import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

    gzFile _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `gzread`, normally equal
    // to `_buf.length` except after the last read where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`

    /+ TODO: make this work: +/
    // extern (C) nothrow @nogc:
    // pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
    // pragma(mangle, `gzclose`) int gzclose(gzFile file);
    // pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}

/** Block input range over the decompressed bytes of a file read with
 * libbzip2's `BZ2_bzopen`/`BZ2_bzread`.
 */
struct Bz2libFileInputRange
{
    import std.file : FileException;

    static immutable chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
    static immutable defaultExtension = `.bz2`;
    static immutable useGC = false; /+ TODO: generalize to allocator parameter +/

@safe:

    /** Open `path` for reading, allocate the block buffer and load the first block.
     *
     * Throws: `FileException` if the file cannot be opened.
     */
    this(in FilePath path) @trusted
    {
        import std.string : toStringz; /+ TODO: avoid GC allocation by looking at how gmp-d z.d solves it +/
        _f = BZ2_bzopen(path.str.toStringz, `rb`);
        if (!_f)
            throw new FileException(`Couldn't open file ` ~ path.str.idup);

        static if (useGC)
            _buf = new ubyte[chunkSize];
        else
        {
            import core.memory : pureMalloc;
            auto ptr = cast(ubyte*)pureMalloc(chunkSize);
            if (ptr is null)
            {
                // don't slice a null pointer; release the handle before failing
                import core.exception : onOutOfMemoryError;
                BZ2_bzclose(_f);
                _f = null;
                onOutOfMemoryError();
            }
            _buf = ptr[0 .. chunkSize];
        }

        loadNextChunk();
    }

    ~this() nothrow @trusted @nogc
    {
        BZ2_bzclose(_f);        /+ TODO: error handling? +/

        static if (!useGC)
        {
            import core.memory : pureFree;
            pureFree(_buf.ptr);
        }
    }

    this(this) @disable;

    /** Read the next block of decompressed bytes into the buffer.
     *
     * Throws: `Exception` if `BZ2_bzread` reports an error.
     */
    void loadNextChunk() @trusted
    {
        const int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
        if (count < 0)          // any negative value signals an error
            throw new Exception(`Error decoding file`);
        _bufIx = 0;
        _bufReadLength = count;
    }

    /// Advance by one byte, reading the next block when the current one is used up.
    void popFront() in(!empty)
    {
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
            loadNextChunk();    // also restarts `_bufIx` at 0
    }

pragma(inline, true):
pure nothrow @nogc:

    /// Current decompressed byte.
    @property ubyte front() const @trusted in(!empty)
        => _buf.ptr[_bufIx];

    /// True when the last read returned no more bytes.
    bool empty() const @property
        => _bufIx == _bufReadLength;

    /** Get current bufferFrontChunk.
        TODO: need better name for this
    */
    inout(ubyte)[] bufferFrontChunk() inout @trusted in(!empty)
        => _buf.ptr[_bufIx .. _bufReadLength];

private:
    /* import bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose; */
    pragma(lib, `bz2`);         // Ubuntu: sudo apt-get install libbz2-dev

    BZFILE* _f;

    ubyte[] _buf;               // block read buffer

    // number of bytes in `_buf` most recently read by `BZ2_bzread`, normally
    // equal to `_buf.length` except after the last read where it is normally less
    size_t _bufReadLength;

    size_t _bufIx;              // current stream read index in `_buf`
}

/** Round-trip test helper for a file input range type.
 *
 * NOTE: the loop range `data.length .. data.length` is empty, so the body is
 * currently never executed. The body always writes its fixture with `GzipOut`,
 * whichever `FileInputRange` is under test.
 */
private void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
    import std.stdio : File;

    const path = FilePath(`test` ~ FileInputRange.defaultExtension);

    const data = "abc\ndef\nghi"; // contents of source

    foreach (const n; data.length .. data.length) /+ TODO: from 0 +/
    {
        const source = data[0 .. n]; // slice from the beginning

        scope of = new GzipOut(File(path.str, `w`));
        of.compress(source);
        of.finish();

        size_t ix = 0;
        foreach (e; FileInputRange(path))
        {
            assert(cast(char)e == source[ix]);
            ++ix;
        }

        import std.algorithm.searching : count;
        import std.algorithm.iteration : splitter;
        alias R = DecompressByLine!ZlibFileInputRange;

        assert(new R(path).count == source.splitter('\n').count);
    }
}

///
@safe unittest {
    testInputRange!(GzipFileInputRange);
    testInputRange!(ZlibFileInputRange);
    testInputRange!(Bz2libFileInputRange);
}

/** Read Age of Aqcuisitions.
 *
 * Counts the lines of the same data set through each of the three decoders
 * and checks the count against the expected value.
 */
static private void testReadAgeofAqcuisitions(in Path rootDirPath = Path(`~/Work/knet/knowledge/en/age-of-aqcuisition`)) @safe
{
    import std.path: expandTilde;
    import nxt.zio : DecompressByLine, GzipFileInputRange;
    import std.path : buildNormalizedPath;

    {
        const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
        size_t count = 0;
        foreach (line; new DecompressByLine!GzipFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }

    {
        const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words.csv.gz`));
        size_t count = 0;
        foreach (line; new DecompressByLine!ZlibFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }

    {
        const path = FilePath(buildNormalizedPath(rootDirPath.str.expandTilde, `AoA_51715_words_copy.csv.bz2`));
        size_t count = 0;
        foreach (line; new DecompressByLine!Bz2libFileInputRange(path))
            count += 1;
        assert(count == 51716);
    }
}

/** Read Concept 5 assertions.
*/
static private void testReadConcept5Assertions(in FilePath path = FilePath(`/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`)) @safe
{
    import std.stdio: writeln;
    import std.range: take;
    import std.algorithm.searching: count;

    alias R = ZlibFileInputRange;

    // First pass: walk all lines and print every `reportInterval`:th one.
    const reportInterval = 100_000;
    size_t lineIndex = 0;
    foreach (const line; new DecompressByLine!R(path))
    {
        const isReportLine = (lineIndex % reportInterval == 0);
        if (isReportLine)
            writeln(`Line `, lineIndex, ` read containing:`, line);
        ++lineIndex;
    }

    // Second pass: print only the leading lines of a freshly opened file.
    const headCount = 5;
    auto head = new DecompressByLine!R(path).take(headCount);
    foreach (const line; head)
        writeln(line);
}

/// benchmark DBpedia parsing
version (benchmark_zio)
static private void benchmarkDbpediaParsing(in Path rootPath = Path(`/home/per/Knowledge/DBpedia/latest`)) @system
{
    import nxt.algorithm.searching : startsWith, endsWith;
    import std.algorithm : filter;
    import std.file : dirEntries, SpanMode;
    import std.path : baseName;
    import std.stdio : write, writeln, stdout;
    import std.datetime : MonoTime;

    alias R = Bz2libFileInputRange;

    // All entries below `rootPath` named `instance_types*` and ending in `.ttl.bz2`.
    auto candidates = dirEntries(rootPath.str, SpanMode.depth)
        .filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
                          file.name.endsWith(`.ttl.bz2`)));

    foreach (const pathStr; candidates)
    {
        write(`Checking `, pathStr, ` ... `);
        stdout.flush();

        // Time a full linewise pass over the file.
        immutable started = MonoTime.currTime();
        size_t lineCounter = 0;
        foreach (const line; new DecompressByLine!R(FilePath(pathStr)))
            ++lineCounter;
        immutable finished = MonoTime.currTime();

        showStat(pathStr, started, finished, lineCounter);
    }
}

/// Show statistics.
static private void showStat(T)(in const(char[]) tag,
                                in T before,
                                in T after,
                                in size_t lineCount)
{
    import std.stdio : writefln;

    // Elapsed time in whole milliseconds, and average microseconds per line.
    const double elapsedMsecs = cast(double)(after - before).total!`msecs`;
    const double usecsPerLine = cast(double)(after - before).total!`usecs` / lineCount;
    writefln(`%s: %3.1f msecs (%3.1f usecs/line)`, tag, elapsedMsecs, usecsPerLine);
}

version (unittest)
{
    import std.range.primitives : isInputRange;
}

pragma(lib, "bz2");             // Ubuntu: sudo apt-get install libbz2-dev

// Declarations for the C library libbz2; everything below has C linkage.
extern(C) nothrow @nogc:

/// Values for the `action` parameter of `BZ2_bzCompress`.
enum
{
    BZ_RUN = 0,
    BZ_FLUSH = 1,
    BZ_FINISH = 2,
}

/// Status codes.
enum
{
    BZ_OK = 0,
    BZ_RUN_OK = 1,
    BZ_FLUSH_OK = 2,
    BZ_FINISH_OK = 3,
    BZ_STREAM_END = 4,
    BZ_SEQUENCE_ERROR = -1,
    BZ_PARAM_ERROR = -2,
    BZ_MEM_ERROR = -3,
    BZ_DATA_ERROR = -4,
    BZ_DATA_ERROR_MAGIC = -5,
    BZ_IO_ERROR = -6,
    BZ_UNEXPECTED_EOF = -7,
    BZ_OUTBUFF_FULL = -8,
    BZ_CONFIG_ERROR = -9,
}

/// Stream state passed to the core (low-level) library functions.
struct bz_stream
{
    // input side
    ubyte* next_in;
    uint avail_in;
    uint total_in_lo32;
    uint total_in_hi32;

    // output side
    ubyte* next_out;
    uint avail_out;
    uint total_out_lo32;
    uint total_out_hi32;

    void* state;

    // allocation hooks and the opaque value that goes with them
    void* function(void*, int, int) nothrow bzalloc;
    void function(void*, void*) nothrow bzfree;
    void* opaque;
}

/*-- Core (low-level) library functions --*/

int BZ2_bzCompressInit(bz_stream* strm, int blockSize100k, int verbosity, int workFactor);

int BZ2_bzCompress(bz_stream* strm, int action);

int BZ2_bzCompressEnd(bz_stream* strm);

int BZ2_bzDecompressInit(bz_stream* strm, int verbosity, int small);

int BZ2_bzDecompress(bz_stream* strm);

int BZ2_bzDecompressEnd(bz_stream *strm);

/*-- High(er) level library functions --*/

version (BZ_NO_STDIO) {}
else
{
    import core.stdc.stdio;

    enum BZ_MAX_UNUSED = 5000;

    /// Opaque handle to a bzip2 file.
    struct BZFILE;

    BZFILE* BZ2_bzReadOpen(int* bzerror, FILE* f, int verbosity, int small,
                           void* unused, int nUnused);

    void BZ2_bzReadClose(int* bzerror, BZFILE* b);

    void BZ2_bzReadGetUnused(int* bzerror, BZFILE* b, void** unused, int* nUnused);

    int BZ2_bzRead(int* bzerror, BZFILE* b, void* buf, int len);

    BZFILE* BZ2_bzWriteOpen(int* bzerror, FILE* f, int blockSize100k,
                            int verbosity, int workFactor);

    void BZ2_bzWrite(int* bzerror, BZFILE* b, void* buf, int len);

    void BZ2_bzWriteClose(int* bzerror, BZFILE* b, int abandon,
                          uint* nbytes_in, uint* nbytes_out);

    void BZ2_bzWriteClose64(int* bzerror, BZFILE* b, int abandon,
                            uint* nbytes_in_lo32, uint* nbytes_in_hi32,
                            uint* nbytes_out_lo32, uint* nbytes_out_hi32);
}

/*-- Utility functions --*/

int BZ2_bzBuffToBuffCompress(ubyte* dest, uint* destLen,
                             ubyte* source, uint sourceLen,
                             int blockSize100k, int verbosity, int workFactor);

int BZ2_bzBuffToBuffDecompress(ubyte* dest, uint* destLen,
                               ubyte* source, uint sourceLen,
                               int small, int verbosity);


/*--
   Code contributed by Yoshioka Tsuneo (tsuneo@rr.iij4u.or.jp)
   to support better zlib compatibility.
   This code is not _officially_ part of libbzip2 (yet);
   I haven't tested it, documented it, or considered the
   threading-safeness of it.
   If this code breaks, please contact both Yoshioka and me.
688 --*/ 689 690 const(char)* BZ2_bzlibVersion(); 691 692 BZFILE* BZ2_bzopen(const scope const(char)* path, 693 const scope const(char)* mode); 694 695 BZFILE * BZ2_bzdopen(int fd, 696 const scope const(char)* mode); 697 698 int BZ2_bzread(scope BZFILE* b, 699 scope void* buf, 700 int len); 701 702 int BZ2_bzwrite(scope BZFILE* b, 703 scope void* buf, 704 int len); 705 706 int BZ2_bzflush(scope BZFILE* b); 707 708 void BZ2_bzclose(scope BZFILE* b); 709 710 const(char)* BZ2_bzerror(scope BZFILE *b, 711 int *errnum);