# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module tests some of the methods related to the ``ECSV``
reader/writer.
"""
import copy
import os
import sys
from contextlib import nullcontext
from io import StringIO
import numpy as np
import pytest
import yaml
from astropy import units as u
from astropy.io import ascii
from astropy.io.ascii.ecsv import DELIMITERS, InvalidEcsvDatatypeWarning
from astropy.io.tests.mixin_columns import compare_attrs, mixin_cols, serialized_names
from astropy.table import Column, QTable, Table
from astropy.table.column import MaskedColumn
from astropy.table.table_helpers import simple_table
from astropy.time import Time
from astropy.units import QuantityInfo
from astropy.units import allclose as quantity_allclose
from astropy.utils.masked import Masked
from .common import TEST_DIR
# All scalar datatypes exercised by the round-trip tests below.
DTYPES = (
    "bool int8 int16 int32 int64 uint8 uint16 uint32 uint64 "
    "float16 float32 float64 float128 str"
).split()
# float128 is unavailable on Windows, on 32-bit builds, and in some numpy builds.
if os.name == "nt" or sys.maxsize <= 2**32 or not hasattr(np, "float128"):
    DTYPES.remove("float128")
# Table with one column per datatype; each column carries a unit,
# a description and per-column meta so all of it can be round-tripped.
T_DTYPES = Table()
for name in DTYPES:
    if name == "bool":
        values = np.array([False, True, False])
    elif name == "str":
        values = np.array(["ab 0", "ab, 1", "ab2"])
    else:
        values = np.arange(3, dtype=name)
    col = Column(values, unit="m / s", description="descr_" + name)
    # Add meta in way that uses the default_factory type and not in alphabetical order
    col.meta["meta " + name] = 1
    col.meta["a"] = 2
    T_DTYPES[name] = col
# Add meta in way that uses the default_factory type and not in alphabetical order
T_DTYPES.meta["comments"] = ["comment1", "comment2"]
T_DTYPES.meta["a"] = 3
# Corresponds to simple_table()
# Expected ECSV output for ``simple_table()``: compact header with one
# flow-style ``datatype`` entry per column, then space-delimited data rows.
SIMPLE_LINES = [
    "# %ECSV 1.0",
    "# ---",
    "# datatype:",
    "# - {name: a, datatype: int64}",
    "# - {name: b, datatype: float64}",
    "# - {name: c, datatype: string}",
    "# schema: astropy-2.0",
    "a b c",
    "1 1.0 c",
    "2 2.0 d",
    "3 3.0 e",
]
def test_write_simple():
    """Check the compact one-line-per-column ECSV serialization of a table
    containing only common column types.
    """
    out = StringIO()
    simple_table().write(out, format="ascii.ecsv")
    assert out.getvalue().splitlines() == SIMPLE_LINES
def test_write_full():
    """
    Write a full-featured table with common types and explicitly check output.
    """
    t = T_DTYPES["bool", "int64", "float64", "str"]
    # NOTE: the block-style YAML entries below require a two-space
    # continuation indent after "#" (e.g. "#   unit: m / s"); without it the
    # expected lines cannot match the writer output (and would not even be
    # valid YAML on readback).
    lines = [
        "# %ECSV 1.0",
        "# ---",
        "# datatype:",
        "# - name: bool",
        "#   unit: m / s",
        "#   datatype: bool",
        "#   description: descr_bool",
        "#   meta: !!omap",
        "#   - {meta bool: 1}",
        "#   - {a: 2}",
        "# - name: int64",
        "#   unit: m / s",
        "#   datatype: int64",
        "#   description: descr_int64",
        "#   meta: !!omap",
        "#   - {meta int64: 1}",
        "#   - {a: 2}",
        "# - name: float64",
        "#   unit: m / s",
        "#   datatype: float64",
        "#   description: descr_float64",
        "#   meta: !!omap",
        "#   - {meta float64: 1}",
        "#   - {a: 2}",
        "# - name: str",
        "#   unit: m / s",
        "#   datatype: string",
        "#   description: descr_str",
        "#   meta: !!omap",
        "#   - {meta str: 1}",
        "#   - {a: 2}",
        "# meta: !!omap",
        "# - comments: [comment1, comment2]",
        "# - {a: 3}",
        "# schema: astropy-2.0",
        "bool int64 float64 str",
        'False 0 0.0 "ab 0"',
        'True 1 1.0 "ab, 1"',
        "False 2 2.0 ab2",
    ]
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    assert out.getvalue().splitlines() == lines
def test_write_read_roundtrip():
    """
    Round-trip a full-featured table with all types through ECSV, using both
    allowed delimiters and every read entry point.
    """
    t = T_DTYPES
    for delimiter in DELIMITERS:
        out = StringIO()
        t.write(out, format="ascii.ecsv", delimiter=delimiter)
        text = out.getvalue()
        for t2 in (
            Table.read(text, format="ascii.ecsv"),
            Table.read(text, format="ascii"),
            ascii.read(text),
            ascii.read(text, format="ecsv", guess=False),
            ascii.read(text, format="ecsv"),
        ):
            assert t.meta == t2.meta
            for name in t.colnames:
                assert t[name].attrs_equal(t2[name])
                assert np.all(t[name] == t2[name])
def test_write_read_roundtrip_empty_table(tmp_path):
    """A zero-column, zero-row table survives an ECSV round-trip.

    Regression test; see https://github.com/astropy/astropy/issues/13191
    """
    path = tmp_path / "x.ecsv"
    Table().write(path)
    restored = Table.read(path)
    assert len(restored) == 0
    assert len(restored.colnames) == 0
def test_bad_delimiter():
    """
    Passing a delimiter other than space or comma gives an exception.
    """
    with pytest.raises(ValueError, match="only space and comma are allowed"):
        T_DTYPES.write(StringIO(), format="ascii.ecsv", delimiter="|")
def test_bad_header_start():
    """A first line that is not '# %ECSV x.x' makes the ECSV read fail."""
    lines = list(SIMPLE_LINES)
    lines[0] = "# %ECV 0.9"
    with pytest.raises(ascii.InconsistentTableError):
        Table.read("\n".join(lines), format="ascii.ecsv", guess=False)
def test_bad_delimiter_input():
    """An illegal delimiter declared in the ECSV header raises ValueError."""
    lines = list(SIMPLE_LINES)
    lines.insert(2, "# delimiter: |")
    with pytest.raises(ValueError, match="only space and comma are allowed"):
        Table.read("\n".join(lines), format="ascii.ecsv", guess=False)
def test_multidim_input():
    """A multidimensional column round-trips with data, shape, dtype and info."""
    t = Table()
    t["a"] = np.arange(24).reshape(2, 3, 4)
    t["a"].info.description = "description"
    t["a"].info.meta = {1: 2}
    t["b"] = [1, 2]
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    col, col2 = t["a"], t2["a"]
    assert np.all(col2 == col)
    assert col2.shape == col.shape
    assert col2.dtype == col.dtype
    assert col2.info.description == col.info.description
    assert col2.info.meta == col.info.meta
    assert np.all(t2["b"] == t["b"])
def test_structured_input():
    """Structured columns round-trip with unit, format, description and meta."""
    t = Table()
    # Add unit, description and meta to make sure that round-trips as well.
    t["a"] = Column(
        [("B", (1.0, [2.0, 3.0])), ("A", (9.0, [8.0, 7.0]))],
        dtype=[("s", "U1"), ("v", [("p0", "f8"), ("p1", "2f8")])],
        description="description",
        format=">",  # Most formats do not work with structured!
        unit="m",  # Overall unit should round-trip.
        meta={1: 2},
    )
    t["b"] = Column(
        [[(1.0, 2.0), (9.0, 8.0)], [(3.0, 4.0), (7.0, 6.0)]],
        dtype="f8,f8",
        unit=u.Unit("m,s"),  # Per part unit should round-trip too.
    )
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    for name in t.colnames:
        c1, c2 = t[name], t2[name]
        assert np.all(c2 == c1)
        for attr in ("shape", "dtype", "unit", "format"):
            assert getattr(c2, attr) == getattr(c1, attr)
        assert c2.info.description == c1.info.description
        assert c2.info.meta == c1.info.meta
def test_round_trip_empty_table():
    """Test fix in #5010 for issue #5009 (ECSV fails for empty table with bool type)."""
    empty = Table(dtype=[bool, "i", "f"], names=["a", "b", "c"])
    out = StringIO()
    empty.write(out, format="ascii.ecsv")
    restored = Table.read(out.getvalue(), format="ascii.ecsv")
    assert restored.dtype == empty.dtype
    assert len(restored) == 0
