import logging
import sys
from copy import deepcopy
from typing import BinaryIO, Iterable, Optional, Union
import numpy as np
from ._pointwriter import IPointWriter
from .compression import LazBackend
from .errors import LaspyException
from .header import LasHeader
from .point import dims
from .point.record import PackedPointRecord, ScaleAwarePointRecord
from .vlrs.vlrlist import VLRList
logger = logging.getLogger(__name__)
# LAS/LAZ file writer.
class LasWriter:
    """
    Allows to write a complete LAS/LAZ file to the destination.

    The writer can be used as a context manager, in which case
    :meth:`close` is called when the ``with`` block is left.
    """

    def __init__(
        self,
        dest: BinaryIO,
        header: LasHeader,
        do_compress: Optional[bool] = None,
        laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None,
        closefd: bool = True,
        encoding_errors: str = "strict",
    ) -> None:
        """
        Parameters
        ----------
        dest: file_object
            file object where the LAS/LAZ will be written

        header: LasHeader
            The header of the file to be written.
            The writer works on a deep copy, the given header is not modified.

        do_compress: bool, optional
            Whether the file data should be written as LAS (uncompressed)
            or LAZ (compressed).
            If None, the file won't be compressed, unless a laz_backend is provided

        laz_backend: LazBackend or list of LazBackend, optional
            The LazBackend to use (or if it is a sequence the LazBackend to try)
            for the compression

        closefd: bool, default True
            should the `dest` be closed when the writer is closed

        encoding_errors: str, default 'strict'
            How encoding errors should be treated.
            Possible values and their explanation can be seen here:
            https://docs.python.org/3/library/codecs.html#error-handlers.

        Raises
        ------
        LaspyException
            If compression is requested but no LazBackend could be initialized.
        """
        self.closefd = closefd
        self.encoding_errors = encoding_errors
        self.header = deepcopy(header)
        # The point writer will take care of creating and writing
        # the correct laszip vlr, however we have to make sure
        # no prior laszip vlr exists
        try:
            # `self.header` is a deep copy of `header`, so the index found
            # in the original vlr list is valid for the copy as well.
            self.header.vlrs.pop(header.vlrs.index("LasZipVlr"))
        except ValueError:
            # No previous laszip vlr: nothing to remove.
            pass
        # NOTE(review): presumably resets the point-dependent header fields
        # (counts, bounds) so they can be rebuilt while writing -- confirm
        # against LasHeader.partial_reset.
        self.header.partial_reset()

        self.dest = dest
        # Set to True once the point section is finished (see write_evlrs/close)
        self.done = False

        dims.raise_if_version_not_compatible_with_fmt(
            header.point_format.id, str(self.header.version)
        )

        if laz_backend is not None:
            # Explicitly giving a backend implies compression,
            # unless do_compress was explicitly given too.
            if do_compress is None:
                do_compress = True
            self.laz_backend = laz_backend
        else:
            if do_compress is None:
                do_compress = False
            self.laz_backend = LazBackend.detect_available()
        self.header.are_points_compressed = do_compress

        if do_compress:
            self.point_writer: IPointWriter = self._create_laz_backend(self.laz_backend)
        else:
            self.point_writer: IPointWriter = UncompressedPointWriter(self.dest)

        self.point_writer.write_initial_header_and_vlrs(
            self.header, self.encoding_errors
        )

    def write_points(self, points: PackedPointRecord) -> None:
        """Writes the points to the file and updates the header accordingly.

        .. note ::

            If the points type is PackedPointRecord, then you need to make sure
            the points are expressed in the same scales and offsets 'system' as
            this writer.

            You can use :meth:`.LasData.change_scaling` before getting the points
            to ensure that.

            If the points type is ScaleAwarePointRecord, then the points will
            automatically be re-scaled for the write; their original scales,
            offsets and X, Y, Z values are restored afterwards, even if
            the write fails.

        Parameters
        ----------
        points: PackedPointRecord or ScaleAwarePointRecord
            The points to be written. An empty record is silently ignored.

        Raises
        ------
        LaspyException
            If the point format of the points does not match
            the point format of the writer, if points cannot be written
            anymore, or if writing them would exceed the maximum number of
            points the file can store.
        """
        if not points:
            return

        if self.done:
            raise LaspyException("Cannot write points anymore")

        if points.point_format != self.header.point_format:
            raise LaspyException("Incompatible point formats")

        if self.header.max_point_count() - self.header.point_count < len(points):
            raise LaspyException(
                "Cannot write {} points as it would exceed the maximum number of points the file "
                "can store. Current point count: {}, max point count: {}".format(
                    len(points), self.header.point_count, self.header.max_point_count()
                )
            )

        needs_rescale = isinstance(points, ScaleAwarePointRecord) and (
            np.any(points.scales != self.header.scales)
            or np.any(points.offsets != self.header.offsets)
        )
        if needs_rescale:
            # Re-scaling modifies the caller's record in place: keep what is
            # needed to put it back in its original state afterwards.
            saved_offsets, saved_scales = points.offsets, points.scales
            saved_X, saved_Y, saved_Z = (
                points.X.copy(),
                points.Y.copy(),
                points.Z.copy(),
            )

        try:
            if needs_rescale:
                points.change_scaling(
                    scales=self.header.scales, offsets=self.header.offsets
                )
            self.header.grow(points)
            self.point_writer.write_points(points)
        finally:
            # Restore the caller's record whether or not the write succeeded.
            if needs_rescale:
                points.offsets, points.scales = saved_offsets, saved_scales
                points.X, points.Y, points.Z = saved_X, saved_Y, saved_Z

    def write_evlrs(self, evlrs: VLRList) -> None:
        """Writes the EVLRs to the file

        Writing a non-empty list of EVLRs finishes the point section:
        no more points can be written afterwards.
        An empty list is ignored.

        Parameters
        ----------
        evlrs: VLRList
            The EVLRs to be written

        Raises
        ------
        LaspyException
            If the file's version is not >= 1.4
        """
        if self.header.version.minor < 4:
            raise LaspyException(
                "EVLRs are not supported on files with version less than 1.4"
            )

        if len(evlrs) > 0:
            # The point writer must be finished first, so that the current
            # position of `dest` can be recorded as the start of the EVLRs.
            self.point_writer.done()
            self.done = True
            self.header.number_of_evlrs = len(evlrs)
            self.header.start_of_first_evlr = self.dest.tell()
            evlrs.write_to(self.dest, as_extended=True)

    def close(self) -> None:
        """Closes the writer.

        flushes the points, updates the header, making it impossible
        to write points afterwards.

        If ``closefd`` is True, the destination is closed even when
        finishing the points or updating the header raises; the exception
        is still propagated to the caller.
        """
        try:
            if self.point_writer is not None:
                if not self.done:
                    self.point_writer.done()

                # No points written: store zeroed bounds in the header
                if self.header.point_count == 0:
                    self.header.maxs = [0.0, 0.0, 0.0]
                    self.header.mins = [0.0, 0.0, 0.0]

                self.point_writer.write_updated_header(
                    self.header, self.encoding_errors
                )
        finally:
            # Whatever happened above, the writer is no longer usable and
            # the destination must not be leaked.
            self.done = True
            if self.closefd:
                self.dest.close()

    def _create_laz_backend(
        self, laz_backends: Union[LazBackend, Iterable[LazBackend]]
    ) -> "IPointWriter":
        """Creates the point writer of the first usable backend.

        Parameters
        ----------
        laz_backends: LazBackend or iterable of LazBackend
            The backend(s) to try, in order.

        Returns
        -------
        IPointWriter
            The writer created by the first backend that is available
            and whose writer creation succeeds.

        Raises
        ------
        LaspyException
            If no backend was given or none could be initialized.
            In the latter case the last failure is chained as the cause.
        """
        try:
            laz_backends = iter(laz_backends)
        except TypeError:
            # Not iterable: a single backend was given
            laz_backends = (laz_backends,)

        last_error: Optional[Exception] = None
        for backend in laz_backends:
            try:
                if not backend.is_available():
                    raise LaspyException(f"The '{backend}' is not available")

                return backend.create_writer(self.dest, self.header)
            except Exception as e:
                # Best effort: log the failure and try the next backend
                logger.error(e)
                last_error = e

        if last_error is not None:
            raise LaspyException(
                f"No LazBackend could be initialized: {last_error}"
            ) from last_error
        else:
            raise LaspyException("No LazBackend selected, cannot compress")

    def __enter__(self):
        """Returns the writer itself, for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the writer when the ``with`` block is left."""
        self.close()
class UncompressedPointWriter(IPointWriter):
    """
    Point writer for the plain, non-compressed case: the bytes of the
    point records are written directly to the destination.
    """

    def __init__(self, dest: BinaryIO) -> None:
        """
        Parameters
        ----------
        dest: file_object
            file object the points will be written to
        """
        self.dest = dest

    @property
    def destination(self) -> BinaryIO:
        """The file object the points are written to."""
        return self.dest

    def write_points(self, points: PackedPointRecord) -> None:
        """Writes the raw bytes of the points to the destination.

        On little-endian hosts the buffer of the record is written as is,
        on any other host a byte-swapped version of its array is written.

        Parameters
        ----------
        points: PackedPointRecord
            The points to be written
        """
        if sys.byteorder != "little":
            swapped = points.array.byteswap()
            self.dest.write(memoryview(swapped))
            return

        self.dest.write(points.memoryview())

    def done(self) -> None:
        """Nothing to finish when writing uncompressed points."""
        return None