# Source code for orangebox.parser

# Orangebox - Cleanflight/Betaflight blackbox data parser.
# Copyright (C) 2019  Károly Kiripolszky
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import Iterator, List, Optional

from .context import Context
from .events import event_map
from .reader import Reader
from .types import Event, EventParser, EventType, FieldDef, Frame, FrameType, Headers

MAX_TIME_JUMP = 10 * 1000000
MAX_ITER_JUMP = 500 * 10

_log = logging.getLogger(__name__)


class Parser:
    """Parse and iterate over the decoded frames of a blackbox log."""

    def __init__(self, reader: Reader):
        """
        :param reader: The `.Reader` used to iterate over the relevant bits of bytes
        :type reader: Reader
        """
        self._reader = reader
        self._end_of_log = False
        self._events: List[Event] = []
        self._headers: Headers = {}
        self._field_names: List[str] = []
        self._ctx: Optional[Context] = None
        # Selecting the reader's current log populates the headers, the
        # parsing context and the field names as a side effect.
        self.set_log_index(reader.log_index)
[docs] def set_log_index(self, index: int): """Select a log by index within the file. Calling this method will set the new index for the underlying `.Reader` object and also update the header information as a side effect. The state of the parser will be reset. The first index is 1. You can get the maximum number of logs from `.Reader.log_count`. See also `.Reader.set_log_index()`. :param index: The selected log index """ self._events = [] self._end_of_log = False reader = self._reader reader.set_log_index(index) self._headers = {k: v for k, v in reader.headers.items() if "Field" not in k} self._ctx = Context(self._headers, reader.field_defs) self._field_names = [] for ftype in [FrameType.INTRA, FrameType.SLOW, FrameType.GPS]: # Note: retaining the order above is important for communality with bb-log-viewer # Note 2: GPS_home is not written out by the blackbox-log-viewer (but added as offset to GPS_coord) # Note 3: GPS mysteriously contains a "time" field. This is correctly skipped by the filter below if ftype in reader.field_defs: self._field_names += filter(lambda x: x is not None and x not in self._field_names, map(lambda x: x.name, reader.field_defs[ftype]))
[docs] @staticmethod def load(path: str, log_index: int = 1) -> "Parser": """Factory method to create a parser for a log file. :param path: Path to blackbox log file :param log_index: Index within log file (defaults to 1) :rtype: Parser """ return Parser(Reader(path, log_index))
    def frames(self) -> Iterator[Frame]:
        """Return an iterator for the current frames.

        Yields main (INTRA/INTER) frames with the latest SLOW and GPS frame
        data appended; SLOW/GPS/GPS_HOME frames themselves are consumed and
        not yielded.

        :rtype: Iterator[Frame]
        """
        field_defs = self._reader.field_defs
        # Most recent SLOW/GPS frames, merged into subsequent main frames.
        last_slow = None  # type: Optional[Frame]
        last_gps = None  # type: Optional[Frame]
        ctx = self._ctx  # type: Context
        reader = self._reader
        last_time = None
        last_iter = 0
        last_frame_pos = 0
        last_frame_is_corrupt = False
        for byte in reader:
            try:
                ftype = FrameType(chr(byte))
            except ValueError:
                # Unknown frame marker: resync by re-reading from one byte
                # past the start of the last good frame (only once per
                # corruption run), and count the frame as invalid.
                if not last_frame_is_corrupt:
                    reader.seek(last_frame_pos + 1)
                ctx.invalid_frame_count += 1
                last_frame_is_corrupt = True
                continue
            ctx.frame_type = ftype
            last_frame_is_corrupt = False
            # Position of the frame-type marker byte just consumed.
            last_frame_pos = reader.tell() - 1
            if ftype == FrameType.EVENT:
                # parse event frame (event frames do not depend on field defs)
                if not self._parse_event_frame(reader):
                    ctx.invalid_frame_count += 1
                ctx.read_frame_count += 1
                if self._end_of_log:
                    # LOG_END event seen: report stats and stop iterating.
                    _log.info(
                        "Frames: total: {total:d}, parsed: {parsed:d}, skipped: {skipped:d} invalid: {invalid:d} ({invalid_percent:.2f}%)"
                        .format(**ctx.stats))
                    break
                continue
            if ftype not in field_defs:
                _log.warning("No field def found for frame type {!r}".format(ftype))
                ctx.invalid_frame_count += 1
                ctx.read_frame_count += 1
                continue
            # decode INTRA, INTER, SLOW, GPS or GPS_HOME frame
            frame = self._parse_frame(field_defs[ftype], reader)
            # store these frames to append them to subsequent frames:
            if ftype == FrameType.SLOW:
                last_slow = frame
                ctx.read_frame_count += 1
                continue
            elif ftype == FrameType.GPS:
                last_gps = frame
                ctx.read_frame_count += 1
                continue
            elif ftype == FrameType.GPS_HOME:
                ctx.add_frame(frame)
                ctx.read_frame_count += 1
                continue
            # validate frame
            current_time = ctx.get_current_value_by_name(ftype, "time")
            # NOTE(review): this condition looks unsatisfiable — when
            # last_time >= current_time, the difference current_time - last_time
            # is <= 0 and can never exceed MAX_TIME_JUMP, so the time-desync
            # branch appears to be dead code. Possibly `or` was intended
            # instead of the second `and` — confirm against real desynced logs
            # before changing.
            if last_time is not None and last_time >= current_time and MAX_TIME_JUMP < current_time - last_time:
                _log.debug("Invalid {:s} Frame #{:d} due to time desync".format(ftype.value, ctx.read_frame_count + 1))
                last_time = current_time
                ctx.read_frame_count += 1
                ctx.invalid_frame_count += 1
                continue
            last_time = current_time
            current_iter = ctx.get_current_value_by_name(ftype, "loopIteration")
            ctx.last_iter = current_iter
            # NOTE(review): `current_iter + last_iter` is a sum, not a delta —
            # presumably a jump (difference) was intended; verify before
            # touching, as the current form does trigger for large iteration
            # counters.
            if last_iter >= current_iter and MAX_ITER_JUMP < current_iter + last_iter:
                _log.debug("Skipping {:s} Frame #{:d} due to iter desync".format(ftype.value, ctx.read_frame_count + 1))
                last_iter = current_iter
                ctx.read_frame_count += 1
                ctx.invalid_frame_count += 1
                continue
            last_iter = current_iter
            # add in extra frames (GPS, GPS_HOME and SLOW)
            extra_data = []
            # add slow frames (list of empty strings if not available to ensure
            # the right amount of ',' are written out at least)
            if FrameType.SLOW in field_defs:
                if last_slow:
                    extra_data += last_slow.data
                else:
                    extra_data += [""] * len(field_defs[FrameType.SLOW])
            # add GPS frames the way blackbox-log-viewer seems to do it
            if FrameType.GPS in field_defs:
                if last_gps:
                    extra_data += list(last_gps.data[1:])  # skip time
                else:
                    extra_data += [""] * (len(field_defs[FrameType.GPS]) - 1)
            frame = Frame(ftype, frame.data + tuple(extra_data))
            # Peek at the next byte: if it is not a valid frame-type marker,
            # the frame just decoded most likely overran into garbage — drop it.
            try:
                FrameType(chr(reader.value()))
            except ValueError:
                _log.debug("Dropping {:s} Frame #{:d} because it's corrupt"
                           .format(ftype.value, ctx.read_frame_count + 1))
                ctx.invalid_frame_count += 1
                continue
            ctx.read_frame_count += 1
            ctx.add_frame(frame)
            yield frame