def test_csv_ecsv_colnames_mismatch():
    """
    A mismatch between the plain CSV header column names and the ECSV YAML
    header raises the expected exception.
    """
    lines = list(SIMPLE_LINES)
    lines[lines.index("a b c")] = "a b d"
    with pytest.raises(ValueError) as err:
        ascii.read(lines, format="ecsv")
    assert "column names from ECSV header ['a', 'b', 'c']" in str(err.value)
def test_regression_5604():
    """Units and Quantities in table meta serialize with astropy YAML tags.

    See https://github.com/astropy/astropy/issues/5604 for more.
    """
    t = Table()
    t.meta = {"foo": 5 * u.km, "foo2": u.s}
    t["bar"] = [7] * u.km
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    text = out.getvalue()
    assert "!astropy.units.Unit" in text
    assert "!astropy.units.Quantity" in text
def assert_objects_equal(obj1, obj2, attrs, compare_class=True):
    """Assert that two column/mixin objects are equal.

    Compares each dotted attribute path in ``attrs`` plus the standard
    ``info`` attributes, the ``info.meta`` contents and key order (treating
    ``None`` as equivalent to ``{}``), and -- when ``attrs`` is empty -- the
    data values themselves.
    """
    if compare_class:
        assert obj1.__class__ is obj2.__class__
    assert obj1.shape == obj2.shape
    info_attrs = [
        "info.name",
        "info.format",
        "info.unit",
        "info.description",
        "info.dtype",
    ]
    for attr in attrs + info_attrs:
        a1 = obj1
        a2 = obj2
        # Walk dotted paths like "info.unit"; fall back to item access for
        # sub-objects that only support __getitem__.
        for subattr in attr.split("."):
            try:
                a1 = getattr(a1, subattr)
                a2 = getattr(a2, subattr)
            except AttributeError:
                a1 = a1[subattr]
                a2 = a2[subattr]
        if isinstance(a1, np.ndarray) and a1.dtype.kind == "f":
            # Floats may lose a little precision in the text round-trip.
            assert quantity_allclose(a1, a2, rtol=1e-10)
        else:
            assert np.all(a1 == a2)
    # Check meta values and key order but allow None to be equivalent to {}
    meta1 = obj1.info.meta or {}
    meta2 = obj2.info.meta or {}
    assert meta1 == meta2
    assert meta1.keys() == meta2.keys()
    # For no attrs that means we just compare directly.
    if not attrs:
        if isinstance(obj1, np.ndarray) and obj1.dtype.kind == "f":
            assert quantity_allclose(obj1, obj2, rtol=1e-15)
        else:
            assert np.all(obj1 == obj2)
