Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dissect.qnxfs #454

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,9 +1753,10 @@ def open_multi_volume(fhs: list[BinaryIO], *args, **kwargs) -> Filesystem:
register("btrfs", "BtrfsFilesystem")
register("exfat", "ExfatFilesystem")
register("squashfs", "SquashFSFilesystem")
register("jffs", "JFFSFilesystem")
register("qnxfs", "QnxFilesystem")
register("zip", "ZipFilesystem")
register("tar", "TarFilesystem")
register("vmtar", "VmtarFilesystem")
register("cpio", "CpioFilesystem")
register("ad1", "AD1Filesystem")
register("jffs", "JFFSFilesystem")
129 changes: 129 additions & 0 deletions dissect/target/filesystems/qnxfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

from typing import BinaryIO, Iterator

import dissect.qnxfs as qnxfs
from dissect.qnxfs.qnx4 import INode as INode4
from dissect.qnxfs.qnx6 import INode as INode6

from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import Filesystem, FilesystemEntry
from dissect.target.helpers import fsutil

INode = INode4 | INode6


class QnxFilesystem(Filesystem):
__type__ = "qnxfs"

def __init__(self, fh: BinaryIO, *args, **kwargs):
super().__init__(fh, *args, **kwargs)
self.qnxfs = qnxfs.QNXFS(fh)

@staticmethod
def _detect(fh: BinaryIO) -> bool:
return qnxfs.is_qnxfs(fh)

def get(self, path: str) -> FilesystemEntry:
return QnxFilesystemEntry(self, path, self._get_node(path))

def _get_node(self, path: str, node: INode | None = None) -> INode:
try:
return self.qnxfs.get(path, node)
except qnxfs.FileNotFoundError as e:
raise FileNotFoundError(path, cause=e)
except qnxfs.NotADirectoryError as e:
raise NotADirectoryError(path, cause=e)
except qnxfs.NotASymlinkError as e:
raise NotASymlinkError(path, cause=e)
except qnxfs.Error as e:
raise FileNotFoundError(path, cause=e)


class QnxFilesystemEntry(FilesystemEntry):
fs: QnxFilesystem
entry: INode

def get(self, path: str) -> FilesystemEntry:
entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator)
entry = self.fs._get_node(path, self.entry)
return QnxFilesystemEntry(self.fs, entry_path, entry)

def open(self) -> BinaryIO:
if self.is_dir():
raise IsADirectoryError(self.path)
return self._resolve().entry.open()

def _iterdir(self) -> Iterator[tuple[str, INode]]:
if not self.is_dir():
raise NotADirectoryError(self.path)

if self.is_symlink():
yield from self.readlink_ext().iterdir()
else:
yield from self.entry.iterdir()

def iterdir(self) -> Iterator[str]:
for name, _ in self._iterdir():
yield name

def scandir(self) -> Iterator[FilesystemEntry]:
for name, entry in self._iterdir():
entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator)
yield QnxFilesystemEntry(self.fs, entry_path, entry)

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
return self._resolve(follow_symlinks).entry.is_dir()
except FilesystemError:
return False

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
return self._resolve(follow_symlinks).entry.is_file()
except FilesystemError:
return False

def is_symlink(self) -> bool:
return self.entry.is_symlink()

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()

