diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index e22c099ec..b202d9eca 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -1753,9 +1753,10 @@ def open_multi_volume(fhs: list[BinaryIO], *args, **kwargs) -> Filesystem: register("btrfs", "BtrfsFilesystem") register("exfat", "ExfatFilesystem") register("squashfs", "SquashFSFilesystem") +register("jffs", "JFFSFilesystem") +register("qnxfs", "QnxFilesystem") register("zip", "ZipFilesystem") register("tar", "TarFilesystem") register("vmtar", "VmtarFilesystem") register("cpio", "CpioFilesystem") register("ad1", "AD1Filesystem") -register("jffs", "JFFSFilesystem") diff --git a/dissect/target/filesystems/qnxfs.py b/dissect/target/filesystems/qnxfs.py new file mode 100644 index 000000000..f5d4a1d0a --- /dev/null +++ b/dissect/target/filesystems/qnxfs.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from typing import BinaryIO, Iterator + +import dissect.qnxfs as qnxfs +from dissect.qnxfs.qnx4 import INode as INode4 +from dissect.qnxfs.qnx6 import INode as INode6 + +from dissect.target.exceptions import ( + FileNotFoundError, + FilesystemError, + IsADirectoryError, + NotADirectoryError, + NotASymlinkError, +) +from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.helpers import fsutil + +INode = INode4 | INode6 + + +class QnxFilesystem(Filesystem): + __type__ = "qnxfs" + + def __init__(self, fh: BinaryIO, *args, **kwargs): + super().__init__(fh, *args, **kwargs) + self.qnxfs = qnxfs.QNXFS(fh) + + @staticmethod + def _detect(fh: BinaryIO) -> bool: + return qnxfs.is_qnxfs(fh) + + def get(self, path: str) -> FilesystemEntry: + return QnxFilesystemEntry(self, path, self._get_node(path)) + + def _get_node(self, path: str, node: INode | None = None) -> INode: + try: + return self.qnxfs.get(path, node) + except qnxfs.FileNotFoundError as e: + raise FileNotFoundError(path, cause=e) + except qnxfs.NotADirectoryError as e: + raise NotADirectoryError(path, cause=e) + except qnxfs.NotASymlinkError as e: + raise NotASymlinkError(path, cause=e) + except qnxfs.Error as e: + raise FileNotFoundError(path, cause=e) + + +class QnxFilesystemEntry(FilesystemEntry): + fs: QnxFilesystem + entry: INode + + def get(self, path: str) -> FilesystemEntry: + entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) + entry = self.fs._get_node(path, self.entry) + return QnxFilesystemEntry(self.fs, entry_path, entry) + + def open(self) -> BinaryIO: + if self.is_dir(): + raise IsADirectoryError(self.path) + return self._resolve().entry.open() + + def _iterdir(self) -> Iterator[tuple[str, INode]]: + if not self.is_dir(): + raise NotADirectoryError(self.path) + + if self.is_symlink(): + yield from self.readlink_ext().iterdir() + else: + yield from self.entry.iterdir() + + def iterdir(self) -> Iterator[str]: + for name, _ in self._iterdir(): + yield name + + def scandir(self) -> Iterator[FilesystemEntry]: + for name, entry in self._iterdir(): + entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator) + yield QnxFilesystemEntry(self.fs, entry_path, entry) + + def is_dir(self, follow_symlinks: bool = True) -> bool: + try: + return self._resolve(follow_symlinks).entry.is_dir() + except FilesystemError: + return False + + def is_file(self, follow_symlinks: bool = True) -> bool: + try: + return self._resolve(follow_symlinks).entry.is_file() + except FilesystemError: + return False + + def is_symlink(self) -> bool: + return self.entry.is_symlink() + + def readlink(self) -> str: + if not self.is_symlink(): + raise NotASymlinkError() + + return self.entry.link + + def stat(self, follow_symlinks: bool = False) -> fsutil.stat_result: + return self._resolve(follow_symlinks).lstat() + + def lstat(self) -> fsutil.stat_result: + # mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime + st_info = fsutil.stat_result( + [ + self.entry.mode, + self.entry.inum, + id(self.fs), + getattr(self.entry, "nlink", 1), # Only QNX4 has nlink + self.entry.uid, + self.entry.gid, + self.entry.size, + self.entry.atime.timestamp(), + self.entry.mtime.timestamp(), + self.entry.ctime.timestamp(), + ] + ) + + st_info.st_birthtime = self.entry.ftime.timestamp() + + st_info.st_blksize = self.fs.qnxfs.block_size + # Blocks are always calculated based on 512 byte blocks + # https://github.com/torvalds/linux/blob/c1e939a21eb111a6d6067b38e8e04b8809b64c4e/fs/qnx6/inode.c#L560-L561 + st_info.st_blocks = (self.entry.size + 511) >> 9 + + return st_info diff --git a/pyproject.toml b/pyproject.toml index 6881e0486..c1c3be914 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ full = [ "dissect.ffs>=3,<4", "dissect.jffs>=1,<2", "dissect.ole>=3,<4", + "dissect.qnxfs>=1,<2", "dissect.shellitem>=3,<4", "dissect.squashfs>=1,<2", "dissect.sql>=3,<4", @@ -93,6 +94,7 @@ dev = [ "dissect.hypervisor[dev]>=3.0.dev,<4.0.dev", "dissect.jffs[dev]>=1.0.dev,<2.0.dev", "dissect.ntfs[dev]>=3.4.dev,<4.0.dev", + "dissect.qnxfs[dev]>=1.0.dev,<2.0.dev", "dissect.regf[dev]>=3.3.dev,<4.0.dev", "dissect.shellitem[dev]>=3.0.dev,<4.0.dev", "dissect.sql[dev]>=3.0.dev,<4.0.dev", diff --git a/tests/_data/filesystems/qnxfs/qnx4.bin.gz b/tests/_data/filesystems/qnxfs/qnx4.bin.gz new file mode 100644 index 000000000..f9ec115a7 --- /dev/null +++ b/tests/_data/filesystems/qnxfs/qnx4.bin.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:eae2cd6f37d105fe4b8e79b181d31236a05e46a777cf07e37165c0eb2d0222fd +size 1713 diff --git a/tests/_data/filesystems/qnxfs/qnx6-be.bin.gz b/tests/_data/filesystems/qnxfs/qnx6-be.bin.gz new file mode 100644 index 000000000..a51fb0de5 --- /dev/null +++ b/tests/_data/filesystems/qnxfs/qnx6-be.bin.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f3d15f437b4f6cac7f3a55688750823ce0ef44f828301765cce90cdfbd8b3832 +size 3408 diff --git a/tests/_data/filesystems/qnxfs/qnx6-le.bin.gz b/tests/_data/filesystems/qnxfs/qnx6-le.bin.gz new file mode 100644 index 000000000..33b8b66c3 --- /dev/null +++ b/tests/_data/filesystems/qnxfs/qnx6-le.bin.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4ae2aac5820c8f78c3993442d218b9336a0f0ef21d4ab36999a0a5a343d7ab30 +size 3508 diff --git a/tests/filesystems/test_qnxfs.py b/tests/filesystems/test_qnxfs.py new file mode 100644 index 000000000..5a1c0373a --- /dev/null +++ b/tests/filesystems/test_qnxfs.py @@ -0,0 +1,74 @@ +import gzip +from datetime import datetime +from typing import Iterator +from unittest.mock import Mock, patch + +import pytest + +from dissect.target.filesystems.qnxfs import QnxFilesystem, QnxFilesystemEntry +from tests._utils import absolute_path + + +@pytest.mark.parametrize( + "filename", + [ + "_data/filesystems/qnxfs/qnx4.bin.gz", + "_data/filesystems/qnxfs/qnx6-be.bin.gz", + "_data/filesystems/qnxfs/qnx6-le.bin.gz", + ], +) +def test_qnxfs_detect(filename: str) -> None: + """test that QNX filesystems are correctly detected""" + with gzip.open(absolute_path(filename), "rb") as fh: + assert QnxFilesystem.detect(fh) + assert QnxFilesystem(fh).qnxfs + + +@pytest.fixture +def qnx_fs() -> Iterator[QnxFilesystem]: + with patch("dissect.qnxfs.QNXFS", return_value=Mock(block_size=4096)): + qnx_fs = QnxFilesystem(Mock()) + yield qnx_fs + + +@pytest.fixture +def qnxfs_fs_file_entry(qnx_fs: QnxFilesystem) -> Iterator[QnxFilesystemEntry]: + inode = Mock( + mode=0o100664, + inum=4, + size=165002, + uid=1000, + gid=999, + atime=datetime(2024, 10, 1, 12, 0, 0), + mtime=datetime(2024, 10, 2, 12, 0, 0), + ctime=datetime(2024, 10, 3, 12, 0, 0), + ftime=datetime(2024, 10, 4, 12, 0, 0), + nlink=2, + is_file=lambda: True, + is_dir=lambda: False, + is_symlink=lambda: False, + ) + + entry = QnxFilesystemEntry(qnx_fs, "/some_file", inode) + yield entry + + +@pytest.mark.parametrize("entry_fixture, expected_blocks", [("qnxfs_fs_file_entry", 323)]) +def test_qnxfs_stat(entry_fixture: str, expected_blocks: int, request: pytest.FixtureRequest) -> None: + """test consistency in stat() results""" + qnxfs_entry: QnxFilesystemEntry = request.getfixturevalue(entry_fixture) + stat = qnxfs_entry.stat() + + entry = qnxfs_entry.entry + assert stat.st_mode == entry.mode + assert stat.st_ino == entry.inum + assert stat.st_dev == id(qnxfs_entry.fs) + assert stat.st_nlink == entry.nlink + assert stat.st_uid == entry.uid + assert stat.st_gid == entry.gid + assert stat.st_size == entry.size + assert stat.st_atime == entry.atime.timestamp() + assert stat.st_mtime == entry.mtime.timestamp() + assert stat.st_ctime == entry.ctime.timestamp() + assert stat.st_blksize == qnxfs_entry.fs.qnxfs.block_size + assert stat.st_blocks == expected_blocks