def test_ecsv_mixins_ascii_read_class():
    """Ensure that ascii.read(ecsv_file) returns the correct class
    (QTable if any Quantity subclasses, Table otherwise).
    """

    def roundtrip(table):
        out = StringIO()
        table.write(out, format="ascii.ecsv")
        return ascii.read(out.getvalue(), format="ecsv")

    # Start with every mixin type except Quantity subclasses.
    t = QTable(
        {
            name: col
            for name, col in mixin_cols.items()
            if not isinstance(col.info, QuantityInfo)
        }
    )
    assert type(roundtrip(t)) is Table
    # A single quantity column flips the read class to QTable.
    t["lon"] = mixin_cols["lon"]
    assert type(roundtrip(t)) is QTable
def test_ecsv_mixins_qtable_to_table():
    """Write as QTable and read back as Table; ensure the correct classes
    come out.
    """
    colnames = sorted(mixin_cols)
    t = QTable([mixin_cols[n] for n in colnames], names=colnames)
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert t.colnames == t2.colnames
    for name, col in t.columns.items():
        col2 = t2[name]
        if isinstance(col.info, QuantityInfo):
            # Quantity is downgraded to Column + unit; class-specific
            # attributes like `value` or `wrap_angle` are lost.
            assert type(col2) is Column
            # Compare data values here (assert_objects_equal doesn't know how
            # in this case).
            assert np.allclose(col.value, col2, rtol=1e-10)
            assert_objects_equal(col, col2, ["unit"], compare_class=False)
        else:
            assert_objects_equal(col, col2, compare_attrs[name], compare_class=True)
@pytest.mark.parametrize("table_cls", (Table, QTable))
def test_ecsv_mixins_as_one(table_cls):
    """Write/read all cols at once and validate intermediate column names."""
    names = sorted(mixin_cols)
    # ECSV stores times as value by default, so we just get the column back.
    # One exception is tm3, which is set to serialize via jd1 and jd2.
    expected_serialized = []
    for name in names:
        s_names = serialized_names[name]
        if not name.startswith("tm3"):
            s_names = [
                n.replace(".jd1", "") for n in s_names if not n.endswith("jd2")
            ]
        expected_serialized.extend(s_names)
    t = table_cls([mixin_cols[n] for n in names], names=names)
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = table_cls.read(out.getvalue(), format="ascii.ecsv")
    assert t2.colnames == t.colnames
    # Read as an ascii.basic table (skip all the ECSV junk).
    t3 = table_cls.read(out.getvalue(), format="ascii.basic")
    assert t3.colnames == expected_serialized
