Source code for stegx.cli
from __future__ import annotations
import argparse
import getpass
import logging
import os
import sys
import time
import zlib
from typing import Callable, List, Optional
from PIL import Image, UnidentifiedImageError
from stegx.io_sources import fetch_cover_to_tempfile, is_url
from stegx.kdf import KdfParams
from stegx.safe_paths import (
PathValidationError,
sink_safe_path,
validate_user_path,
)
from stegx.shamir import combine_shares, split_secret
from stegx.constants import YK_CHALLENGE_NONCE_LEN
from stegx.sentinel import cover_fingerprint
from stegx.yubikey import (
YUBIKEY_AVAILABLE,
YubiKeyError,
challenge_for_operation,
resolve_yubikey_response,
)
from stegx.steganography import (
EmbedOptions,
calculate_lsb_capacity,
embed_v2,
extract_v2,
)
from stegx.exceptions import StegXError
from stegx.utils import (
META_PANIC,
META_PANIC_MODE,
create_payload,
create_payload_from_bytes,
create_payload_from_files,
parse_payload,
parse_payload_full,
save_extracted,
save_extracted_file,
setup_logging,
)
__version__ = "2.0.0"
def _build_version_string() -> str:
def _present(mod_name: str) -> str:
try:
__import__(mod_name)
return "ok"
except ImportError:
return "missing"
from stegx.compression import available_algorithms
from stegx.secure_memory import MEMORY_LOCK_AVAILABLE
from stegx.yubikey import YUBIKEY_AVAILABLE
parts = [f"stegx {__version__}"]
parts.append("")
parts.append("Core (required):")
parts.append(f" argon2-cffi : {_present('argon2')}")
parts.append(f" cryptography : {_present('cryptography')}")
parts.append(f" Pillow : {_present('PIL')}")
parts.append(f" tqdm : {_present('tqdm')}")
parts.append("")
parts.append("Optional:")
parts.append(f" zstandard : {_present('zstandard')}")
parts.append(f" brotli : {_present('brotli')}")
parts.append(f" zxcvbn : {_present('zxcvbn')}")
parts.append(f" ykman (YubiKey): {'ok' if YUBIKEY_AVAILABLE else 'missing'}")
parts.append(f" numpy : {_present('numpy')}")
parts.append("")
parts.append("Runtime capabilities:")
parts.append(f" memory locking : {'ok (mlock / VirtualLock)' if MEMORY_LOCK_AVAILABLE else 'missing'}")
parts.append(f" compression : {', '.join(available_algorithms())}")
return "\n".join(parts)
GENERIC_DECODE_ERROR = (
"Extraction failed: wrong password, wrong keyfile, or image does not"
" contain StegX data."
)
def _print_err(msg: str) -> None:
print(f"Error: {msg}", file=sys.stderr)
def _print_ok(msg: str) -> None:
print(msg)
def _win_pid_is_alive(pid: int) -> bool:
if pid <= 0:
return False
import ctypes
from ctypes import wintypes
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
kernel32 = ctypes.windll.kernel32
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.GetLastError.restype = wintypes.DWORD
handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
if handle:
kernel32.CloseHandle(handle)
return True
return kernel32.GetLastError() != 87
class _OutputLock:
_MIN_STALE_SECONDS = 5
def __init__(self, output_path: str):
self._lock_path = output_path + ".stegx-lock"
self._fd: Optional[int] = None
def _try_create(self) -> bool:
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
try:
self._fd = os.open(self._lock_path, flags, 0o600)
except FileExistsError:
return False
payload = f"{os.getpid()}\n{time.time():.3f}"
os.write(self._fd, payload.encode("ascii"))
return True
def _steal_if_stale(self) -> bool:
try:
fd = os.open(self._lock_path, os.O_RDONLY)
except OSError:
return False
pid = 0
try:
try:
st = os.fstat(fd)
except OSError:
return False
mtime = st.st_mtime
if time.time() - mtime < self._MIN_STALE_SECONDS:
return False
try:
raw = os.read(fd, 128)
except OSError:
return False
try:
pid_str = raw.decode("ascii", errors="ignore").splitlines()[0].strip()
pid = int(pid_str)
except (ValueError, IndexError):
pid = -1
alive = False
if pid > 0:
if os.name == "nt":
try:
import psutil
alive = psutil.pid_exists(pid)
except ImportError:
alive = _win_pid_is_alive(pid)
else:
try:
os.kill(pid, 0)
alive = True
except ProcessLookupError:
alive = False
except PermissionError:
alive = True
if alive:
return False
try:
st2 = os.fstat(fd)
except OSError:
return False
if st2.st_mtime != mtime:
return False
finally:
try:
os.close(fd)
except OSError:
pass
try:
os.unlink(self._lock_path)
logging.warning(
"Removed stale lock file %s (owning process is gone).",
self._lock_path,
)
except OSError:
return False
return True
def __enter__(self) -> "_OutputLock":
if self._try_create():
return self
if self._steal_if_stale() and self._try_create():
return self
raise RuntimeError(
f"Another stegx process appears to be writing to the same "
f"output (lock file {self._lock_path} exists). If this is "
f"stale, remove it manually and retry."
)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
try:
if self._fd is not None:
os.close(self._fd)
except OSError:
pass
try:
os.unlink(self._lock_path)
except OSError:
pass
def _output_path_is_traversal_free(path: str, allow_outside: bool = False) -> bool:
if allow_outside:
return True
norm = os.path.normpath(path)
parts = norm.replace("\\", "/").split("/")
return ".." not in parts
def _bounded_int(lo: int, hi: int, name: str = "value"):
def _check(raw: str) -> int:
try:
v = int(raw)
except ValueError:
raise argparse.ArgumentTypeError(
f"{name} must be an integer, got {raw!r}"
)
if not (lo <= v <= hi):
raise argparse.ArgumentTypeError(
f"{name} must be between {lo} and {hi}, got {v}"
)
return v
return _check
def _prompt_password(confirm: bool, label: str = "Password") -> str:
pw = getpass.getpass(f"{label}: ")
if not pw:
raise ValueError(f"{label} cannot be empty.")
if confirm:
again = getpass.getpass(f"Confirm {label.lower()}: ")
if pw != again:
raise ValueError("Passwords do not match.")
return pw
def _resolve_password(
args: argparse.Namespace,
confirm: bool,
label: str = "Password",
arg_name: str = "password",
) -> str:
explicit = getattr(args, arg_name, None)
if getattr(args, "password_stdin", False):
pw = sys.stdin.readline().rstrip("\r\n")
if not pw:
raise ValueError(f"{label} read from stdin was empty.")
return pw
if explicit:
logging.warning(
"Passing the password via --%s exposes it to shell history and"
" `ps`. Prefer the interactive prompt.",
arg_name.replace("_", "-"),
)
return explicit
return _prompt_password(confirm=confirm, label=label)
def _maybe_yubikey_response(
args: argparse.Namespace, challenge: bytes, *, required: bool = False
) -> Optional[bytes]:
want_yk = bool(getattr(args, "yubikey", False))
resp_file = getattr(args, "yubikey_response_file", None)
if not want_yk and not resp_file and not required:
return None
if not want_yk and not resp_file and required:
raise ValueError(
"This payload requires the YubiKey factor. Re-run with --yubikey "
"(or --yubikey-response-file for tests)."
)
if resp_file is None and not YUBIKEY_AVAILABLE:
raise ValueError(
"--yubikey needs the ykman / yubikit packages. Install with "
"`pip install ykman` or provide --yubikey-response-file."
)
try:
return resolve_yubikey_response(challenge, response_file=resp_file)
except YubiKeyError as e:
raise ValueError(f"YubiKey interaction failed: {e}")
def _read_keyfile(path: Optional[str]) -> Optional[bytes]:
if not path:
return None
safe_path = validate_user_path(path, kind="file", must_exist=True)
with open(sink_safe_path(safe_path), "rb") as f:
data = f.read()
if not data:
raise ValueError("Keyfile is empty.")
return data
def _check_password_strength(password: str, strict: bool) -> None:
try:
from zxcvbn import zxcvbn
except ImportError:
logging.debug("zxcvbn not installed; skipping password-strength gate.")
return
result = zxcvbn(password)
score = int(result.get("score", 0))
feedback = result.get("feedback", {}).get("warning") or "Low entropy."
if score < 3:
msg = (
f"Password strength is weak (zxcvbn score {score}/4). {feedback}"
" Use a longer passphrase mixing words, numbers and punctuation."
)
if strict:
raise ValueError(msg)
logging.warning(msg)
FIPS_BANNED_COMPRESSION = {"lzma", "bz2", "zstd", "zstd_dict_v1", "brotli"}
def _apply_fips_policy(args: argparse.Namespace) -> None:
if not getattr(args, "fips", False):
return
from stegx.fips import assert_fips_runtime
assert_fips_runtime()
if getattr(args, "dual_cipher", False):
raise ValueError(
"--fips forbids --dual-cipher (ChaCha20-Poly1305 is not FIPS-approved)."
)
if getattr(args, "yubikey", False):
raise ValueError(
"--fips forbids --yubikey (YubiKey slot 2 uses HMAC-SHA1, "
"which is not on the FIPS 140 validated algorithm list)."
)
if getattr(args, "old_yubikey", False):
raise ValueError(
"--fips forbids --old-yubikey: the rewrap's unwrap step would "
"invoke HMAC-SHA1, which is not on the FIPS 140 validated "
"algorithm list."
)
kdf_selected = getattr(args, "kdf", None)
compression_selected = getattr(args, "compression", None)
if kdf_selected == "argon2id":
logging.info("--fips: switching KDF from argon2id to pbkdf2.")
args.kdf = "pbkdf2"
args._fips_forced_kdf = True
if compression_selected is not None and compression_selected != "fast":
logging.info("--fips: forcing --compression=fast (zlib only).")
args.compression = "fast"
args._fips_forced_compression = True
def _build_embed_options(args: argparse.Namespace, keyfile_bytes: Optional[bytes]) -> EmbedOptions:
_apply_fips_policy(args)
kdf_id = args.kdf
kdf_params: KdfParams
if kdf_id == "argon2id":
kdf_params = KdfParams.default_argon2id()
elif kdf_id == "pbkdf2":
kdf_params = KdfParams.default_pbkdf2()
else:
raise ValueError(f"Unknown KDF selection: {kdf_id}")
return EmbedOptions(
dual_cipher=args.dual_cipher,
use_matrix_embedding=args.matrix_embedding,
use_adaptive=args.adaptive,
adaptive_cutoff=args.adaptive_cutoff,
adaptive_cost_mode=getattr(args, "adaptive_mode", "laplacian"),
max_fill_ratio=args.max_fill / 100.0,
keyfile_bytes=keyfile_bytes,
decoy_file_bytes=None,
decoy_filename=None,
decoy_password=None,
always_split_cover=getattr(args, "always_split_cover", False),
preserve_cover_encoding=not args.no_preserve_cover,
compression=args.compress,
kdf_params=kdf_params,
)
def _resolve_cover_path(raw_path: str) -> tuple[str, Optional[str]]:
if is_url(raw_path):
tmp = fetch_cover_to_tempfile(raw_path)
return tmp, tmp
return raw_path, None
def _resolve_encode_files(args: argparse.Namespace) -> List[str]:
raw_files = args.file if isinstance(args.file, list) else [args.file]
for f in raw_files:
if not os.path.isfile(f):
raise FileNotFoundError(f"File to hide not found: {f}")
return raw_files
def _resolve_encode_credentials(
args: argparse.Namespace, cover_path: str
) -> "tuple[str, Optional[bytes], Optional[bytes], Optional[bytes]]":
password = _resolve_password(args, confirm=True, label="Password")
_check_password_strength(password, strict=args.strict_password)
keyfile_bytes = _read_keyfile(args.keyfile)
yk_nonce: Optional[bytes] = None
yubikey_response: Optional[bytes] = None
wants_yk = bool(
getattr(args, "yubikey", False) or getattr(args, "yubikey_response_file", None)
)
if wants_yk:
with Image.open(cover_path) as _img:
_probe = _img.convert("RGBA") if _img.mode == "P" else _img
fp = cover_fingerprint(_probe)
yk_nonce = os.urandom(YK_CHALLENGE_NONCE_LEN)
challenge = challenge_for_operation(yk_nonce, fp)
yubikey_response = _maybe_yubikey_response(args, challenge)
return password, keyfile_bytes, yubikey_response, yk_nonce
def _resolve_encode_decoy(
args: argparse.Namespace, real_password: str
) -> "tuple[Optional[bytes], Optional[str], Optional[str]]":
if not args.decoy_file:
return None, None, None
safe_decoy = validate_user_path(args.decoy_file, kind="file", must_exist=True)
with open(sink_safe_path(safe_decoy), "rb") as f:
decoy_bytes = f.read()
decoy_name = os.path.basename(safe_decoy)
decoy_password = args.decoy_password or _prompt_password(
confirm=True, label="Decoy password"
)
_check_password_strength(decoy_password, strict=args.strict_password)
if decoy_password == real_password:
raise ValueError("Decoy password must differ from the real password.")
return decoy_bytes, decoy_name, decoy_password
def _resolve_encode_panic(
args: argparse.Namespace, real_password: str
) -> "tuple[Optional[str], Optional[bytes]]":
if not getattr(args, "panic_password", None):
return None, None
if args.decoy_file:
raise ValueError(
"--panic-password and --decoy-file share the cover's decoy "
"region; choose one or the other."
)
panic_password = args.panic_password
_check_password_strength(panic_password, strict=args.strict_password)
if panic_password == real_password:
raise ValueError("Panic password must differ from the real password.")
panic_marker_payload: Optional[bytes] = None
if getattr(args, "panic_decoy", None):
safe_panic_decoy = validate_user_path(
args.panic_decoy, kind="file", must_exist=True
)
with open(sink_safe_path(safe_panic_decoy), "rb") as f:
panic_marker_payload = f.read()
return panic_password, panic_marker_payload
def _probe_cover_capacity(cover_path: str, original_arg: str) -> int:
try:
with Image.open(cover_path) as probe:
if probe.mode == "P":
probe = probe.convert("RGBA")
elif probe.mode not in ("RGB", "RGBA", "L"):
raise ValueError(f"Unsupported cover image mode: {probe.mode}.")
return calculate_lsb_capacity(probe)
except UnidentifiedImageError:
raise ValueError(f"Cannot identify image file: {original_arg}")
def _attach_polyglot(saved_path: str, polyglot_files: Optional[List[str]]) -> None:
if not polyglot_files:
return
from stegx.polyglot import build_zip_from_files, make_png_zip_polyglot
safe_members = [
validate_user_path(f, kind="file", must_exist=True)
for f in polyglot_files
]
safe_members = [sink_safe_path(p) for p in safe_members]
zip_bytes = build_zip_from_files(safe_members)
make_png_zip_polyglot(saved_path, zip_bytes)
zip_names = ", ".join(os.path.basename(f) for f in safe_members)
_print_ok(
f"Polyglot PNG+ZIP written: {saved_path} (public ZIP side: [{zip_names}])"
)
[docs]
def perform_encode(args: argparse.Namespace) -> bool:
tmp_cover: Optional[str] = None
try:
if not is_url(args.image):
args.image = validate_user_path(
args.image, kind="file", must_exist=True
)
args.image = sink_safe_path(args.image)
args.output = sink_safe_path(validate_user_path(args.output))
try:
cover_path, tmp_cover = _resolve_cover_path(args.image)
except ValueError as e:
raise FileNotFoundError(f"Cover image not available: {e}")
if not os.path.isfile(cover_path):
raise FileNotFoundError(f"Cover image not found: {args.image}")
raw_files = _resolve_encode_files(args)
password, keyfile_bytes, yubikey_response, yk_nonce = _resolve_encode_credentials(
args, cover_path
)
decoy_bytes, decoy_name, decoy_password = _resolve_encode_decoy(args, password)
panic_password, panic_marker_payload = _resolve_encode_panic(args, password)
capacity_bits = _probe_cover_capacity(cover_path, args.image)
logging.info("Cover capacity ~= %d bits (%d bytes).", capacity_bits, capacity_bits // 8)
payload_bytes = create_payload_from_files(
raw_files,
compress=args.compress,
compression_mode=args.compression,
show_progress=not getattr(args, "verbose", False),
)
if not _output_path_is_traversal_free(args.output, allow_outside=getattr(args, "allow_outside_cwd", False)):
_print_err(
f"Refusing to write outside the current directory: {args.output!r}. "
"Pass --allow-outside-cwd to override."
)
return False
output_dir = os.path.dirname(args.output)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
if not args.output.lower().endswith(".png"):
logging.warning("Output does not end with .png; saving as PNG.")
options = _build_embed_options(args, keyfile_bytes)
options.yubikey_response = yubikey_response
options.yk_challenge_nonce = yk_nonce
options.decoy_file_bytes = decoy_bytes
options.decoy_filename = decoy_name
options.decoy_password = decoy_password
options.panic_password = panic_password
options.panic_marker_payload = panic_marker_payload
with _OutputLock(args.output):
saved_path = embed_v2(cover_path, payload_bytes, args.output, password, options)
names = ", ".join(os.path.basename(f) for f in raw_files)
_maybe_audit(args, "encode", ok=True, cover_path=cover_path, stego_path=saved_path)
_attach_polyglot(saved_path, getattr(args, "polyglot_zip", None))
_print_ok(f"Successfully encoded [{names}] into '{saved_path}'.")
return True
except (StegXError, FileNotFoundError, ValueError, OSError, zlib.error) as e:
logging.error("Encoding failed: %s", e)
_print_err(f"Encoding failed: {e}")
_maybe_audit(args, "encode", ok=False, note=type(e).__name__)
return False
except Exception as e:
logging.exception("Unexpected encoding failure.")
_print_err(f"Unexpected error: {e}")
_maybe_audit(args, "encode", ok=False, note=type(e).__name__)
return False
finally:
if tmp_cover:
try:
os.unlink(sink_safe_path(tmp_cover))
except (OSError, PathValidationError):
pass
def _decode_wants_stdout(args: argparse.Namespace) -> bool:
return bool(getattr(args, "stdout", False)) or args.destination == "-"
def _collect_flags_for_audit(args: argparse.Namespace) -> list:
flags = []
for attr in (
"dual_cipher", "adaptive", "matrix_embedding",
"strict_password", "always_split_cover", "fips",
"yubikey", "no_preserve_cover",
):
if getattr(args, attr, False):
flags.append(f"--{attr.replace('_', '-')}")
if getattr(args, "keyfile", None):
flags.append("--keyfile")
if getattr(args, "decoy_file", None):
flags.append("--decoy-file")
if getattr(args, "panic_password", None):
flags.append("--panic-password")
if getattr(args, "polyglot_zip", None):
flags.append("--polyglot-zip")
if getattr(args, "compression", None) and args.compression != "best":
forced = getattr(args, "_fips_forced_compression", False)
tag = "fips-forced" if forced else args.compression
flags.append(f"--compression={tag}")
if getattr(args, "kdf", None) and args.kdf != "argon2id":
forced = getattr(args, "_fips_forced_kdf", False)
tag = "fips-forced" if forced else args.kdf
flags.append(f"--kdf={tag}")
return flags
def _maybe_audit(args: argparse.Namespace, op: str, *, ok: bool,
cover_path: "Optional[str]" = None,
stego_path: "Optional[str]" = None,
note: "Optional[str]" = None) -> None:
path = getattr(args, "audit_log", None)
if not path:
return
try:
from stegx.audit_log import append_record
append_record(
path, op,
ok=ok,
cover_path=cover_path,
stego_path=stego_path,
flags=_collect_flags_for_audit(args),
note=note,
)
except Exception as e:
logging.debug("audit: unexpected error in append_record: %s", e)
[docs]
def perform_decode(args: argparse.Namespace) -> bool:
try:
args.image = sink_safe_path(
validate_user_path(args.image, kind="file", must_exist=True)
)
to_stdout = _decode_wants_stdout(args)
if not to_stdout:
if args.destination is None:
raise ValueError(
"A destination directory is required (or pass --stdout / -d -"
" to write the payload to stdout)."
)
args.destination = sink_safe_path(
validate_user_path(args.destination)
)
if not os.path.exists(args.destination):
os.makedirs(args.destination, exist_ok=True)
elif not os.path.isdir(args.destination):
raise NotADirectoryError(
f"Destination is not a directory: {args.destination}"
)
password = _resolve_password(args, confirm=False, label="Password")
keyfile_bytes = _read_keyfile(args.keyfile)
yk_factory: Optional[Callable[[bytes], bytes]] = None
if getattr(args, "yubikey", False) or getattr(args, "yubikey_response_file", None):
def yk_factory(challenge: bytes) -> bytes:
return _maybe_yubikey_response(args, challenge, required=True)
from stegx.steganography import extract_v2_with_region
try:
decrypted, matched_region = extract_v2_with_region(
args.image,
password,
keyfile_bytes,
yubikey_factory=yk_factory,
allow_v1=getattr(args, "allow_v1", False),
)
except (StegXError, ValueError, OSError) as e:
logging.debug("extract_v2 raised: %s", e)
_print_err(GENERIC_DECODE_ERROR)
return False
try:
filename, file_data, meta = parse_payload_full(decrypted)
except (StegXError, ValueError, zlib.error):
_print_err(GENERIC_DECODE_ERROR)
return False
if meta.get(META_PANIC):
from stegx.panic import (
PANIC_MODE_SILENT,
destroy_real_region_in_place,
)
panic_mode = meta.get(META_PANIC_MODE, PANIC_MODE_SILENT)
destroy_real_region_in_place(
args.image, matched_region, panic_mode=panic_mode,
)
if panic_mode == PANIC_MODE_SILENT:
_print_err(GENERIC_DECODE_ERROR)
return False
if to_stdout:
try:
sys.stdout.buffer.write(file_data)
sys.stdout.buffer.flush()
except BrokenPipeError:
pass
logging.info(
"Wrote %d bytes of '%s' to stdout (original filename preserved in log only).",
len(file_data), filename,
)
return True
written = save_extracted(filename, file_data, meta, args.destination)
if len(written) == 1:
_print_ok(f"Successfully decoded to '{written[0]}'.")
else:
_print_ok(
f"Successfully decoded {len(written)} files from bundle into "
f"'{args.destination}'."
)
_maybe_audit(args, "decode", ok=True, stego_path=args.image)
return True
except (FileNotFoundError, ValueError, OSError, NotADirectoryError) as e:
logging.debug("decoding setup error: %s", e)
if isinstance(e, NotADirectoryError):
_print_err(f"{e}")
else:
_print_err(GENERIC_DECODE_ERROR)
_maybe_audit(args, "decode", ok=False, stego_path=args.image,
note=type(e).__name__)
return False
except Exception as e:
logging.exception("Unexpected decoding failure.")
_print_err(f"Unexpected error: {e}")
_maybe_audit(args, "decode", ok=False, stego_path=args.image,
note=type(e).__name__)
return False
[docs]
def perform_shamir_split(args: argparse.Namespace) -> bool:
try:
args.file = sink_safe_path(
validate_user_path(args.file, kind="file", must_exist=True)
)
if args.k > args.n:
raise ValueError("Threshold k cannot exceed total shares n.")
if len(args.cover) != args.n:
raise ValueError(
f"Expected {args.n} cover images (-n {args.n}), got {len(args.cover)}."
)
args.cover = [
c if is_url(c) else sink_safe_path(
validate_user_path(c, kind="file", must_exist=True)
)
for c in args.cover
]
args.out_dir = sink_safe_path(validate_user_path(args.out_dir))
password = _resolve_password(args, confirm=True, label="Password")
keyfile_bytes = _read_keyfile(args.keyfile)
with open(sink_safe_path(args.file), "rb") as f:
secret = f.read()
shares = split_secret(secret, args.k, args.n)
os.makedirs(args.out_dir, exist_ok=True)
for idx, (raw_cover, share_bytes) in enumerate(zip(args.cover, shares), start=1):
cover_path, tmp_cover = _resolve_cover_path(raw_cover)
try:
if not os.path.isfile(cover_path):
raise FileNotFoundError(f"Cover not found: {raw_cover}")
share_yk_nonce: Optional[bytes] = None
share_yk_resp: Optional[bytes] = None
if getattr(args, "yubikey", False) or getattr(args, "yubikey_response_file", None):
with Image.open(cover_path) as _img:
_probe = _img.convert("RGBA") if _img.mode == "P" else _img
_fp = cover_fingerprint(_probe)
share_yk_nonce = os.urandom(YK_CHALLENGE_NONCE_LEN)
share_yk_resp = _maybe_yubikey_response(
args, challenge_for_operation(share_yk_nonce, _fp)
)
filename = f"{os.path.basename(args.file)}.share{idx:02d}"
inner = create_payload_from_bytes(filename, share_bytes, compress=False)
out_path = os.path.join(args.out_dir, f"stego_share_{idx:02d}.png")
opts = _build_embed_options(args, keyfile_bytes)
opts.yubikey_response = share_yk_resp
opts.yk_challenge_nonce = share_yk_nonce
opts.decoy_file_bytes = None
opts.decoy_password = None
with _OutputLock(out_path):
embed_v2(cover_path, inner, out_path, password, opts)
_print_ok(f"Share {idx}/{args.n} embedded in {out_path}")
finally:
if tmp_cover:
try:
os.unlink(tmp_cover)
except OSError:
pass
_print_ok(f"Done. Any {args.k} of {args.n} shares reconstruct the secret.")
return True
except (FileNotFoundError, ValueError, OSError) as e:
logging.error("Shamir split failed: %s", e)
_print_err(f"Shamir split failed: {e}")
return False
except Exception as e:
logging.exception("Unexpected Shamir-split failure.")
_print_err(f"Unexpected error: {e}")
return False
[docs]
def perform_rewrap(args: argparse.Namespace) -> bool:
tmp_cover: Optional[str] = None
scratch: Optional[str] = None
try:
args.image = sink_safe_path(
validate_user_path(args.image, kind="file", must_exist=True)
)
if getattr(args, "output", None):
args.output = sink_safe_path(validate_user_path(args.output))
_apply_fips_policy(args)
old_password = _prompt_password(confirm=False, label="Current password")
new_password = _prompt_password(confirm=True, label="New password")
if new_password == old_password:
raise ValueError("New password must differ from the current password.")
_check_password_strength(new_password, strict=args.strict_password)
old_keyfile = _read_keyfile(getattr(args, "old_keyfile", None))
new_keyfile = _read_keyfile(getattr(args, "keyfile", None))
old_yk_args = argparse.Namespace(
yubikey=getattr(args, "old_yubikey", False),
yubikey_response_file=getattr(args, "old_yubikey_response_file", None),
)
new_yk_args = args
_old_yk_resp: list = []
def _old_yk_factory(challenge: bytes) -> bytes:
resp = _maybe_yubikey_response(old_yk_args, challenge, required=True)
_old_yk_resp.append(resp)
return resp
old_yk_factory: Optional[Callable[[bytes], bytes]] = (
_old_yk_factory
if (getattr(old_yk_args, "yubikey", False) or
getattr(old_yk_args, "yubikey_response_file", None))
else None
)
from stegx.steganography import (
_all_positions,
_derive_position_material,
extract_v2_with_region,
)
from stegx.decoy import reorder_region, split_regions
from stegx.sentinel import cover_fingerprint
from stegx.panic import _overwrite_lsbs_randomly
output_path = getattr(args, "output", None) or args.image
with _OutputLock(output_path):
try:
raw_inner, matched_region = extract_v2_with_region(
args.image, old_password, old_keyfile, yubikey_factory=old_yk_factory,
)
except (ValueError, OSError) as e:
_print_err(f"rewrap: cannot decrypt with current credentials ({e}).")
_maybe_audit(args, "rewrap", ok=False, stego_path=args.image,
note="old-credentials-invalid")
return False
image = Image.open(args.image)
image.load()
if image.mode == "P":
image = image.convert("RGBA")
fingerprint = cover_fingerprint(image)
all_positions = _all_positions(image)
new_yk_nonce: Optional[bytes] = None
new_yubikey: Optional[bytes] = None
if (getattr(new_yk_args, "yubikey", False) or
getattr(new_yk_args, "yubikey_response_file", None)):
new_yk_nonce = os.urandom(YK_CHALLENGE_NONCE_LEN)
new_challenge = challenge_for_operation(new_yk_nonce, fingerprint)
new_yubikey = _maybe_yubikey_response(new_yk_args, new_challenge)
if matched_region == "real-full":
old_region = list(all_positions)
elif matched_region == "real-half":
_decoy, real_region = split_regions(all_positions, fingerprint)
old_region = list(real_region)
elif matched_region == "decoy-half":
decoy_region, _real = split_regions(all_positions, fingerprint)
old_region = list(decoy_region)
else:
_print_err("rewrap: unrecognised match region; refusing to modify image.")
_maybe_audit(args, "rewrap", ok=False, stego_path=args.image,
note=f"unknown-region:{matched_region}")
return False
old_seed, _sent, _decoy_seed = _derive_position_material(
old_password, old_keyfile, fingerprint,
_old_yk_resp[0] if _old_yk_resp else None,
)
old_ordered = reorder_region(old_region, old_seed)
_overwrite_lsbs_randomly(image, old_ordered)
import tempfile as _tempfile
scratch_dir = os.path.dirname(os.path.abspath(args.image)) or "."
_sc_fd, scratch = _tempfile.mkstemp(
dir=scratch_dir, prefix=".stegx_rewrap_", suffix=".tmp",
)
os.close(_sc_fd)
image.save(scratch, format="PNG", pnginfo=None, optimize=False)
image.close()
options = _build_embed_options(args, new_keyfile)
options.yubikey_response = new_yubikey
options.yk_challenge_nonce = new_yk_nonce
options.decoy_file_bytes = None
options.decoy_password = None
options.panic_password = None
options.always_split_cover = (matched_region != "real-full")
saved_path = embed_v2(scratch, raw_inner, output_path, new_password, options)
if os.path.abspath(saved_path) == os.path.abspath(scratch):
scratch = None
_print_ok(f"Rewrapped '{args.image}' -> '{saved_path}' with the new credentials.")
_maybe_audit(args, "rewrap", ok=True, stego_path=saved_path,
note=f"old-region={matched_region}")
return True
except (FileNotFoundError, ValueError, OSError) as e:
logging.error("rewrap failed: %s", e)
_print_err(f"rewrap failed: {e}")
_maybe_audit(args, "rewrap", ok=False, note=type(e).__name__)
return False
except Exception as e:
logging.exception("rewrap unexpected failure.")
_print_err(f"Unexpected error: {e}")
_maybe_audit(args, "rewrap", ok=False, note=type(e).__name__)
return False
finally:
if scratch and os.path.exists(scratch):
try:
os.unlink(scratch)
except OSError:
pass
if tmp_cover and os.path.exists(tmp_cover):
try:
os.unlink(tmp_cover)
except OSError:
pass
[docs]
def perform_pick_cover(args: argparse.Namespace) -> bool:
from stegx.cover_selector import pick_best_cover
try:
if not os.path.isdir(args.dir):
raise NotADirectoryError(f"Cover directory not found: {args.dir}")
if args.payload and not os.path.isfile(args.payload):
raise FileNotFoundError(f"Payload file not found: {args.payload}")
payload_size = os.path.getsize(args.payload) if args.payload else args.size
if payload_size is None or payload_size <= 0:
raise ValueError("Supply either --payload FILE or --size BYTES.")
best, ranked = pick_best_cover(args.dir, payload_size)
print(f"Payload size: {payload_size:,} bytes ({payload_size * 8:,} bits)")
print(f"Candidates : {len(ranked)} images scanned in {args.dir}")
print()
print(f"{'score':>8} {'W x H':>11} {'mode':<5} {'capacity (B)':>14} {'entropy':>7} path")
print("-" * 80)
for c in ranked[: args.limit]:
marker = " " if c.enough_capacity else "x"
print(
f"{marker} {c.score:>6.2f} {c.width:>5} x {c.height:<3} "
f"{c.mode:<5} {c.capacity_bits // 8:>14,} {c.entropy:>7.4f} {c.path}"
)
if best is None:
print()
print("No candidate has enough capacity. Use a larger image or split the payload.")
return False
print()
print(f"Best pick: {best.path}")
return True
except (FileNotFoundError, NotADirectoryError, ValueError, OSError) as e:
_print_err(f"pick-cover failed: {e}")
return False
[docs]
def perform_benchmark(args: argparse.Namespace) -> bool:
import os
import time
from stegx.compression import (
MODE_BEST,
MODE_FAST,
available_algorithms,
compress_best,
ratio_report,
)
from stegx.kdf import (
KdfParams,
calibrate_argon2_for_target_ms,
derive_master_key,
)
iterations = max(1, int(args.iterations))
size_kib = max(1, int(args.size_kib))
if getattr(args, "calibrate", False):
target_ms = max(100, int(getattr(args, "target_ms", 500)))
print(f"\n=== Argon2id calibration (target ~{target_ms} ms) ===")
tuned = calibrate_argon2_for_target_ms(target_ms=target_ms)
print(
f"Recommended Argon2id params for this machine:\n"
f" time_cost = {tuned.time_cost}\n"
f" memory_cost_kib= {tuned.memory_cost_kib} "
f"({tuned.memory_cost_kib / 1024:.1f} MiB)\n"
f" parallelism = {tuned.parallelism}"
)
print(
"\nThese values are compiled into the default profile at\n"
" stegx/kdf.py::ARGON2_TIME_COST / ARGON2_MEMORY_COST_KIB /\n"
" ARGON2_PARALLELISM\n"
"— bump them there if you want the project-wide default to change."
)
return True
sample_text = (
b"The quick brown fox jumps over the lazy dog. "
b"StegX benchmark corpus 0123456789 {}[];:<>?,./\\|\n"
)
sample = (sample_text * ((size_kib * 1024 // len(sample_text)) + 1))[: size_kib * 1024]
print(f"\n=== Argon2id KDF timing ({iterations} iterations) ===")
params = KdfParams.default_argon2id()
print(
f"Default parameters: time_cost={params.time_cost}, "
f"memory={params.memory_cost_kib} KiB, parallelism={params.parallelism}"
)
salt = os.urandom(16)
timings = []
for i in range(iterations):
t0 = time.perf_counter()
derive_master_key("benchmark-password-XYZ-123", salt, params)
elapsed_ms = (time.perf_counter() - t0) * 1000
timings.append(elapsed_ms)
print(f" run {i+1}/{iterations}: {elapsed_ms:7.1f} ms")
avg = sum(timings) / len(timings)
print(f" mean: {avg:.1f} ms (min {min(timings):.1f}, max {max(timings):.1f})")
if avg < 300:
rec = (
f"Your machine runs the default Argon2id params in {avg:.0f} ms -- "
"you can safely raise memory_cost_kib to strengthen brute-force "
"resistance without noticeable latency."
)
elif avg > 2000:
rec = (
f"Argon2id took {avg:.0f} ms on average -- if UX matters, consider "
"lowering memory_cost_kib to 32768 or time_cost to 2."
)
else:
rec = f"Argon2id latency ({avg:.0f} ms) is within the recommended 0.3-2 s window."
print(f"Recommendation: {rec}")
print(f"\n=== Compression multiplexer ({size_kib} KiB of mixed ASCII) ===")
print(f"Available algorithms: {', '.join(available_algorithms())}")
t0 = time.perf_counter()
fast_alg, fast_blob = compress_best(sample, mode=MODE_FAST)
fast_ms = (time.perf_counter() - t0) * 1000
t0 = time.perf_counter()
best_alg, best_blob = compress_best(sample, mode=MODE_BEST)
best_ms = (time.perf_counter() - t0) * 1000
print(f" --compression fast: {ratio_report(len(sample), len(fast_blob))} "
f"via {fast_alg} ({fast_ms:.1f} ms)")
print(f" --compression best: {ratio_report(len(sample), len(best_blob))} "
f"via {best_alg} ({best_ms:.1f} ms)")
if len(fast_blob) > 0:
savings = (1 - len(best_blob) / len(fast_blob)) * 100
print(f" best saves {savings:+.1f}% over fast (at {best_ms - fast_ms:.0f} ms extra cost).")
print(f"\n=== Estimated wall-clock for full encode ===")
total_ms = 2 * avg + best_ms
print(f" Argon2id x 2 ({2*avg:.0f} ms) + compression ({best_ms:.0f} ms) "
f"~= {total_ms:.0f} ms")
print()
return True
[docs]
def perform_shamir_combine(args: argparse.Namespace) -> bool:
try:
args.destination = sink_safe_path(validate_user_path(args.destination))
args.image = [
sink_safe_path(
validate_user_path(p, kind="file", must_exist=True)
)
for p in args.image
]
if not os.path.isdir(args.destination):
os.makedirs(args.destination, exist_ok=True)
password = _resolve_password(args, confirm=False, label="Password")
keyfile_bytes = _read_keyfile(args.keyfile)
wants_yk = getattr(args, "yubikey", False) or getattr(args, "yubikey_response_file", None)
shares: List[bytes] = []
for stego_path in args.image:
yk_factory: Optional[Callable[[bytes], bytes]] = None
if wants_yk:
def yk_factory(challenge: bytes, _p=stego_path) -> bytes:
return _maybe_yubikey_response(args, challenge, required=True)
decrypted = extract_v2(
stego_path,
password,
keyfile_bytes,
yubikey_factory=yk_factory,
)
_filename, share_bytes = parse_payload(decrypted)
shares.append(share_bytes)
secret = combine_shares(shares)
output_name = os.path.basename(args.output or "recovered_secret.bin")
output_path = os.path.join(args.destination, output_name)
from stegx.safe_paths import ensure_under_base
ensure_under_base(output_path, args.destination)
with open(sink_safe_path(output_path), "wb") as f:
f.write(secret)
_print_ok(f"Combined {len(shares)} shares into {output_path}.")
return True
except (FileNotFoundError, ValueError, OSError) as e:
logging.error("Shamir combine failed: %s", e)
_print_err(f"Shamir combine failed: {e}")
return False
except Exception as e:
logging.exception("Unexpected Shamir-combine failure.")
_print_err(f"Unexpected error: {e}")
return False
def _add_common_embed_flags(p: argparse.ArgumentParser) -> None:
p.add_argument("-p", "--password", metavar="PASSWORD", default=None,
help="Password (discouraged -- leaks into shell history; default: interactive prompt).")
p.add_argument("--password-stdin", action="store_true",
help="Read the password from a single line of stdin.")
p.add_argument("--keyfile", metavar="PATH", default=None,
help="Optional keyfile mixed into the KDF input (acts as a second factor).")
p.add_argument("--yubikey", action="store_true",
help="Require a YubiKey HMAC-SHA1 challenge-response (slot 2) as a "
"second factor. Needs `pip install ykman`.")
p.add_argument("--yubikey-response-file", metavar="PATH", default=None,
help=argparse.SUPPRESS)
p.add_argument("--kdf", choices=("argon2id", "pbkdf2"), default="argon2id",
help="Password-based KDF (default: argon2id).")
p.add_argument("--dual-cipher", action="store_true",
help="Layer ChaCha20-Poly1305 over AES-256-GCM. Both keys "
"derive from the same password-master via HKDF, so "
"this protects only against a catastrophic break in "
"one of the two algorithms — not against a password "
"break. Incompatible with --fips (ChaCha is not FIPS).")
p.add_argument("--adaptive", action="store_true",
help="Embed only in high-edge-cost regions to resist CNN steganalysers.")
p.add_argument("--adaptive-cutoff", type=float, default=0.40,
help="Percentile cutoff for --adaptive (0-1, default 0.40).")
p.add_argument("--adaptive-mode", choices=("laplacian", "hill"), default="laplacian",
help="Cost map used by --adaptive: 'laplacian' (default, fast) "
"or 'hill' (Li et al. 2014, stronger against CNN steganalysers).")
p.add_argument("--matrix-embedding", action="store_true",
help="Use F5-style Hamming(7,3) matrix embedding for the ciphertext body.")
p.add_argument("--max-fill", type=float, default=25.0,
help="Refuse payloads that fill more than this percentage of capacity (default: 25%%).")
p.add_argument("--strict-password", action="store_true",
help="Reject (rather than warn on) passwords with zxcvbn score < 3.")
p.add_argument("--no-preserve-cover", action="store_true",
help="Do not mirror the cover's PNG encoder parameters on save.")
p.add_argument("--audit-log", metavar="PATH", default=None,
help="Append a hash-chained JSONL audit record for this operation "
"(timestamp, op, cover/stego SHA-256, flag-set, ok/fail). "
"Payload contents are never logged.")
p.add_argument("--no-compress", action="store_false", dest="compress", default=True,
help="Disable compression of the hidden file.")
p.add_argument("--compression", choices=("fast", "best"), default="best",
help="Compression profile: 'fast' = zlib only, 'best' (default) = try"
" zlib, lzma, bz2, zstd, brotli and pick the smallest output.")
p.add_argument("--always-split-cover", action="store_true",
help="Paranoia: always split the cover into two halves and fill the "
"decoy half with random bits even when no --decoy-file is set, "
"so an observer cannot detect decoy-mode usage. Halves cover "
"capacity permanently.")
p.add_argument("--fips", action="store_true",
help="Restrict to FIPS 140-validated primitives: PBKDF2-HMAC-SHA256, "
"AES-256-GCM, HKDF-SHA256, zlib-only compression. Refuses "
"Argon2id, ChaCha20-Poly1305, brotli, lzma, bz2, zstd, and "
"YubiKey HMAC-SHA1. Requires a FIPS-validated cryptography "
"backend at runtime.")
p.add_argument("--allow-outside-cwd", action="store_true",
help="Permit writing the stego output outside the current working "
"directory (disables the anti-traversal check on --output).")
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="stegx",
description=f"StegX v{__version__}: authenticated LSB steganography with Argon2id + AES-GCM.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" stegx encode -i cover.png -f secret.zip -o stego.png\n"
" stegx encode -i cover.png -f real.zip -o stego.png --decoy-file harmless.txt\n"
" stegx decode -i stego.png -d ./out\n"
" stegx shamir-split -k 3 -n 5 -f secret.bin -c c1.png c2.png c3.png c4.png c5.png -O shares/\n"
" stegx shamir-combine -i shares/stego_share_01.png shares/stego_share_02.png"
" shares/stego_share_03.png -d ./out -o recovered.bin\n"
),
)
parser.add_argument(
"-v", "--version",
action="version",
version=_build_version_string(),
help="Show the version banner, including which optional extras are available.",
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging.")
sub = parser.add_subparsers(dest="mode", required=True)
enc = sub.add_parser("encode", help="Hide a file in a cover image.")
enc.add_argument("-i", "--image", required=True, metavar="COVER")
enc.add_argument("-f", "--file", required=True, metavar="FILE", nargs="+",
help="One or more files to hide. Multiple files are "
"tarred up transparently; the decoder unpacks them.")
enc.add_argument("-o", "--output", required=True, metavar="OUTPUT_PNG")
enc.add_argument("--decoy-file", metavar="PATH", default=None,
help="Optional decoy payload for plausible deniability.")
enc.add_argument("--decoy-password", metavar="PASSWORD", default=None,
help="Password for the decoy (prompted if omitted and --decoy-file is set).")
enc.add_argument("--panic-password", metavar="PASSWORD", default=None,
help="Arm self-destruct: typing this password at decode time wipes "
"the real region's LSBs before reporting success/failure. "
"Mutually exclusive with --decoy-file.")
enc.add_argument("--panic-decoy", metavar="PATH", default=None,
help="Sacrificial payload returned after panic destruction "
"(omit to use silent mode — generic error, no output).")
enc.add_argument("--polyglot-zip", metavar="PATH", nargs="+", default=None,
help="After encoding, append a ZIP archive containing the listed "
"files to the stego PNG so it is simultaneously a valid PNG "
"(viewed normally) and a valid ZIP (extractable with unzip). "
"The ZIP is a public side-channel; the hidden StegX payload "
"is unaffected.")
_add_common_embed_flags(enc)
dec = sub.add_parser("decode", help="Extract a hidden file from a stego image.")
dec.add_argument("-i", "--image", required=True, metavar="STEGO")
dec.add_argument("-d", "--destination", metavar="OUT_DIR", default=None,
help="Directory to write the extracted file. Use '-' or --stdout "
"to write the payload to stdout instead.")
dec.add_argument("--stdout", action="store_true",
help="Write the decrypted payload to stdout instead of a file.")
dec.add_argument("-p", "--password", metavar="PASSWORD", default=None)
dec.add_argument("--password-stdin", action="store_true")
dec.add_argument("--keyfile", metavar="PATH", default=None)
dec.add_argument("--yubikey", action="store_true",
help="Supply a YubiKey HMAC-SHA1 response on the FLAG_YUBIKEY-bound payload.")
dec.add_argument("--yubikey-response-file", metavar="PATH", default=None,
help=argparse.SUPPRESS)
dec.add_argument("--audit-log", metavar="PATH", default=None,
help="Append a hash-chained audit record (see `encode --audit-log`).")
dec.add_argument("--allow-v1", action="store_true",
help="Allow decoding of legacy StegX v1 stego images. "
"v1 uses weaker PBKDF2 parameters and an always-on "
"pixel scan, so it is off by default to prevent a "
"downgrade attack and a CPU-timing side channel.")
spl = sub.add_parser("shamir-split",
help="Split a secret file into k-of-n stego shares.")
spl.add_argument("-k", type=_bounded_int(1, 255, name="-k"),
required=True, metavar="THRESHOLD")
spl.add_argument("-n", type=_bounded_int(1, 255, name="-n"),
required=True, metavar="TOTAL_SHARES")
spl.add_argument("-f", "--file", required=True, metavar="SECRET_FILE")
spl.add_argument("-c", "--cover", nargs="+", required=True, metavar="COVER_PNG",
help="n cover images, one per share.")
spl.add_argument("-O", "--out-dir", required=True, metavar="OUT_DIR",
help="Directory to write stego_share_XX.png files.")
_add_common_embed_flags(spl)
bench = sub.add_parser("benchmark",
help="Measure Argon2id / compression performance on this machine.")
bench.add_argument("--iterations", type=int, default=3, metavar="N",
help="Number of Argon2id samples to average (default: 3).")
bench.add_argument("--size-kib", type=int, default=64, metavar="K",
help="Sample-payload size in KiB for the compression benchmark (default: 64).")
bench.add_argument("--calibrate", action="store_true",
help="Run an Argon2id cost-sweep and recommend memory_cost_kib "
"that hits roughly --target-ms on this machine.")
bench.add_argument("--target-ms", type=int, default=500, metavar="MS",
help="Target Argon2id latency in milliseconds for --calibrate "
"(default: 500).")
rw = sub.add_parser("rewrap",
help="Rotate password / keyfile / yubikey on an existing stego "
"image without leaking plaintext to disk.")
rw.add_argument("-i", "--image", required=True, metavar="STEGO")
rw.add_argument("-o", "--output", metavar="OUT_PNG", default=None,
help="Write the rewrapped stego here (default: overwrite input).")
rw.add_argument("--old-keyfile", metavar="PATH", default=None,
help="Keyfile currently bound to the stego image.")
rw.add_argument("--keyfile", metavar="PATH", default=None,
help="Keyfile to bind on the new stego image (optional).")
rw.add_argument("--old-yubikey", action="store_true",
help="The stego is currently sealed with a YubiKey factor.")
rw.add_argument("--old-yubikey-response-file", metavar="PATH", default=None,
help=argparse.SUPPRESS)
rw.add_argument("--yubikey", action="store_true",
help="Seal the rewrapped stego with a YubiKey factor going forward.")
rw.add_argument("--yubikey-response-file", metavar="PATH", default=None,
help=argparse.SUPPRESS)
rw.add_argument("--kdf", choices=("argon2id", "pbkdf2"), default="argon2id",
help="KDF for the NEW wrapping (default: argon2id).")
rw.add_argument("--dual-cipher", action="store_true")
rw.add_argument("--adaptive", action="store_true")
rw.add_argument("--adaptive-cutoff", type=float, default=0.40)
rw.add_argument("--adaptive-mode", choices=("laplacian", "hill"), default="laplacian")
rw.add_argument("--matrix-embedding", action="store_true")
rw.add_argument("--max-fill", type=float, default=100.0)
rw.add_argument("--strict-password", action="store_true")
rw.add_argument("--no-preserve-cover", action="store_true")
rw.add_argument("--no-compress", action="store_false", dest="compress", default=True)
rw.add_argument("--compression", choices=("fast", "best"), default="best")
rw.add_argument("--always-split-cover", action="store_true")
rw.add_argument("--fips", action="store_true")
rw.add_argument("--audit-log", metavar="PATH", default=None)
pick = sub.add_parser("pick-cover",
help="Rank covers in a directory by capacity + entropy for a given payload.")
pick.add_argument("--dir", required=True, metavar="COVER_DIR",
help="Directory containing candidate cover images.")
pick.add_argument("--payload", metavar="FILE", default=None,
help="Payload file — size used to check capacity.")
pick.add_argument("--size", type=int, metavar="BYTES", default=None,
help="Payload size in bytes (use instead of --payload).")
pick.add_argument("--limit", type=int, default=20, metavar="N",
help="Show at most N candidates (default: 20).")
cmb = sub.add_parser("shamir-combine",
help="Recover a secret from k-or-more stego shares.")
cmb.add_argument("-i", "--image", nargs="+", required=True, metavar="STEGO_SHARE")
cmb.add_argument("-d", "--destination", required=True, metavar="OUT_DIR")
cmb.add_argument("-o", "--output", metavar="FILENAME", default=None,
help="Filename for the recovered secret (default: recovered_secret.bin).")
cmb.add_argument("-p", "--password", metavar="PASSWORD", default=None)
cmb.add_argument("--password-stdin", action="store_true")
cmb.add_argument("--keyfile", metavar="PATH", default=None)
cmb.add_argument("--yubikey", action="store_true")
cmb.add_argument("--yubikey-response-file", metavar="PATH", default=None,
help=argparse.SUPPRESS)
return parser
[docs]
def main(argv: Optional[List[str]] = None) -> None:
parser = _build_parser()
args = parser.parse_args(argv)
log_level = logging.DEBUG if args.verbose else logging.INFO
setup_logging(log_level)
logging.debug("StegX v%s started (mode=%s).", __version__, args.mode)
if args.mode == "encode":
ok = perform_encode(args)
elif args.mode == "decode":
ok = perform_decode(args)
elif args.mode == "shamir-split":
ok = perform_shamir_split(args)
elif args.mode == "shamir-combine":
ok = perform_shamir_combine(args)
elif args.mode == "benchmark":
ok = perform_benchmark(args)
elif args.mode == "pick-cover":
ok = perform_pick_cover(args)
elif args.mode == "rewrap":
ok = perform_rewrap(args)
else:
parser.error(f"Unknown mode: {args.mode}")
return
sys.exit(0 if ok else 1)
if __name__ == "__main__":
main()