Source code for orangebox.reader

# Orangebox - Cleanflight/Betaflight blackbox data parser.
# Copyright (C) 2019  Károly Kiripolszky
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import BinaryIO, Dict, Iterator, List, Optional

from .decoders import decoder_map
from .predictors import predictor_map
from .tools import _trycast
from .types import FieldDef, FrameType, Headers

MAX_FRAME_SIZE = 256

_log = logging.getLogger(__name__)


[docs] class Reader: """Implements a file-like object for reading a flight log and store the raw data in a structured way. Does not do any real parsing, the iterator just yields bytes. .. todo:: Detecting and informing the user about possible file corruption (missing headers, etc.) """ def __init__(self, path: str, log_index: Optional[int] = None): """ :param path: Path to a log file :param log_index: Session index within log file. If set to `None` (the default) there will be no session selected and headers and frame data won't be read until the first call to `.set_log_index()`. """ self._headers = {} # type: Headers self._field_defs = {} # type: Dict[FrameType, List[FieldDef]] self._log_index = 0 self._header_size = 0 self._path = path _log.info("Processing: " + path) self._frame_data_ptr = 0 self._log_pointers = [] # type: List[int] self._frame_data = b'' self._frame_data_len = 0 with open(path, "rb") as f: if not f.seekable(): msg = "Input file must be seekable" _log.critical(msg) raise IOError(msg) self._find_pointers(f) if log_index is not None: self.set_log_index(log_index)
[docs] def set_log_index(self, index: int): """Set the current log index and read its corresponding frame data as raw bytes, plus parse the raw headers of the selected log. :param index: The selected log index :raise RuntimeError: If ``index`` is smaller than 1 or greater than `.log_count` """ if index == self._log_index: return if index < 1 or self.log_count < index: raise RuntimeError("Invalid log_index: {:d} (1 <= x < {:d})".format(index, self.log_count)) start = self._log_pointers[index - 1] with open(self._path, "rb") as f: f.seek(start) self._update_headers(f) f.seek(start + self._header_size) size = self._log_pointers[index] - start - self._header_size if index < self.log_count else None self._frame_data = f.read(size) if size is not None else f.read() self._log_index = index self._frame_data_ptr = 0 self._frame_data_len = len(self._frame_data) self._build_field_defs() _log.info("Log #{:d} out of {:d} (start: 0x{:X}, size: {:d})" .format(self._log_index, self.log_count, start, self._frame_data_len))
def _update_headers(self, f: BinaryIO): start = f.tell() while True: line = f.readline() if not line: # nothing left to read break has_next = self._parse_header_line(line) if not has_next: f.seek(-len(line), 1) _log.debug( "End of headers at {0:d} (0x{0:X}) (headers: {1:d})".format(f.tell(), len(self._headers.keys()))) break self._header_size = f.tell() - start def _parse_header_line(self, data: bytes) -> bool: """Parse a header line and return its resulting character length. Return None if the line cannot be parsed. """ if data[0] != 72: # 72 == ord('H') # not a header line return False line = data.decode().replace("H ", "", 1) name, value = line.split(':', 1) self._headers[name.strip()] = [_trycast(s.strip()) for s in value.split(',')] if ',' in value \ else _trycast(value.strip()) return True def _find_pointers(self, f: BinaryIO): start = f.tell() first_line = f.readline() f.seek(start) content = f.read() new_index = content.find(first_line) step = len(first_line) while -1 < new_index: self._log_pointers.append(new_index) new_index = content.find(first_line, new_index + step + 1) def _build_field_defs(self): """Use the read headers to populate the `field_defs` property. """ headers = self._headers field_defs = self._field_defs predictors = predictor_map decoders = decoder_map for frame_type in FrameType: # field header format: 'Field <FrameType> <Property>' for header_key, header_value in headers.items(): if "Field " + frame_type.value not in header_key: # skip headers unrelated to defining fields continue if frame_type not in field_defs: field_defs[frame_type] = [FieldDef(frame_type) for _ in range(len(header_value))] prop = header_key.split(" ", 2)[-1] for i, framedef_value in enumerate(header_value): fdef_name = field_defs[frame_type][i].name if fdef_name == "GPS_coord[1]" and framedef_value == 7: framedef_value = 256 # catch latitude field_defs[frame_type][i].__dict__[prop] = framedef_value if prop == "predictor": if framedef_value not in predictors: raise RuntimeError("No predictor found for {:d}".format(framedef_value)) else: field_defs[frame_type][i].predictorfun = predictors[framedef_value] elif prop == "encoding": if framedef_value not in decoders: raise RuntimeError("No decoder found for {:d}".format(framedef_value)) else: decoder = decoders[framedef_value] if decoder.__name__.endswith("_versioned"): # short circuit calls to versioned decoders # noinspection PyArgumentList decoder = decoder(headers.get("Data version")) field_defs[frame_type][i].decoderfun = decoder if FrameType.INTER not in field_defs: # partial or missing header information return # copy field names from INTRA to INTER defs for i, fdef in enumerate(field_defs[FrameType.INTER]): fdef.name = field_defs[FrameType.INTRA][i].name @property def log_index(self) -> int: """Return the currently set log index. May return 0 if `.set_log_index()` haven't been called yet. :type: int """ return self._log_index @property def log_count(self) -> int: """The number of logs in the current file. :type: int """ return len(self._log_pointers) @property def log_pointers(self) -> List[int]: """List of byte pointers to the start of each log file, including headers. :type: List[int] """ return list(self._log_pointers) @property def headers(self) -> Headers: """Dict of parsed headers. :type: dict """ return dict(self._headers) @property def field_defs(self) -> Dict[FrameType, List[FieldDef]]: """Dict of built field definitions. :type: dict """ return dict(self._field_defs)
[docs] def value(self) -> int: """Get current byte value. """ return self._frame_data[self._frame_data_ptr]
[docs] def has_subsequent(self, data: bytes) -> bool: """Return `True` if upcoming bytes equal ``data``. """ return self._frame_data[self._frame_data_ptr:self._frame_data_ptr + len(data)] == data
[docs] def tell(self) -> int: """IO protocol """ return self._frame_data_ptr
[docs] def seek(self, n: int): """IO protocol """ self._frame_data_ptr = n
def __iter__(self) -> Iterator[Optional[int]]: return self def __next__(self) -> Optional[int]: if self._frame_data_len == self._frame_data_ptr: return None byte = self._frame_data[self._frame_data_ptr] self._frame_data_ptr += 1 return byte def __len__(self) -> int: return self._frame_data_len