def make_multidim(col, ndim):
    """Take a col with length=2 and make it N-d by repeating elements.

    For the special case of ndim==1 just return the original.

    The output has shape [3] * ndim. By using 3 we can be sure that repeating
    the two input elements gives an output that is sufficiently unique for
    the multidim tests.

    Parameters
    ----------
    col : array-like
        Length-2 column supporting integer-array indexing.
    ndim : int
        Desired number of output dimensions.

    Returns
    -------
    Column with shape ``[3] * ndim``, or ``col`` unchanged if ``ndim == 1``.
    """
    if ndim > 1:
        # Alternate the two input elements (0, 1, 0, 1, ...) to fill all
        # 3**ndim slots; simpler than the itertools.cycle/zip equivalent.
        idxs = np.arange(3**ndim) % 2
        col = col[idxs].reshape([3] * ndim)
    return col
@pytest.mark.parametrize("name_col", list(mixin_cols.items()))
@pytest.mark.parametrize("table_cls", (Table, QTable))
@pytest.mark.parametrize("ndim", (1, 2, 3))
def test_ecsv_mixins_per_column(table_cls, name_col, ndim):
    """Write/read one col at a time and do detailed validation.

    This tests every input column type as 1-d, 2-d and 3-d.
    """
    name, col = name_col
    plain = make_multidim(np.array([1.0, 2.0]), ndim)
    col = make_multidim(col, ndim)
    t = table_cls([plain, col, plain], names=["c1", name, "c2"])
    t[name].info.description = "description"
    t[name].info.meta = {"b": 2, "a": 1}
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = table_cls.read(out.getvalue(), format="ascii.ecsv")
    assert t2.colnames == t.colnames
    for colname in t.colnames:
        assert len(t2[colname].shape) == ndim
        if colname in ("c1", "c2"):
            compare = ["data"]
        else:
            # Storing Longitude as Column loses wrap_angle.
            compare = [
                attr
                for attr in compare_attrs[colname]
                if not (attr == "wrap_angle" and table_cls is Table)
            ]
        assert_objects_equal(t[colname], t2[colname], compare)
    # Special case to make sure Column type doesn't leak into Time class data
    if name.startswith("tm"):
        assert t2[name]._time.jd1.__class__ is np.ndarray
        assert t2[name]._time.jd2.__class__ is np.ndarray
def test_round_trip_masked_table_default(tmp_path):
    """Test (mostly) round-trip of MaskedColumn through ECSV using default serialization
    that uses an empty string "" to mark NULL values.  Note:

    >>> simple_table(masked=True)
      a      b     c
    int64 float64 str1
    ----- ------- ----
       --     1.0    c
        2     2.0   --
        3      --    e
    """
    filename = tmp_path / "test.ecsv"
    t = simple_table(masked=True)  # int, float, and str cols with one masked element
    t.write(filename)
    t2 = Table.read(filename)
    assert t2.masked is False
    assert t2.colnames == t.colnames
    for name in t2.colnames:
        # From formal perspective the round-trip columns are the "same"
        assert np.all(t2[name].mask == t[name].mask)
        assert np.all(t2[name] == t[name])
        # But peeking under the mask shows that the underlying data are changed
        # because by default ECSV uses "" to represent masked elements.
        t[name].mask = False
        t2[name].mask = False
        assert not np.all(t2[name] == t[name])  # Expected diff
def test_round_trip_masked_table_serialize_mask(tmp_path):
    """
    Same as prev but set the serialize_method to 'data_mask' so mask is written out.
    With the mask stored explicitly, the data hidden under the mask also survive.
    """
    filename = tmp_path / "test.ecsv"
    t = simple_table(masked=True)  # int, float, and str cols with one masked element
    t["c"][0] = ""  # This would come back as masked for default "" NULL marker
    # MaskedColumn with no masked elements. See table the MaskedColumnInfo class
    # _represent_as_dict() method for info about how we test a column with no masked elements.
    t["d"] = [1, 2, 3]
    t.write(filename, serialize_method="data_mask")
    t2 = Table.read(filename)
    assert t2.masked is False
    assert t2.colnames == t.colnames
    for name in t2.colnames:
        assert np.all(t2[name].mask == t[name].mask)
        assert np.all(t2[name] == t[name])
        # Data under the mask round-trips also (unmask data to show this).
        t[name].mask = False
        t2[name].mask = False
        assert np.all(t2[name] == t[name])
