Skip to content

Commit

Permalink
Add QNX filesystem support
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Oct 30, 2024
1 parent c97cf63 commit 9fec57e
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 1 deletion.
3 changes: 2 additions & 1 deletion dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,9 +1753,10 @@ def open_multi_volume(fhs: list[BinaryIO], *args, **kwargs) -> Filesystem:
register("btrfs", "BtrfsFilesystem")
register("exfat", "ExfatFilesystem")
register("squashfs", "SquashFSFilesystem")
register("jffs", "JFFSFilesystem")
register("qnxfs", "QnxFilesystem")
register("zip", "ZipFilesystem")
register("tar", "TarFilesystem")
register("vmtar", "VmtarFilesystem")
register("cpio", "CpioFilesystem")
register("ad1", "AD1Filesystem")
register("jffs", "JFFSFilesystem")
127 changes: 127 additions & 0 deletions dissect/target/filesystems/qnxfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from typing import BinaryIO, Iterator

import dissect.qnxfs as qnxfs
from dissect.qnxfs.qnx4 import INode as INode4
from dissect.qnxfs.qnx6 import INode as INode6

from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import Filesystem, FilesystemEntry
from dissect.target.helpers import fsutil

INode = INode4 | INode6


class QnxFilesystem(Filesystem):
__type__ = "qnxfs"

def __init__(self, fh: BinaryIO, *args, **kwargs):
super().__init__(fh, *args, **kwargs)
self.qnxfs = qnxfs.QNXFS(fh)

@staticmethod
def _detect(fh: BinaryIO) -> bool:
return qnxfs.is_qnxfs(fh)

def get(self, path: str) -> FilesystemEntry:
return QnxFilesystemEntry(self, path, self._get_node(path))

def _get_node(self, path: str, node: INode | None = None) -> INode:
try:
return self.qnxfs.get(path, node)
except qnxfs.FileNotFoundError as e:
raise FileNotFoundError(path, cause=e)
except qnxfs.NotADirectoryError as e:
raise NotADirectoryError(path, cause=e)
except qnxfs.NotASymlinkError as e:
raise NotASymlinkError(path, cause=e)
except qnxfs.Error as e:
raise FileNotFoundError(path, cause=e)


class QnxFilesystemEntry(FilesystemEntry):
fs: QnxFilesystem
entry: INode

def get(self, path: str) -> FilesystemEntry:
entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator)
entry = self.fs._get_node(path, self.entry)
return QnxFilesystemEntry(self.fs, entry_path, entry)

def open(self) -> BinaryIO:
if self.is_dir():
raise IsADirectoryError(self.path)
return self._resolve().entry.open()

def _iterdir(self) -> Iterator[tuple[str, INode]]:
if not self.is_dir():
raise NotADirectoryError(self.path)

if self.is_symlink():
yield from self.readlink_ext().iterdir()
else:
yield from self.entry.iterdir()

def iterdir(self) -> Iterator[str]:
for name, _ in self._iterdir():
yield name

def scandir(self) -> Iterator[FilesystemEntry]:
for name, entry in self._iterdir():
entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator)
yield QnxFilesystemEntry(self.fs, entry_path, entry)

def is_dir(self, follow_symlinks: bool = False) -> bool:
try:
return self._resolve(follow_symlinks).entry.is_dir()
except FilesystemError:
return False

def is_file(self, follow_symlinks: bool = False) -> bool:
try:
return self._resolve(follow_symlinks).entry.is_file()
except FilesystemError:
return False

def is_symlink(self) -> bool:
return self.entry.is_symlink()

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()

return self.entry.link

