/** File I/O of Compressed Files.
 *
 * See_Also: https://forum.dlang.org/post/jykarqycnrecajveqpos@forum.dlang.org
 */
module nxt.zio;

import std.range.primitives : isInputRange;

@safe:

struct GzipFileInputRange
{
    import std.stdio : File;
    import std.traits : ReturnType;

    enum chunkSize = 0x4000; // TODO: find optimal value via benchmark

    enum defaultExtension = `.gz`;

    this(in char[] path) @trusted
    {
        _f = File(path, `r`);
        _chunkRange = _f.byChunk(chunkSize);
        _uncompress = new UnCompress;
        loadNextChunk();
    }

    void loadNextChunk() @trusted
    {
        if (!_chunkRange.empty)
        {
            _uncompressedBuf = cast(ubyte[])_uncompress.uncompress(_chunkRange.front);
            _chunkRange.popFront();
        }
        else
        {
            if (!_exhausted)
            {
                _uncompressedBuf = cast(ubyte[])_uncompress.flush();
                _exhausted = true;
            }
            else
            {
                _uncompressedBuf.length = 0;
            }
        }
        _bufIx = 0;
    }

    void popFront()
    {
        _bufIx += 1;
        if (_bufIx >= _uncompressedBuf.length)
        {
            loadNextChunk();
        }
    }

    pragma(inline, true):
    @safe pure nothrow @nogc:

    @property ubyte front() const
    {
        return _uncompressedBuf[_bufIx];
    }

    @property bool empty() const
    {
        return _uncompressedBuf.length == 0;
    }

private:
    import std.zlib : UnCompress;
    UnCompress _uncompress;
    File _f;
    ReturnType!(_f.byChunk) _chunkRange;
    bool _exhausted;          ///< True if exhausted.
    ubyte[] _uncompressedBuf; ///< Uncompressed buffer.
    size_t _bufIx;            ///< Current byte index into `_uncompressedBuf`.
}
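/** Usage sketch (guarded by `version (none)` since the path `data.txt.gz` is
 * a hypothetical placeholder): stream the decompressed bytes of a gzip file
 * one at a time. `GzipFileInputRange` is a plain input range of `ubyte`, so
 * standard range algorithms apply directly.
 */
version (none) @safe unittest
{
    import std.algorithm.searching : count;
    auto byteRange = GzipFileInputRange(`data.txt.gz`);
    const newlineCount = byteRange.count('\n'); // number of newline bytes in the decompressed stream
}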
/** Is `true` iff `R` is a block input range.
    TODO: Move to std.range
 */
private template isBlockInputRange(R)
{
    import std.range.primitives : isInputRange;
    enum isBlockInputRange = (isInputRange!R &&
                              __traits(hasMember, R, `bufferFrontChunk`) && // TODO: ask dlang for better naming
                              __traits(hasMember, R, `loadNextChunk`));     // TODO: ask dlang for better naming
}

/** Decompress `BlockInputRange` linewise.
 */
class DecompressByLine(BlockInputRange)
{
    private alias E = char;

    /** Open the file at `path` for linewise decompression.
     *
     * Decoding is much faster when `BlockInputRange` satisfies
     * `isBlockInputRange`, because whole chunks can then be scanned at once.
     */
    this(in const(char)[] path,
         E separator = '\n',
         in size_t initialCapacity = 80)
    {
        this._range = typeof(_range)(path);
        this._separator = separator;
        static if (__traits(hasMember, typeof(_lbuf), `withCapacity`))
        {
            this._lbuf = typeof(_lbuf).withCapacity(initialCapacity);
        }
        popFront();
    }

    void popFront() @trusted
    {
        _lbuf.shrinkTo(0);

        static if (isBlockInputRange!(typeof(_range)))
        {
            // TODO: functionize
            while (!_range.empty)
            {
                ubyte[] currentFronts = _range.bufferFrontChunk;
                // `_range` is mutable so a sentinel-based search could be used here

                enum useCountUntil = false;
                static if (useCountUntil)
                {
                    import std.algorithm.searching : countUntil;
                    // TODO
                }
                else
                {
                    import std.algorithm.searching : find;
                    const hit = currentFronts.find(_separator); // or use `indexOf`
                }

                if (hit.length)
                {
                    const lineLength = hit.ptr - currentFronts.ptr;
                    _lbuf.put(currentFronts[0 .. lineLength]); // add everything up to separator
                    _range._bufIx += lineLength + _separator.sizeof; // advance past line and separator
                    if (_range.empty)
                    {
                        _range.loadNextChunk();
                    }
                    break; // done
                }
                else // no separator yet
                {
                    _lbuf.put(currentFronts); // so just add everything
                    _range.loadNextChunk();
                }
            }
        }
        else
        {
            // TODO: sentinel-based search for `_separator` in `_range`
            while (!_range.empty &&
                   _range.front != _separator)
            {
                _lbuf.put(_range.front);
                _range.popFront();
            }

            if (!_range.empty &&
                _range.front == _separator)
            {
                _range.popFront(); // pop separator
            }
        }
    }

    pragma(inline):
    @safe pure nothrow @nogc:

    @property bool empty() const
    {
        return _lbuf.data.length == 0;
    }

    const(E)[] front() const return scope
    {
        return _lbuf.data;
    }

private:
    BlockInputRange _range;

    import std.array : Appender;
    Appender!(E[]) _lbuf; // line buffer

    // NOTE: this is slower for ldc:
    // import nxt.dynamic_array : Array;
    // Array!E _lbuf;

    E _separator;
}
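/** Usage sketch (guarded by `version (none)` since the path
 * `wordlist.txt.gz` is a hypothetical placeholder): read a gzip-compressed
 * text file line by line. `ZlibFileInputRange` satisfies `isBlockInputRange`,
 * so `DecompressByLine` takes the faster chunk-scanning path above.
 */
version (none) @safe unittest
{
    size_t lineCount = 0;
    foreach (const line; new DecompressByLine!ZlibFileInputRange(`wordlist.txt.gz`))
    {
        ++lineCount; // `line` is the current line without its trailing '\n'
    }
}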
class GzipOut
{
    import std.zlib : Compress, HeaderFormat;
    import std.stdio : File;

    this(File file) @trusted
    {
        _f = file;
        _compress = new Compress(HeaderFormat.gzip);
    }

    void compress(const string s) @trusted
    {
        auto compressed = _compress.compress(s);
        _f.rawWrite(compressed);
    }

    void finish() @trusted
    {
        auto compressed = _compress.flush;
        _f.rawWrite(compressed);
        _f.close;
    }

private:
    Compress _compress;
    File _f;
}

struct ZlibFileInputRange
{
    import std.file : FileException;

    /* Zlib docs:
       CHUNK is simply the buffer size for feeding data to and pulling data from
       the zlib routines. Larger buffer sizes would be more efficient,
       especially for inflate(). If the memory is available, buffers sizes on
       the order of 128K or 256K bytes should be used.
     */
    enum chunkSize = 128 * 1024; // 128K

    enum defaultExtension = `.gz`;

    @safe:

    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = gzopen(path.toStringz, `rb`);
        if (!_f)
        {
            throw new FileException(`Couldn't open file ` ~ path.idup);
        }
        _buf = new ubyte[chunkSize];
        loadNextChunk();
    }

    ~this() @trusted @nogc
    {
        const int ret = gzclose(_f);
        if (ret < 0)
        {
            assert(0, `Couldn't close file`); // TODO: replace with non-GC-allocated exception
        }
    }

    @disable this(this);

    void loadNextChunk() @trusted
    {
        int count = gzread(_f, _buf.ptr, chunkSize);
        if (count == -1)
        {
            throw new Exception(`Error decoding file`);
        }
        _bufIx = 0;
        _bufReadLength = count;
    }

    void popFront()
    {
        assert(!empty);
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
        {
            loadNextChunk();
            _bufIx = 0; // restart counter
        }
    }

    pragma(inline, true):
    pure nothrow @nogc:

    @property ubyte front() const @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx];
    }

    @property bool empty() const
    {
        return _bufIx == _bufReadLength;
    }

    /** Get current bufferFrontChunk.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx .. _bufReadLength];
    }

private:
    import etc.c.zlib : gzFile, gzopen, gzclose, gzread;

    gzFile _f;

    ubyte[] _buf; // block read buffer

    // Number of bytes in `_buf` most recently read by `gzread`; normally equal to
    // `_buf.length` except after the last read, where it is normally less than `_buf.length`.
    size_t _bufReadLength;

    size_t _bufIx; // current stream read index in `_buf`

    // TODO: make this work:
    // extern (C) nothrow @nogc:
    // pragma(mangle, `gzopen`) gzFile gzopen(const(char)* path, const(char)* mode);
    // pragma(mangle, `gzclose`) int gzclose(gzFile file);
    // pragma(mangle, `gzread`) int gzread(gzFile file, void* buf, uint len);
}
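/** Usage sketch (guarded by `version (none)` since the path `out.txt.gz` is a
 * hypothetical placeholder): write a gzip-compressed file with `GzipOut` and
 * read it back line by line, mirroring what `testInputRange` below does.
 */
version (none) @safe unittest
{
    import std.stdio : File;

    auto of = new GzipOut(File(`out.txt.gz`, `w`));
    of.compress("alpha\nbeta\ngamma");
    of.finish(); // flushes the remaining compressed data and closes the file

    foreach (const line; new DecompressByLine!GzipFileInputRange(`out.txt.gz`))
    {
        // `line` is one of "alpha", "beta", "gamma"
    }
}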
struct Bz2libFileInputRange
{
    import std.file : FileException;

    enum chunkSize = 128 * 1024; // 128K. TODO: find optimal value via benchmark
    enum defaultExtension = `.bz2`;
    enum useGC = false; // TODO: generalize to allocator parameter

    @safe:

    this(in char[] path) @trusted
    {
        import std.string : toStringz; // TODO: avoid GC allocation by looking at how gmp-d z.d solves it
        _f = BZ2_bzopen(path.toStringz, `rb`);
        if (!_f)
        {
            throw new FileException(`Couldn't open file ` ~ path.idup);
        }

        static if (useGC)
        {
            _buf = new ubyte[chunkSize];
        }
        else
        {
            import core.memory : pureMalloc;
            _buf = (cast(ubyte*)pureMalloc(chunkSize))[0 .. chunkSize];
        }

        loadNextChunk();
    }

    ~this() @trusted @nogc
    {
        BZ2_bzclose(_f); // TODO: error handling?

        static if (!useGC)
        {
            import core.memory : pureFree;
            pureFree(_buf.ptr);
        }
    }

    @disable this(this);

    void loadNextChunk() @trusted
    {
        int count = BZ2_bzread(_f, _buf.ptr, chunkSize);
        if (count == -1)
        {
            throw new Exception(`Error decoding file`);
        }
        _bufIx = 0;
        _bufReadLength = count;
    }

    void popFront()
    {
        assert(!empty);
        _bufIx += 1;
        if (_bufIx >= _bufReadLength)
        {
            loadNextChunk();
            _bufIx = 0; // restart counter
        }
    }

    pragma(inline, true):
    pure nothrow @nogc:

    @property ubyte front() const @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx];
    }

    @property bool empty() const
    {
        return _bufIx == _bufReadLength;
    }

    /** Get current bufferFrontChunk.
        TODO: need better name for this
     */
    inout(ubyte)[] bufferFrontChunk() inout @trusted
    {
        assert(!empty);
        return _buf.ptr[_bufIx .. _bufReadLength];
    }

private:
    import nxt.bzlib : BZFILE, BZ2_bzopen, BZ2_bzread, BZ2_bzwrite, BZ2_bzclose;
    pragma(lib, `bz2`); // Ubuntu: sudo apt-get install libbz2-dev

    BZFILE* _f;

    ubyte[] _buf; // block read buffer

    // Number of bytes in `_buf` most recently read by `BZ2_bzread`; normally equal to
    // `_buf.length` except after the last read, where it is normally less than `_buf.length`.
    size_t _bufReadLength;

    size_t _bufIx; // current stream read index in `_buf`
}
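/** Usage sketch (guarded by `version (none)` since the path `dump.ttl.bz2` is
 * a hypothetical placeholder): consume a bzip2-compressed file chunk-wise via
 * the block interface (`bufferFrontChunk`/`loadNextChunk`) that
 * `DecompressByLine` detects through `isBlockInputRange`, instead of byte by
 * byte through `front`/`popFront`.
 */
version (none) @safe unittest
{
    static assert(isBlockInputRange!Bz2libFileInputRange);

    auto r = Bz2libFileInputRange(`dump.ttl.bz2`);
    size_t byteCount = 0;
    while (!r.empty)
    {
        byteCount += r.bufferFrontChunk.length; // whole remaining chunk at once
        r.loadNextChunk();
    }
}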
void testInputRange(FileInputRange)() @safe
if (isInputRange!FileInputRange)
{
    import std.stdio : File;

    enum path = `test` ~ FileInputRange.defaultExtension;

    const wholeSource = "abc\ndef\nghi"; // contents of source

    foreach (const n; wholeSource.length .. wholeSource.length) // TODO: from 0
    {
        const source = wholeSource[0 .. n]; // slice from the beginning

        File file = File(path, `w`); // TODO: `scope`
        auto of = new GzipOut(file); // TODO: `scope`
        of.compress(source);
        of.finish();

        size_t ix = 0;
        foreach (e; FileInputRange(path))
        {
            assert(cast(char)e == source[ix]);
            ++ix;
        }

        import std.algorithm.searching : count;
        import std.algorithm.iteration : splitter;
        alias R = DecompressByLine!ZlibFileInputRange;

        assert(new R(path).count == source.splitter('\n').count);
    }
}

@safe unittest
{
    testInputRange!(GzipFileInputRange);
    testInputRange!(ZlibFileInputRange);
    testInputRange!(Bz2libFileInputRange);
}

/** Read Age of Acquisitions.
 */
static private void testReadAgeofAqcuisitions(const string rootDirPath = `~/Work/knet/knowledge/en/age-of-aqcuisition`) @safe
{
    import std.path : expandTilde;
    import nxt.zio : DecompressByLine, GzipFileInputRange;
    import std.path : buildNormalizedPath;

    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words.csv.gz`);
        size_t count = 0;
        foreach (line; new DecompressByLine!GzipFileInputRange(path))
        {
            count += 1;
        }
        assert(count == 51716);
    }

    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words.csv.gz`);
        size_t count = 0;
        foreach (line; new DecompressByLine!ZlibFileInputRange(path))
        {
            count += 1;
        }
        assert(count == 51716);
    }

    {
        const path = buildNormalizedPath(rootDirPath.expandTilde,
                                         `AoA_51715_words_copy.csv.bz2`);
        size_t count = 0;
        foreach (line; new DecompressByLine!Bz2libFileInputRange(path))
        {
            count += 1;
        }
        assert(count == 51716);
    }
}

/** Read ConceptNet 5 assertions.
 */
static private void testReadConcept5Assertions(const string path = `/home/per/Knowledge/ConceptNet5/latest/conceptnet-assertions-5.6.0.csv.gz`) @safe
{
    alias R = ZlibFileInputRange;

    import std.stdio : writeln;
    import std.range : take;
    import std.algorithm.searching : count;

    const lineBlockCount = 100_000;
    size_t lineNr = 0;
    foreach (const line; new DecompressByLine!R(path))
    {
        if (lineNr % lineBlockCount == 0)
        {
            writeln(`Line `, lineNr, ` read containing:`, line);
        }
        lineNr += 1;
    }

    const lineCount = 5;
    foreach (const line; new DecompressByLine!R(path).take(lineCount))
    {
        writeln(line);
    }
}

/// Benchmark DBpedia parsing.
static private void benchmarkDbpediaParsing(const string rootPath = `/home/per/Knowledge/DBpedia/latest`) @system
{
    alias R = Bz2libFileInputRange;

    import nxt.array_algorithm : startsWith, endsWith;
    import std.algorithm : filter;
    import std.file : dirEntries, SpanMode;
    import std.path : baseName;
    import std.stdio : write, writeln, stdout;
    import std.datetime : MonoTime;

    foreach (const path; dirEntries(rootPath, SpanMode.depth).filter!(file => (file.name.baseName.startsWith(`instance_types`) &&
                                                                               file.name.endsWith(`.ttl.bz2`))))
    {
        write(`Checking `, path, ` ... `); stdout.flush();

        immutable before = MonoTime.currTime();

        size_t lineCounter = 0;
        foreach (const line; new DecompressByLine!R(path))
        {
            lineCounter += 1;
        }

        immutable after = MonoTime.currTime();

        showStat(path, before, after, lineCounter);
    }
}

/// Show statistics.
static private void showStat(T)(in const(char[]) tag,
                                in T before,
                                in T after,
                                in size_t lineCount)
{
    import std.stdio : writefln;
    writefln(`%s: %3.1f msecs (%3.1f usecs/line)`,
             tag,
             cast(double)(after - before).total!`msecs`,
             cast(double)(after - before).total!`usecs` / lineCount);
}