@pytest.mark.parametrize("table_cls", (Table, QTable))
def test_ecsv_round_trip_user_defined_unit(table_cls, tmp_path):
    """Ensure that we can read-back enabled user-defined units."""
    # Test adapted from #8897, where it was noted that this works
    # but was not tested.
    filename = tmp_path / "test.ecsv"
    unit = u.def_unit("bandpass_sol_lum")
    t = table_cls()
    t["l"] = np.arange(5) * unit
    t.write(filename)
    # Without the unit enabled, reading gives UnrecognizedUnit.
    if table_cls is QTable:
        ctx = pytest.warns(u.UnitsWarning, match=r"'bandpass_sol_lum' did not parse .*")
    else:
        ctx = nullcontext()
    # Note: The read might also generate ResourceWarning, in addition to UnitsWarning
    with ctx:
        t2 = table_cls.read(filename)
    assert isinstance(t2["l"].unit, u.UnrecognizedUnit)
    assert str(t2["l"].unit) == "bandpass_sol_lum"
    if table_cls is QTable:
        assert np.all(t2["l"].value == t["l"].value)
    else:
        assert np.all(t2["l"] == t["l"])
    # But with it enabled, it works.
    with u.add_enabled_units(unit):
        t3 = table_cls.read(filename)
        assert t3["l"].unit is unit
        assert np.all(t3["l"] == t["l"])
        # Just to be sure, also try writing with unit enabled.
        filename2 = tmp_path / "test2.ecsv"
        t3.write(filename2)
        # Bug fix: read back the file that was just written (previously this
        # re-read `filename`, so the fresh write was never verified).
        t4 = table_cls.read(filename2)
        assert t4["l"].unit is unit
        assert np.all(t4["l"] == t["l"])
def test_read_masked_bool():
    """Empty-string entries in a bool column read back as masked values."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: col0, datatype: bool}
# schema: astropy-2.0
col0
1
0
True
""
False
"""
    col = ascii.read(txt, format="ecsv")["col0"]
    assert isinstance(col, MaskedColumn)
    assert np.all(col.mask == [False, False, False, True, False])
    assert np.all(col == [True, False, True, False, False])
@pytest.mark.parametrize("serialize_method", ["null_value", "data_mask"])
@pytest.mark.parametrize("dtype", [np.int64, np.float64, bool, str])
@pytest.mark.parametrize("delimiter", [",", " "])
def test_roundtrip_multidim_masked_array(serialize_method, dtype, delimiter):
    """Round-trip a multidimensional MaskedColumn for several dtypes, both
    allowed delimiters and both serialization methods.
    """
    # TODO also test empty string with null value
    t = Table()
    col = MaskedColumn(np.arange(12).reshape(2, 3, 2), dtype=dtype)
    if dtype is str:
        # np does something funny and gives a dtype of U21.
        col = col.astype("U2")
    col.mask[0, 0, 0] = True
    col.mask[1, 1, 1] = True
    t["a"] = col
    t["b"] = ["x", "y"]  # Add another column for kicks
    out = StringIO()
    # Bug fix: actually pass the parametrized delimiter (it was previously
    # unused, so the delimiter axis of the parametrization tested nothing).
    t.write(out, format="ascii.ecsv", serialize_method=serialize_method, delimiter=delimiter)
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert t2.masked is False
    assert t2.colnames == t.colnames
    for name in t2.colnames:
        assert t2[name].dtype == t[name].dtype
        if hasattr(t[name], "mask"):
            assert np.all(t2[name].mask == t[name].mask)
        assert np.all(t2[name] == t[name])
@pytest.mark.parametrize("subtype", ["some-user-type", "complex"])
def test_multidim_unknown_subtype(subtype):
    """Test an ECSV file with a string type but unknown subtype."""
    # Bug fix: the block-style datatype entry needs a two-space YAML
    # continuation indent after "#" — without it the header is invalid YAML.
    txt = f"""\
# %ECSV 1.0
# ---
# datatype:
# - name: a
#   datatype: string
#   subtype: {subtype}
# schema: astropy-2.0
a
[1,2]
[3,4]"""
    with pytest.warns(
        InvalidEcsvDatatypeWarning,
        match=rf"unexpected subtype '{subtype}' set for column 'a'",
    ):
        t = ascii.read(txt, format="ecsv")
    assert t["a"].dtype.kind == "U"
    assert t["a"][0] == "[1,2]"
def test_multidim_bad_shape():
    """Test a malformed ECSV file (rows do not match the fixed subtype shape)."""
    # Bug fix: the block-style datatype entry needs a two-space YAML
    # continuation indent after "#" — without it the header is invalid YAML
    # and the read would fail before the shape check being tested here.
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - name: a
#   datatype: string
#   subtype: int64[3]
# schema: astropy-2.0
a
[1,2]
[3,4]"""
    with pytest.raises(
        ValueError, match="column 'a' failed to convert: shape mismatch"
    ):
        Table.read(txt, format="ascii.ecsv")
def test_write_not_json_serializable():
    """Writing an object column that JSON cannot encode raises TypeError."""
    t = Table()
    t["a"] = np.array([{1, 2}, 1], dtype=object)
    match = (
        "could not convert column 'a' to string: Object of type set is not JSON"
        " serializable"
    )
    with pytest.raises(TypeError, match=match):
        t.write(StringIO(), format="ascii.ecsv")
def test_read_not_json_serializable():
    """Test a malformed ECSV file: json-subtype value that is not valid JSON."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: a, datatype: string, subtype: json}
