diff --git a/src/BufferStream.js b/src/BufferStream.js index d867aa2c..b5975ee2 100644 --- a/src/BufferStream.js +++ b/src/BufferStream.js @@ -22,11 +22,40 @@ class BufferStream { size = 0; view = new SplitDataView(); + /** Indicates if this buffer stream is complete/has finished being created */ + isComplete = false; + + /** A flag to set to indicate to clear buffers as they get consumed */ + clearBuffers = false; + encoder = new TextEncoder("utf-8"); constructor(options = null) { this.isLittleEndian = options?.littleEndian || this.isLittleEndian; this.view.defaultSize = options?.defaultSize ?? this.view.defaultSize; + this.clearBuffers = options.clearBuffers || false; + } + + /** + * Mark this stream as having finished being written or read from + */ + setComplete(value = true) { + this.isComplete = value; + } + + /** + * Indicates if the value length is currently available in the already + * read/defined portion of the stream + */ + isAvailable(length) { + // console.warn( + // "isAvailable", + // length, + // this.offset, + // this.size, + // this.endOffset + // ); + return this.offset + length <= this.endOffset; } setEndian(isLittle) { @@ -37,6 +66,11 @@ class BufferStream { return this.view.slice(start, end); } + /** + * @deprecated Gets the entire buffer at once. Suggest using the + * view instead, and writing an iterator over the parts to finish + * writing it. + */ getBuffer(start = 0, end = this.size) { if (this.noCopy) { return new Uint8Array(this.slice(start, end)); @@ -272,6 +306,7 @@ class BufferStream { ); this.offset += stream.size; this.size = this.offset; + this.endOffset = this.size; return this.view.availableSize; } @@ -291,13 +326,40 @@ class BufferStream { * @param {*} options.start for the start of the new buffer to use * @param {*} options.end for the end of the buffer to use * @param {*} options.transfer to transfer the buffer to be owned + * Transfer will default true if the entire buffer is being added. + * It should be set explicitly to false to NOT transfer. */ addBuffer(buffer, options = null) { + if (!buffer) { + // Silently ignore null buffers. + return; + } this.view.addBuffer(buffer, options); this.size = this.view.size; + this.endOffset = this.size; return this.size; } + /** + * Consumes the data up to the given offset. + * This will clear the references to the data buffers, and will + * cause resets etc to fail. + * The default offset is the current position, so everything already read. + */ + consume(offset = this.offset) { + if (!this.clearBuffers) { + return; + } + this.view.consume(offset); + } + + /** + * Returns true if the stream has data in the given range. 
+ */ + hasData(start, end) { + return this.view.hasData(start, end); + } + more(length) { if (this.offset + length > this.endOffset) { throw new Error("Request more than currently allocated buffer"); @@ -313,6 +375,7 @@ class BufferStream { this.slice(this.offset, this.offset + length) ); this.increment(length); + newBuf.setComplete(); return newBuf; } @@ -323,7 +386,7 @@ class BufferStream { } end() { - return this.offset >= this.view.byteLength; + return this.isComplete && this.offset >= this.end; } toEnd() { @@ -341,14 +404,16 @@ class ReadBufferStream extends BufferStream { noCopy: false } ) { - super({ littleEndian }); + super({ ...options, littleEndian }); this.noCopy = options.noCopy; this.decoder = new TextDecoder("latin1"); if (buffer instanceof BufferStream) { this.view.from(buffer.view, options); + this.isComplete = true; } else if (buffer) { this.view.addBuffer(buffer); + this.isComplete = true; } this.offset = options.start ?? buffer?.offset ?? 0; this.size = options.stop || buffer?.size || buffer?.byteLength || 0; @@ -367,7 +432,7 @@ class ReadBufferStream extends BufferStream { } end() { - return this.offset >= this.endOffset; + return this.isComplete && this.offset >= this.endOffset; } toEnd() { diff --git a/src/DicomMessage.js b/src/DicomMessage.js index 1fbbbcd6..6feb014d 100644 --- a/src/DicomMessage.js +++ b/src/DicomMessage.js @@ -105,6 +105,82 @@ class DicomMessage { }); } + /** + * Checks if the FMI has been read successfully, returning the + * main syntax if it has, otherwise reads the file meta information + * starting at the current offset. + */ + static _readFmi(dictCreator, stream) { + if (dictCreator.fmi) { + return dictCreator.mainSyntax; + } + + if (!stream.isAvailable(16) && !stream.isComplete) { + return false; + } + const useSyntax = EXPLICIT_LITTLE_ENDIAN; + // read the first tag to check if it's the meta length tag + const el = DicomMessage._readTag(stream, useSyntax); + + let metaHeader; + if (el.tag.toCleanString() !== "00020000") { + // meta length tag is missing + if (!dictCreator.options.ignoreErrors) { + throw new Error( + "Invalid DICOM file, meta length tag is malformed or not present." + ); + } + + // reset stream to the position where we started reading tags + stream.offset = dictCreator.metaStartPos; + + if (!stream.isAvailable(10240) && !stream.isComplete) { + // Not enough data available yet, wait for at least 10k + // to start reading. 
+ return false; + } + + // read meta header elements sequentially + metaHeader = DicomMessage._read(stream, useSyntax, { + untilTag: "00030000", + stopOnGreaterTag: true, + ignoreErrors: true + }); + if (!metaHeader) { + stream.offset = dictCreator.metaStartPos; + return false; + } + } else { + // meta length tag is present + var metaLength = el.values[0]; + + if (!stream.isAvailable(metaLength) && !stream.isComplete) { + stream.offset = dictCreator.metaStartPos; + return false; + } + // read header buffer using the specified meta length + var metaStream = stream.more(metaLength); + // Use a new options object to read without the custom options + // applying to the FMI + metaHeader = DicomMessage._read(metaStream, useSyntax, {}); + } + + dictCreator.fmi = metaHeader; + stream.consume(); + + //get the syntax + const mainSyntax = metaHeader["00020010"].Value[0]; + + if (!mainSyntax) { + throw new Error( + `No syntax provided in ${JSON.stringify(metaHeader)}` + ); + } + dictCreator.mainSyntax = mainSyntax; + + return mainSyntax; + } + static _read( bufferStream, syntax, @@ -126,6 +202,12 @@ class DicomMessage { try { let previousTagOffset; while (!bufferStream.end()) { + if ( + !bufferStream.isAvailable(1024) && + !bufferStream.isComplete + ) { + return false; + } if (dictCreator.continueParse(bufferStream)) { continue; } @@ -217,6 +299,26 @@ class DicomMessage { return encapsulatedSyntaxes.indexOf(syntax) != -1; } + /** + * If there is enough available data, read the 128 byte prefix and the + * DICM header marker, returning true when done, throwing an error when + * not a DICM stream, and returning false if not yet enough data. + */ + static _readDICM(dictCreator, stream) { + if (!stream.isAvailable(132) && !stream.isComplete) { + return false; + } + stream.reset(); + stream.increment(128); + if (stream.readAsciiString(4) !== "DICM") { + throw new Error("Invalid DICOM file, expected header is missing"); + } + dictCreator.metaStartPos = stream.offset; + stream.consume(); + + return true; + } + /** * Reads a DICOM input stream from an array buffer. * @@ -233,65 +335,65 @@ class DicomMessage { forceStoreRaw: false } ) { - var stream = new ReadBufferStream(buffer, null, { - noCopy: options.noCopy - }), - useSyntax = EXPLICIT_LITTLE_ENDIAN; - stream.reset(); - stream.increment(128); - if (stream.readAsciiString(4) !== "DICM") { - throw new Error("Invalid DICOM file, expected header is missing"); + if (!options.dictCreator) { + options.dictCreator = new DictCreator(this, options); + } + const { dictCreator } = options; + if (options.stream === true) { + // Create a streaming input buffer + options.stream = new ReadBufferStream(null, true, { + clearBuffers: options.clearBuffers ?? true + }); } - // save position before reading first tag - var metaStartPos = stream.offset; - - // read the first tag to check if it's the meta length tag - var el = DicomMessage._readTag(stream, useSyntax); + let stream = + options.stream || + new ReadBufferStream(buffer, null, { + noCopy: options.noCopy + }); + if (options.stream) { + stream.addBuffer(buffer); + } - var metaHeader = {}; - if (el.tag.toCleanString() !== "00020000") { - // meta length tag is missing - if (!options.ignoreErrors) { - throw new Error( - "Invalid DICOM file, meta length tag is malformed or not present." 
- ); + if (dictCreator.metaStartPos === -1) { + const dicmResult = this._readDICM(dictCreator, stream); + if (!dicmResult) { + return false; } + } - // reset stream to the position where we started reading tags - stream.offset = metaStartPos; - - // read meta header elements sequentially - metaHeader = DicomMessage._read(stream, useSyntax, { - untilTag: "00030000", - stopOnGreaterTag: true, - ignoreErrors: true - }); - } else { - // meta length tag is present - var metaLength = el.values[0]; - - // read header buffer using the specified meta length - var metaStream = stream.more(metaLength); - metaHeader = DicomMessage._read(metaStream, useSyntax, options); + if (!dictCreator.mainSyntax) { + const result = this._readFmi(dictCreator, stream); + if (!result) { + return false; + } } - //get the syntax - var mainSyntax = metaHeader["00020010"].Value[0]; + let { mainSyntax } = dictCreator; //in case of deflated dataset, decompress and continue if (mainSyntax === DEFLATED_EXPLICIT_LITTLE_ENDIAN) { + if (!stream.isComplete) { + return false; + } stream = new DeflatedReadBufferStream(stream, { noCopy: options.noCopy }); } mainSyntax = DicomMessage._normalizeSyntax(mainSyntax); - var objects = DicomMessage._read(stream, mainSyntax, options); + const objects = DicomMessage._read(stream, mainSyntax, options); + if (!objects) { + // Needs more data still + return false; + } - var dicomDict = new DicomDict(metaHeader); + var dicomDict = new DicomDict(dictCreator.fmi); dicomDict.dict = objects; + // Reset, ready for another read + dictCreator.reset(); + return dicomDict; } diff --git a/src/DictCreator.js b/src/DictCreator.js index ba9d336b..34abc9cb 100644 --- a/src/DictCreator.js +++ b/src/DictCreator.js @@ -15,8 +15,35 @@ import { ValueRepresentation } from "./ValueRepresentation.js"; * * Restartable parsing, to allow stream inputs */ export class DictCreator { + // Public attributes used dynamically while reading: + + /** + * The current DICOM block being created + */ dict = {}; + /** + * The top of the stack where the data is being currently stored. + */ current = { dict: this.dict, parent: null, level: 0 }; + /** + * The start of the FMI block. Starts at -1 before reading + * the file prefix. + */ + metaStartPos = -1; + /** + * The main transfer syntax for this object + */ + mainSyntax = ""; + /** + * The file meta information associated with this creator. + */ + fmi = null; + + /** The number of bytes needed to continue parsing */ + neededLengthBase = 16; + + // The rest of the information is configuration read/only data + handlers = { [TagHex.Item]: this.handleItem, [TagHex.ItemDelimitationEnd]: this.handleItemDelimitationEnd, @@ -27,6 +54,7 @@ export class DictCreator { privateTagBulkdataSize = 128; publicTagBulkdataSize = 1024; + options = {}; /** * Creates a dict object using the given options. @@ -39,6 +67,7 @@ export class DictCreator { * bulkdata based on the size of it. 
*/ constructor(_dicomMessage, options) { + this.options = options; if (options.writeBulkdata) { this.handlers.bulkdata = this.handleBulkdata; } @@ -53,6 +82,17 @@ export class DictCreator { } } + /** + * Resets this object, ready for reading another DICM file + */ + reset() { + this.metaStartPos = -1; + this.mainSyntax = ""; + this.fmi = null; + this.dict = {}; + this.current = { dict: this.dict, parent: null, level: 0 }; + } + /** * Creates a new tag attribute on cleanTagString based on the readInfo * readInfo has attributes values, BulkDataUUID and BulkDataURI for the @@ -104,6 +144,14 @@ export class DictCreator { return handler?.call(this, header, stream, tsuid, options); } + get neededLength() { + return this.current.neededLength ?? this.neededLengthBase; + } + + /** + * Continues parsing a partially parsed value. This will return true + * if the partial parse is successful. + */ continueParse(stream) { const { current } = this; if ( diff --git a/src/SplitDataView.js b/src/SplitDataView.js index 0755aa5e..da90224a 100644 --- a/src/SplitDataView.js +++ b/src/SplitDataView.js @@ -6,22 +6,90 @@ export default class SplitDataView { buffers = []; views = []; offsets = []; + lengths = []; size = 0; byteLength = 0; /** The default size is 256k */ defaultSize = 256 * 1024; + /** + * The set of byte arrays being consumed. This allows adding byte + * arrays ready to be consumed to the list, and have them available + * once the current consume finishes. + */ + consumed = []; + + /** The last byte index not already consumed */ + consumeOffset = -1; + constructor(options = { defaultSize: 256 * 1024 }) { this.defaultSize = options.defaultSize || this.defaultSize; } + /** + * Consumes the already written or read data, up to the given offset. + */ + consume(offset) { + this.consumeOffset = Math.max(offset, this.consumeOffset); + if (!this.consumed || !this.offsets.length) { + return; + } + const nextOffset = this.offsets[this.consumed.length]; + const nextLength = this.lengths[this.consumed.length]; + if (nextOffset === undefined || nextLength === undefined) { + return; + } + const currentEnd = nextOffset + nextLength; + if (this.consumeOffset < currentEnd) { + // Haven't finished consuming all the data in the current block + return; + } + // Consume the entire buffer for now + this.consumed.push( + this.consumeListener?.( + this.buffers, + 0, + Math.min(this.buffers.length, nextOffset - this.consumeOffset) + ) + ); + this.buffers[this.consumed.length - 1] = null; + this.consume(offset); + } + + /** + * Returns true if there is data for + */ + hasData(start, end) { + if (start > this.size || end > this.size) { + return false; + } + for (let i = 0; i < this.offsets.length; i++) { + const startOffset = this.offsets[i]; + const nextOffset = startOffset + this.lengths[i]; + if (end <= nextOffset) { + return !!this.buffers[i]; + } + if (start < nextOffset && end > startOffset) { + // Enters the if conditions if start...end overlaps + // startOffset...endOffset + // 25...50 overlaps 25...26, 26..27, 49...50 + // but not 24..25 or 50...51 + if (!this.buffers[i]) { + return false; + } + } + } + return true; + } + checkSize(end) { while (end > this.byteLength) { const buffer = new ArrayBuffer(this.defaultSize); this.buffers.push(buffer); this.views.push(new DataView(buffer)); this.offsets.push(this.byteLength); + this.lengths.push(buffer.byteLength); this.byteLength += buffer.byteLength; } @@ -40,7 +108,8 @@ export default class SplitDataView { buffer = buffer.buffer || buffer; const start = options?.start || 
0; const end = options?.end || buffer.byteLength; - const transfer = options?.transfer; + const transfer = + options?.transfer ?? (start === 0 && end === buffer.byteLength); if (start === end) { return; } @@ -48,12 +117,13 @@ export default class SplitDataView { const lastOffset = this.offsets.length ? this.offsets[this.offsets.length - 1] : 0; - const lastLength = this.buffers.length - ? this.buffers[this.buffers.length - 1]?.byteLength + const lastLength = this.lengths.length + ? this.lengths[this.lengths.length - 1] : 0; this.buffers.push(addBuffer); this.views.push(new DataView(addBuffer)); this.offsets.push(lastOffset + lastLength); + this.lengths.push(addBuffer.byteLength); this.size += addBuffer.byteLength; this.byteLength += addBuffer.byteLength; } @@ -63,6 +133,7 @@ export default class SplitDataView { this.size = view.size; this.byteLength = view.byteLength; this.offsets.push(...view.offsets); + this.lengths.push(...view.lengths); this.buffers.push(...view.buffers); this.views.push(...view.views); // TODO - use the options to skip copying irrelevant data @@ -85,7 +156,7 @@ export default class SplitDataView { } let offset = this.offsets[index]; let length = buffer.byteLength; - if (end < offset + length) { + if (end <= offset + length) { return buffer.slice(start - offset, end - offset); } const createBuffer = new Uint8Array(end - start); @@ -114,18 +185,27 @@ export default class SplitDataView { for (let index = 0; index < this.buffers.length; index++) { if ( start >= this.offsets[index] && - start < this.offsets[index] + this.buffers[index].byteLength + start < this.offsets[index] + this.lengths[index] ) { return index; } } } + /** + * Returns a buffer view containing the given start position. + * Note this will return undefined if start is after the current + * data set. 
+ */ findView(start, length = 1) { const index = this.findStart(start); - const buffer = this.buffers[index]; const viewOffset = this.offsets[index]; - const viewLength = buffer.byteLength; + const viewLength = this.lengths[index]; + if (viewOffset === undefined) { + throw new Error( + `Finding view is past end of input for start=${start} where offsets=${this.offsets} and lengths are ${this.lengths}` + ); + } if (start + length - viewOffset <= viewLength) { return { view: this.views[index], viewOffset, index }; } diff --git a/test/streaming.test.js b/test/streaming.test.js new file mode 100644 index 00000000..3444f65d --- /dev/null +++ b/test/streaming.test.js @@ -0,0 +1,73 @@ +import fs from "fs"; + +import dcmjs from "../src/index.js"; +import { getTestDataset } from "./testUtils.js"; + +const { NormalizedDictCreator, DicomMessage } = dcmjs.data; + +const url = + "https://github.com/dcmjs-org/data/releases/download/binary-parsing-stressors/large-private-tags.dcm"; + +describe("Streaming Parsing", () => { + let dcmPath, buffer; + + beforeAll(async () => { + dcmPath = await getTestDataset(url, "large-private-tags.dcm"); + console.warn("dcmPath=", dcmPath); + buffer = fs.readFileSync(dcmPath).buffer; + }); + + it("Reads partial streamed data", async () => { + const options = { + stream: true + }; + + options.dictCreator = new NormalizedDictCreator(DicomMessage, options); + + let dicomDict = DicomMessage.readFile(buffer.slice(0, 128), options); + expect(dicomDict).toBe(false); + + const { stream } = options; + expect(stream.hasData(0, 128)).toBe(true); + expect(stream.hasData(0, 132)).toBe(false); + + dicomDict = DicomMessage.readFile(buffer.slice(128, 132), options); + expect(dicomDict).toBe(false); + expect(stream.hasData(0, 132)).toBe(false); + + dicomDict = DicomMessage.readFile(buffer.slice(132, 340), options); + expect(dicomDict).toBe(false); + expect(options.dictCreator.fmi).toBeTruthy(); + expect(stream.hasData(132, 340)).toBe(false); + + options.stream.addBuffer(buffer.slice(340, buffer.byteLength)); + options.stream.setComplete(); + // Should read the rest now + dicomDict = DicomMessage.readFile(null, options); + expect(dicomDict.dict).toBeTruthy(); + }); + + /** Warning - this is fairly slow to run as it chunks things up into tiny bits */ + it("Reads data in streamed 15 byte chunks", async () => { + const chunkLength = 15; + const options = { + stream: true + }; + + options.dictCreator = new NormalizedDictCreator(DicomMessage, options); + + let iNext = 0; + for (let i = 0; i < buffer.byteLength; i = iNext) { + iNext = Math.min(i + chunkLength, buffer.byteLength); + const dicomDict = DicomMessage.readFile( + buffer.slice(i, iNext), + options + ); + expect(dicomDict).toBe(false); + } + options.stream.setComplete(); + // Should read the rest now + const dicomDict = DicomMessage.readFile(null, options); + expect(dicomDict.dict).toBeTruthy(); + }); +});
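
For reference, a minimal end-to-end sketch of the streaming entry point exercised by `test/streaming.test.js` above: pass `stream: true` in the options, feed chunks through repeated `DicomMessage.readFile` calls (each returns `false` while more data is needed), then mark the stream complete and make one final call. The `dcmjs` import path, the chunk size, and the use of `fs.readFileSync` are assumptions for illustration only.

```js
import fs from "fs";
import dcmjs from "dcmjs"; // illustrative import path

const { DicomMessage } = dcmjs.data;

function parseInChunks(path, chunkLength = 64 * 1024) {
    const buffer = fs.readFileSync(path).buffer;
    // readFile creates and stores a DictCreator on this object on first use,
    // so the same options object must be reused for every chunk.
    const options = { stream: true };

    for (let i = 0; i < buffer.byteLength; i += chunkLength) {
        const end = Math.min(i + chunkLength, buffer.byteLength);
        // Returns false while the parser still needs more data.
        DicomMessage.readFile(buffer.slice(i, end), options);
    }

    // No further data is coming; let the parser read to the end.
    options.stream.setComplete();
    return DicomMessage.readFile(null, options); // DicomDict
}
```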
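
The incremental surface added to `BufferStream`/`ReadBufferStream` can also be driven directly. A minimal sketch, assuming `ReadBufferStream` is importable (the path below is illustrative) and tracing the behaviour of the methods added in this change:

```js
import { ReadBufferStream } from "dcmjs/src/BufferStream.js"; // illustrative path

// An empty stream that releases buffers once they have been consumed.
const stream = new ReadBufferStream(null, true, { clearBuffers: true });

stream.addBuffer(new Uint8Array([0x44, 0x49, 0x43, 0x4d]).buffer); // "DICM"
stream.isAvailable(4);     // true  - 4 bytes exist past the current offset
stream.isAvailable(8);     // false - would run past endOffset
stream.hasData(0, 4);      // true  - bytes 0..4 are still backed by a buffer

stream.readAsciiString(4); // "DICM", advances the offset to 4
stream.consume();          // releases everything before the current offset
stream.hasData(0, 4);      // false - the backing buffer has been dropped

stream.setComplete();      // no more addBuffer calls will follow
stream.end();              // true: complete and fully read
```

This mirrors how `readFile` checks availability before each parse step and calls `consume()` once the DICM prefix and the file meta information have been read.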
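
Finally, `_readDICM`, `_readFmi`, and `_read` all follow the same checkpoint/retry convention: a step either finishes and consumes its input, or restores the stream offset and reports `false` so the caller can retry after more data has been added. A sketch of that convention, with `needed` and `parseStep` as illustrative placeholders rather than part of the API:

```js
// Sketch of the checkpoint/retry convention used by the streaming parser.
function tryStep(stream, needed, parseStep) {
    if (!stream.isAvailable(needed) && !stream.isComplete) {
        return false; // not enough data yet; call again after addBuffer()
    }
    const checkpoint = stream.offset;
    const result = parseStep(stream);
    if (result === false) {
        stream.offset = checkpoint; // roll back for the next attempt
        return false;
    }
    stream.consume(); // step finished; consumed buffers may now be released
    return result;
}
```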