return self.entry.link

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
return self._resolve(follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
# mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
st_info = fsutil.stat_result(
[
self.entry.mode,
self.entry.inum,
id(self.fs),
getattr(self.entry, "nlink", 1), # Only QNX4 has nlink
self.entry.uid,
self.entry.gid,
self.entry.size,
self.entry.atime.timestamp(),
self.entry.mtime.timestamp(),
self.entry.ctime.timestamp(),
]
)

st_info.st_birthtime = self.entry.ftime.timestamp()

st_info.st_blksize = self.fs.qnxfs.block_size
# Blocks are always calculated based on 512 byte blocks
# https://github.com/torvalds/linux/blob/c1e939a21eb111a6d6067b38e8e04b8809b64c4e/fs/qnx6/inode.c#L560-L561
st_info.st_blocks = (self.entry.size + 511) >> 9

return st_info
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ full = [
"dissect.ffs>=3,<4",
"dissect.jffs>=1,<2",
"dissect.ole>=3,<4",
"dissect.qnxfs>=1,<2",
"dissect.shellitem>=3,<4",
"dissect.squashfs>=1,<2",
"dissect.sql>=3,<4",
Expand Down Expand Up @@ -93,6 +94,7 @@ dev = [
"dissect.hypervisor[dev]>=3.0.dev,<4.0.dev",
"dissect.jffs[dev]>=1.0.dev,<2.0.dev",
"dissect.ntfs[dev]>=3.4.dev,<4.0.dev",
"dissect.qnxfs[dev]>=1.0.dev,<2.0.dev",
"dissect.regf[dev]>=3.3.dev,<4.0.dev",
"dissect.shellitem[dev]>=3.0.dev,<4.0.dev",
"dissect.sql[dev]>=3.0.dev,<4.0.dev",
Expand Down
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx4.bin.gz
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx6-be.bin.gz
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/qnxfs/qnx6-le.bin.gz
Git LFS file not shown
74 changes: 74 additions & 0 deletions tests/filesystems/test_qnxfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import gzip
from datetime import datetime
from typing import Iterator
from unittest.mock import Mock, patch

import pytest

from dissect.target.filesystems.qnxfs import QnxFilesystem, QnxFilesystemEntry
from tests._utils import absolute_path


@pytest.mark.parametrize(
"filename",
[
"_data/filesystems/qnxfs/qnx4.bin.gz",
"_data/filesystems/qnxfs/qnx6-be.bin.gz",
"_data/filesystems/qnxfs/qnx6-le.bin.gz",
],
)
def test_qnxfs_detect(filename: str) -> None:
"""test that QNX filesystems are correctly detected"""
with gzip.open(absolute_path(filename), "rb") as fh:
assert QnxFilesystem.detect(fh)
assert QnxFilesystem(fh).qnxfs


@pytest.fixture
def qnx_fs() -> Iterator[QnxFilesystem]:
with patch("dissect.qnxfs.QNXFS", return_value=Mock(block_size=4096)):
qnx_fs = QnxFilesystem(Mock())
yield qnx_fs


@pytest.fixture
def qnxfs_fs_file_entry(qnx_fs: QnxFilesystem) -> Iterator[QnxFilesystemEntry]:
inode = Mock(
mode=0o100664,
inum=4,
size=165002,
uid=1000,
gid=999,
atime=datetime(2024, 10, 1, 12, 0, 0),
mtime=datetime(2024, 10, 2, 12, 0, 0),
ctime=datetime(2024, 10, 3, 12, 0, 0),
ftime=datetime(2024, 10, 4, 12, 0, 0),
nlink=2,
is_file=lambda: True,
is_dir=lambda: False,
is_symlink=lambda: False,
)

entry = QnxFilesystemEntry(qnx_fs, "/some_file", inode)
yield entry


@pytest.mark.parametrize("entry_fixture, expected_blocks", [("qnxfs_fs_file_entry", 323)])
def test_qnxfs_stat(entry_fixture: str, expected_blocks: int, request: pytest.FixtureRequest) -> None:
"""test consistency in stat() results"""
qnxfs_entry: QnxFilesystemEntry = request.getfixturevalue(entry_fixture)
stat = qnxfs_entry.stat()

entry = qnxfs_entry.entry
assert stat.st_mode == entry.mode
assert stat.st_ino == entry.inum
assert stat.st_dev == id(qnxfs_entry.fs)
assert stat.st_nlink == entry.nlink
assert stat.st_uid == entry.uid
assert stat.st_gid == entry.gid
assert stat.st_size == entry.size
assert stat.st_atime == entry.atime.timestamp()
assert stat.st_mtime == entry.mtime.timestamp()
assert stat.st_ctime == entry.ctime.timestamp()
assert stat.st_blksize == qnxfs_entry.fs.qnxfs.block_size
assert stat.st_blocks == expected_blocks
Loading