# schema: astropy-2.0
a
fail
[3,4]"""
    with pytest.raises(
        ValueError,
        match="column 'a' failed to convert: column value is not valid JSON",
    ):
        Table.read(txt, format="ascii.ecsv")
def test_read_bad_datatype():
    """Test a malformed ECSV file: datatype 'object' warns but reads as str."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: a, datatype: object}
# schema: astropy-2.0
a
fail
[3,4]"""
    with pytest.warns(
        InvalidEcsvDatatypeWarning,
        match="unexpected datatype 'object' of column 'a' is not in allowed",
    ):
        t = Table.read(txt, format="ascii.ecsv")
    assert t["a"][0] == "fail"
    assert type(t["a"][1]) is str
    # Compare the dtype itself rather than its class: the previous
    # `type(t["a"].dtype) == np.dtype("O")` compared a DType class against a
    # dtype instance and only passed via numpy's lenient coercion.
    assert t["a"].dtype == np.dtype("O")
def test_read_complex():
    """Test an ECSV v1.0 file with a complex column (warns: not in the spec)."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: a, datatype: complex}
# schema: astropy-2.0
a
1+1j
2+2j"""
    with pytest.warns(
        InvalidEcsvDatatypeWarning,
        match="unexpected datatype 'complex' of column 'a' is not in allowed",
    ):
        tbl = Table.read(txt, format="ascii.ecsv")
    assert tbl["a"].dtype.type is np.complex128
def test_read_str():
    """Test an ECSV file with a 'str' instead of 'string' datatype."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: a, datatype: str}
# schema: astropy-2.0
a
sometext
S"""  # also testing single character text
    with pytest.warns(
        InvalidEcsvDatatypeWarning,
        match="unexpected datatype 'str' of column 'a' is not in allowed",
    ):
        tbl = Table.read(txt, format="ascii.ecsv")
    assert isinstance(tbl["a"][1], str)
    assert isinstance(tbl["a"][0], np.str_)
def test_read_bad_datatype_for_object_subtype():
    """Test a malformed ECSV file: json subtype on a non-string datatype."""
    txt = """\