def _parse_frame(self, fdefs: List[FieldDef], reader: Reader) -> Frame: result = () ctx = self._ctx ctx.field_index = 0 field_count = ctx.field_def_counts[ctx.frame_type] while ctx.field_index < field_count: # make current frame available in context ctx.current_frame = result fdef = fdefs[ctx.field_index] # decode current field value rawvalue = fdef.decoderfun(reader, ctx) # apply predictions if isinstance(rawvalue, tuple): value = () for v in rawvalue: fdef = fdefs[ctx.field_index] value += (fdef.predictorfun(v, ctx),) ctx.field_index += 1 result += value else: value = fdef.predictorfun(rawvalue, ctx) ctx.field_index += 1 result += (value,) return Frame(ctx.frame_type, result) def _parse_event_frame(self, reader: Reader) -> bool: byte = next(reader) try: event_type = EventType(byte) except ValueError: _log.warning("Unknown event type: {!r}".format(byte)) return False _log.debug("New event frame #{:d}: {:s}".format(self._ctx.read_frame_count + 1, event_type.name)) parser = event_map[event_type] # type: EventParser event_data = parser(reader) self.events.append(Event(event_type, event_data)) if event_type == EventType.LOG_END: self._end_of_log = True return True @property def headers(self) -> Headers: """Headers key-value map. This will not contain the headers describing the field definitions. To get the raw headers see `.Reader` instead. Key is a string, value can be a string, a number or a list of numbers. :type: dict """ return dict(self._headers) @property def events(self) -> List[Event]: """Log events found during parsing. All the events are available only after parsing has finished. :type: List[Event] """ return list(self._events) @property def field_names(self) -> List[str]: """A list of all field names found in the current header. :type: List[str] """ return list(self._field_names) @property def reader(self) -> Reader: """Return the underlying `.Reader` object. :type: Reader """ return self._reader