# Licensed under a 3-clause BSD style license - see LICENSE.rst """ This module includes a fast iterator-based XML parser. """ # STDLIB import contextlib import io import sys # ASTROPY from astropy.utils import data from ._iterparser import IterParser as _fast_iterparse __all__ = ["get_xml_iterator", "get_xml_encoding", "xml_readlines"] @contextlib.contextmanager def _convert_to_fd_or_read_function(fd): """ Returns a function suitable for streaming input, or a file object. This function is only useful if passing off to C code where: - If it's a real file object, we want to use it as a real C file object to avoid the Python overhead. - If it's not a real file object, it's much handier to just have a Python function to call. This is somewhat quirky behavior, of course, which is why it is private. For a more useful version of similar behavior, see `astropy.utils.misc.get_readable_fileobj`. Parameters ---------- fd : object May be: - a file object. If the file is uncompressed, this raw file object is returned verbatim. Otherwise, the read method is returned. - a function that reads from a stream, in which case it is returned verbatim. - a file path, in which case it is opened. Again, like a file object, if it's uncompressed, a raw file object is returned, otherwise its read method. - an object with a :meth:`read` method, in which case that method is returned. Returns ------- fd : context-dependent See above. """ if callable(fd): yield fd return with data.get_readable_fileobj(fd, encoding="binary") as new_fd: if sys.platform.startswith("win"): yield new_fd.read else: if isinstance(new_fd, io.FileIO): yield new_fd else: yield new_fd.read def _slow_iterparse(fd, buffersize=2**10): from xml.parsers import expat if not callable(fd): read = fd.read else: read = fd queue = [] text = [] def start(name, attr): queue.append( (True, name, attr, (parser.CurrentLineNumber, parser.CurrentColumnNumber)) ) del text[:] def end(name): queue.append( ( False, name, "".join(text).strip(), (parser.CurrentLineNumber, parser.CurrentColumnNumber), ) ) parser = expat.ParserCreate() parser.specified_attributes = True parser.StartElementHandler = start parser.EndElementHandler = end parser.CharacterDataHandler = text.append Parse = parser.Parse data = read(buffersize) while data: Parse(data, False) yield from queue del queue[:] data = read(buffersize) Parse("", True) yield from queue @contextlib.contextmanager def get_xml_iterator(source, _debug_python_based_parser=False): """ Returns an iterator over the elements of an XML file. The iterator doesn't ever build a tree, so it is much more memory and time efficient than the alternative in ``cElementTree``. Parameters ---------- source : path-like, :term:`file-like (readable)`, or callable Handle that contains the data or function that reads it. If a function or callable object, it must directly read from a stream. Non-callable objects must define a ``read`` method. Returns ------- parts : iterator The iterator returns 4-tuples (*start*, *tag*, *data*, *pos*): - *start*: when `True` is a start element event, otherwise an end element event. - *tag*: The name of the element - *data*: Depends on the value of *event*: - if *start* == `True`, data is a dictionary of attributes - if *start* == `False`, data is a string containing the text content of the element - *pos*: Tuple (*line*, *col*) indicating the source of the event. """ with _convert_to_fd_or_read_function(source) as fd: if _debug_python_based_parser: context = _slow_iterparse(fd) else: context = _fast_iterparse(fd) yield iter(context) def get_xml_encoding(source): """ Determine the encoding of an XML file by reading its header. Parameters ---------- source : path-like, :term:`file-like (readable)`, or callable Handle that contains the data or function that reads it. If a function or callable object, it must directly read from a stream. Non-callable objects must define a ``read`` method. Returns ------- encoding : str """ with get_xml_iterator(source) as iterator: start, tag, data, pos = next(iterator) if not start or tag != "xml": raise OSError("Invalid XML file") # The XML spec says that no encoding === utf-8 return data.get("encoding") or "utf-8" def xml_readlines(source): """ Get the lines from a given XML file. Correctly determines the encoding and always returns unicode. Parameters ---------- source : path-like, :term:`file-like (readable)`, or callable Handle that contains the data or function that reads it. If a function or callable object, it must directly read from a stream. Non-callable objects must define a ``read`` method. Returns ------- lines : list of unicode """ encoding = get_xml_encoding(source) with data.get_readable_fileobj(source, encoding=encoding) as input: input.seek(0) xml_lines = input.readlines() return xml_lines