def stat(self, follow_symlinks: bool = False) -> fsutil.stat_result:
return self._resolve(follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
# mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
st_info = fsutil.stat_result(
[
self.entry.mode,
self.entry.inum,
id(self.fs),
getattr(self.entry, "nlink", 1), # Only QNX4 has nlink
self.entry.uid,
self.entry.gid,
self.entry.size,
self.entry.atime.timestamp(),
self.entry.mtime.timestamp(),
self.entry.ctime.timestamp(),
]
)

st_info.st_birthtime = self.entry.ftime.timestamp()

st_info.st_blksize = self.fs.qnxfs.block_size
# Blocks are always calculated based on 512 byte blocks
# https://github.com/torvalds/linux/blob/c1e939a21eb111a6d6067b38e8e04b8809b64c4e/fs/qnx6/inode.c#L560-L561
st_info.st_blocks = (self.entry.size + 511) >> 9

return st_info
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ full = [
"dissect.ffs>=3,<4",
"dissect.jffs>=1,<2",
"dissect.ole>=3,<4",
"dissect.qnxfs>=1,<2",
"dissect.shellitem>=3,<4",
"dissect.squashfs>=1,<2",
"dissect.sql>=3,<4",
Expand Down Expand Up @@ -93,6 +94,7 @@ dev = [
"dissect.hypervisor[dev]>=3.0.dev,<4.0.dev",
"dissect.jffs[dev]>=1.0.dev,<2.0.dev",
"dissect.ntfs[dev]>=3.4.dev,<4.0.dev",
"dissect.qnxfs[dev]>=1.0.dev,<2.0.dev",
"dissect.regf[dev]>=3.3.dev,<4.0.dev",
"dissect.shellitem[dev]>=3.0.dev,<4.0.dev",
"dissect.sql[dev]>=3.0.dev,<4.0.dev",
Expand Down
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx4.bin.gz
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx6-be.bin.gz
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx6-le.bin.gz
Git LFS file not shown
74 changes: 74 additions & 0 deletions tests/filesystems/test_qnxfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import gzip
from datetime import datetime
from typing import Iterator
from unittest.mock import Mock, patch

import pytest

from dissect.target.filesystems.qnxfs import QnxFilesystem, QnxFilesystemEntry
from tests._utils import absolute_path


@pytest.mark.parametrize(
"filename",
[
"_data/filesystems/qnxfs/qnx4.bin.gz",
"_data/filesystems/qnxfs/qnx6-be.bin.gz",
"_data/filesystems/qnxfs/qnx6-le.bin.gz",
],
)
def test_qnxfs_detect(filename: str) -> None:
"""test that QNX filesystems are correctly detected"""
with gzip.open(absolute_path(filename), "rb") as fh:
assert QnxFilesystem.detect(fh)
assert QnxFilesystem(fh).qnxfs


@pytest.fixture
def qnx_fs() -> Iterator[QnxFilesystem]:
with patch("dissect.qnxfs.QNXFS", return_value=Mock(block_size=4096)):
qnx_fs = QnxFilesystem(Mock())
yield qnx_fs


@pytest.fixture
def qnxfs_fs_file_entry(qnx_fs: QnxFilesystem) -> Iterator[QnxFilesystemEntry]:
inode = Mock(
mode=0o100664,
inum=4,
size=165002,
uid=1000,
gid=999,
atime=datetime(2024, 10, 1, 12, 0, 0),
mtime=datetime(2024, 10, 2, 12, 0, 0),
ctime=datetime(2024, 10, 3, 12, 0, 0),
ftime=datetime(2024, 10, 4, 12, 0, 0),
nlink=2,
is_file=lambda: True,
is_dir=lambda: False,
is_symlink=lambda: False,
)

entry = QnxFilesystemEntry(qnx_fs, "/some_file", inode)
yield entry


@pytest.mark.parametrize("entry_fixture, expected_blocks", [("qnxfs_fs_file_entry", 323)])
def test_qnxfs_stat(entry_fixture: str, expected_blocks: int, request: pytest.FixtureRequest) -> None:
"""test consistency in stat() results"""
qnxfs_entry: QnxFilesystemEntry = request.getfixturevalue(entry_fixture)
stat = qnxfs_entry.stat()

entry = qnxfs_entry.entry
assert stat.st_mode == entry.mode
assert stat.st_ino == entry.inum
assert stat.st_dev == id(qnxfs_entry.fs)
assert stat.st_nlink == entry.nlink
assert stat.st_uid == entry.uid
assert stat.st_gid == entry.gid
assert stat.st_size == entry.size
assert stat.st_atime == entry.atime.timestamp()
assert stat.st_mtime == entry.mtime.timestamp()
assert stat.st_ctime == entry.ctime.timestamp()
assert stat.st_blksize == qnxfs_entry.fs.qnxfs.block_size
assert stat.st_blocks == expected_blocks

0 comments on commit 9fec57e

Please sign in to comment.