# %ECSV 1.0
# ---
# datatype:
# - {name: a, datatype: int64, subtype: json}
# schema: astropy-2.0
a
fail
[3,4]"""
    match = "column 'a' failed to convert: datatype of column 'a' must be \"string\""
    with pytest.raises(ValueError, match=match):
        Table.read(txt, format="ascii.ecsv")
def test_full_repr_roundtrip():
    """Float values round-trip at full precision even when a display format
    is specified.
    """
    t = Table()
    t["a"] = np.array([np.pi, 1 / 7], dtype=np.float64)
    t["a"].info.format = ".2f"
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert np.all(t["a"] == t2["a"])
    assert t2["a"].info.format == ".2f"
#############################################################################
# Define a number of specialized columns for testing and the expected values
# of `datatype` for each column.
#############################################################################
# First here is some helper code used to make the expected outputs code.
def _get_ecsv_header_dict(text):
    """Parse the YAML header of an ECSV string into a dict."""
    stripped = [line.strip() for line in text.splitlines()]
    # Keep only comment lines, drop their "# " prefix, then skip the
    # "%ECSV 1.0" / "---" preamble before handing the rest to YAML.
    yaml_lines = [line[2:] for line in stripped if line.startswith("#")][2:]
    return yaml.safe_load("\n".join(yaml_lines))
def _make_expected_values(cols):
    """Print ``exps[...]`` entries for *cols*.

    Development helper used to generate the expected values hard-coded
    below; it is not called by the tests themselves.
    """
    from pprint import pformat

    for name, col in cols.items():
        t = Table()
        t[name] = col
        out = StringIO()
        t.write(out, format="ascii.ecsv")
        hdr = _get_ecsv_header_dict(out.getvalue())
        fmt_hdr = pformat(hdr["datatype"])
        print(f"exps[{name!r}] =", fmt_hdr[:1])
        print(fmt_hdr[1:])
        print()
# Expected values of `datatype` for each column
exps = {}
# Input columns keyed by a descriptive name; each entry in `exps` gives the
# ECSV `datatype` header expected when that column is written.
cols = {}
# Run of the mill scalar for completeness
cols["scalar"] = np.array([1, 2], dtype=np.int16)
exps["scalar"] = [{"datatype": "int16", "name": "scalar"}]
# Array of lists that works as a 2-d variable array. This is just treated
# as an object.
cols["2-d variable array lists"] = c = np.empty(shape=(2,), dtype=object)
c[0] = [[1, 2], ["a", 4]]
c[1] = [[1, 2, 3], [4, 5.25, 6]]
exps["2-d variable array lists"] = [
    {"datatype": "string", "name": "2-d variable array lists", "subtype": "json"}
]
# Array of numpy arrays that is a 2-d variable array
cols["2-d variable array numpy"] = c = np.empty(shape=(2,), dtype=object)
c[0] = np.array([[1, 2], [3, 4]], dtype=np.float32)
c[1] = np.array([[1, 2, 3], [4, 5.5, 6]], dtype=np.float32)
exps["2-d variable array numpy"] = [
    {
        "datatype": "string",
        "name": "2-d variable array numpy",
        "subtype": "float32[2,null]",
    }
]
cols["1-d variable array lists"] = np.array([[1, 2], [3, 4, 5]], dtype=object)
exps["1-d variable array lists"] = [
    {"datatype": "string", "name": "1-d variable array lists", "subtype": "json"}
]
# Variable-length array
cols["1-d variable array numpy"] = np.array(
    [np.array([1, 2], dtype=np.uint8), np.array([3, 4, 5], dtype=np.uint8)],
    dtype=object,
)
exps["1-d variable array numpy"] = [
    {"datatype": "string", "name": "1-d variable array numpy", "subtype": "uint8[null]"}
]
cols["1-d variable array numpy str"] = np.array(
    [np.array(["a", "b"]), np.array(["c", "d", "e"])], dtype=object
)
exps["1-d variable array numpy str"] = [
    {
        "datatype": "string",
        "name": "1-d variable array numpy str",
        "subtype": "string[null]",
    }
]
cols["1-d variable array numpy bool"] = np.array(
    [np.array([True, False]), np.array([True, False, True])], dtype=object
)
exps["1-d variable array numpy bool"] = [
    {
        "datatype": "string",
        "name": "1-d variable array numpy bool",
        "subtype": "bool[null]",
    }
]
cols["1-d regular array"] = np.array([[1, 2], [3, 4]], dtype=np.int8)
exps["1-d regular array"] = [
    {"datatype": "string", "name": "1-d regular array", "subtype": "int8[2]"}
]
cols["2-d regular array"] = np.arange(8, dtype=np.float16).reshape(2, 2, 2)
exps["2-d regular array"] = [
    {"datatype": "string", "name": "2-d regular array", "subtype": "float16[2,2]"}
]
cols["scalar object"] = np.array([{"a": 1}, {"b": 2}], dtype=object)
exps["scalar object"] = [
    {"datatype": "string", "name": "scalar object", "subtype": "json"}
]
cols["1-d object"] = np.array(
    [[{"a": 1}, {"b": 2}], [{"a": 1}, {"b": 2}]], dtype=object
)
exps["1-d object"] = [
    {"datatype": "string", "name": "1-d object", "subtype": "json[2]"}
]
@pytest.mark.parametrize("name,col,exp", list(zip(cols, cols.values(), exps.values())))
def test_specialized_columns(name, col, exp):
    """Test variable length lists, multidim columns, object columns."""
    t = Table()
    t[name] = col
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    assert _get_ecsv_header_dict(out.getvalue())["datatype"] == exp
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert t2.colnames == t.colnames
    for colname in t2.colnames:
        assert t2[colname].dtype == t[colname].dtype
        for v2, v1 in zip(t2[colname], t[colname]):
            if isinstance(v2, np.ndarray):
                assert v2.dtype == v1.dtype
            assert np.all(v2 == v1)
def test_full_subtypes():
    """Read ECSV file created by M. Taylor that includes scalar, fixed array,
    variable array for all datatypes. This file has missing values for all
    columns as both per-value null and blank entries for the entire column
    value.

    Note: original file was modified to include blank values in f_float and
    f_double columns.
    """
    t = Table.read(os.path.join(TEST_DIR, "data", "subtypes.ecsv"))
    colnames = (
        "i_index,"
        "s_byte,s_short,s_int,s_long,s_float,s_double,s_string,s_boolean,"
        "f_byte,f_short,f_int,f_long,f_float,f_double,f_string,f_boolean,"
        "v_byte,v_short,v_int,v_long,v_float,v_double,v_string,v_boolean,"
        "m_int,m_double"
    ).split(",")
    assert t.colnames == colnames
    type_map = {
        "byte": "int8",
        "short": "int16",
        "int": "int32",
        "long": "int64",
        "float": "float32",
        "double": "float64",
        "string": "str",
        "boolean": "bool",
    }
    for col in t.itercols():
        info = col.info
        if info.name == "i_index":
            continue
        assert isinstance(col, MaskedColumn)
        type_name = info.name[2:]  # short, int, etc
        prefix = info.name[:1]  # s(calar), f(ixed), v(ariable), m(ultidim)
        if prefix == "v":
            # Variable-length array: object column of ndarrays.
            assert col.shape == (16,)
            assert info.dtype.name == "object"
            for val in col:
                assert isinstance(val, np.ndarray)
                assert val.dtype.name.startswith(type_map[type_name])
                assert len(val) in [0, 1, 2, 3]
        else:
            assert info.dtype.name.startswith(type_map[type_name])
            if prefix == "s":  # Scalar
                assert col.shape == (16,)
            elif prefix == "f":  # Fixed array
                assert col.shape == (16, 3)
def test_masked_empty_subtypes():
    """Blank fields in subtype columns come back fully masked.

    Similar to previous test but with explicit checks of values.
    """
    txt = """
# %ECSV 1.0
# ---
# datatype:
# - {name: o, datatype: string, subtype: json}
# - {name: f, datatype: string, subtype: 'int64[2]'}
# - {name: v, datatype: string, subtype: 'int64[null]'}
# schema: astropy-2.0
o f v
null [0,1] [1]
"" "" ""
[1,2] [2,3] [2,3]
"""
    t = Table.read(txt, format="ascii.ecsv")
    assert np.all(t["o"] == np.array([None, -1, [1, 2]], dtype=object))
    assert np.all(t["o"].mask == [False, True, False])
    expected_f = np.ma.array([[0, 1], [-1, -1], [2, 3]], mask=[[0, 0], [1, 1], [0, 0]])
    assert np.all(t["f"] == expected_f)
    assert np.all(t["f"].mask == expected_f.mask)
    assert np.all(t["v"][0] == [1])
    assert np.all(t["v"][2] == [2, 3])
    assert np.all(t["v"].mask == [False, True, False])
def test_masked_vals_in_array_subtypes():
    """Test null values in fixed and variable array subtypes."""
    t = Table()
    t["f"] = np.ma.array([[1, 2], [3, 4]], mask=[[0, 1], [1, 0]], dtype=np.int64)
    t["v"] = np.empty(2, dtype=object)
    t["v"][0] = np.ma.array([1, 2], mask=[0, 1], dtype=np.int64)
    t["v"][1] = np.ma.array([3, 4, 5], mask=[1, 0, 0], dtype=np.int64)
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    expected = """
# %ECSV 1.0
# ---
# datatype:
# - {name: f, datatype: string, subtype: 'int64[2]'}
# - {name: v, datatype: string, subtype: 'int64[null]'}
# schema: astropy-2.0
f v
[1,null] [1,null]
[null,4] [null,4,5]
"""
    assert _get_ecsv_header_dict(out.getvalue()) == _get_ecsv_header_dict(expected)
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert t2.colnames == t.colnames
    for name in t2.colnames:
        assert t2[name].dtype == t[name].dtype
        assert type(t2[name]) is type(t[name])
        for v2, v1 in zip(t2[name], t[name]):
            if isinstance(v2, np.ndarray):
                assert v2.dtype == v1.dtype
            if isinstance(v2, np.ma.MaskedArray):
                assert np.all(v2.mask == v1.mask)
            assert np.all(v2 == v1)
def test_guess_ecsv_with_one_column():
    """Except for ECSV, guessing always requires at least 2 columns."""
    txt = """
# %ECSV 1.0
# ---
# datatype:
# - {name: col, datatype: string, description: hello}
# schema: astropy-2.0
col
1
2
"""
    t = ascii.read(txt)
    # Plain ascii guessing would have produced int; the ECSV header wins.
    assert t["col"].dtype.kind == "U"
    assert t["col"].description == "hello"
@pytest.mark.parametrize("masked", [MaskedColumn, Masked, np.ma.MaskedArray])
def test_write_structured_masked_column(masked):
    """Structured masked columns round-trip with data, mask and class."""
    base = np.array([(1, 2), (3, 4)], dtype="i,i")
    mc = masked(base, mask=[(True, False), (False, False)])
    t = Table([mc], names=["mc"])
    out = StringIO()
    t.write(out, format="ascii.ecsv")
    t2 = Table.read(out.getvalue(), format="ascii.ecsv")
    assert type(t2["mc"]) is type(t["mc"])
    assert (t2["mc"] == mc).all()
    assert (t2["mc"].mask == mc.mask).all()
def test_write_masked_time_ymdhms_mixin():
    """A masked Time (ymdhms) column round-trips through ECSV.

    Regression test for gh-16370.
    """
    t = Time({"year": 2000, "month": 1, "day": [1, 2]})
    t[0] = np.ma.masked
    qt = QTable([t], names=["t"])
    out = StringIO()
    qt.write(out, format="ascii.ecsv")
    qt2 = QTable.read(out.getvalue(), format="ascii.ecsv")
    # Note that the value under the mask does not roundtrip.
    assert (qt2["t"] == t).all()
    assert (qt2["t"].mask == t.mask).all()