Many beginner-to-intermediate Python programmers seriously underestimate Python’s power of juggling subprocesses. Using the subprocess and asyncio.subprocess modules can seriously strengthen and extend the power of “pure/depdendency-“free” standalone Python scripts, by delegating specialized work to more specialized tools and leaving Python as a ‘glue’ layer.” Let’s see a few examples.

Rclone and standard-library only Python are shockingly powerful together

The fact that Python suffers greatly from the blue-function-red-function problem contributes to the lack of good “filesystem-object-store interfaces” that would allow applications to interact with object stores and filesystems in a unified way. golang does have a unified way of dealing with synchronous and asynchronous execution and the power and unreasonable effectiveness of rclone partly stems from golang’s concurrency design. Rclone is a tremendously powerful tool and the ability to make standalone “zero-dependency” Python scripts that invoke Rclone expands the kinds of “file management/organization” tasks that “zero-dependency” Python scripts can handle. As an aside, I use “zero-dependency” in quotes, because of course we have the dependency of the program being invoked in the subprocess. However that dependency can be much easier to package and deploy. Consider the following “zero-dependency” script that I run regularly on my home-server to verify that my postgresql database backups work properly, atleast insofar as I can restore a postgresql database containerized via podman and inspect/verify some expected states. Here is a summary of what this script does.

$ ./verify_postgres_restore --help

usage: verify_postgres_restore [-h] -r RCLONE_SOURCE
    [--max-age MAX_AGE] -q QUERIES_DIR
    [--container-name CONTAINER_NAME] [--postgres-version POSTGRES_VERSION]
    [--postgres-password POSTGRES_PASSWORD] [--port PORT] [--max-wait MAX_WAIT]
    [-v]

PostgreSQL Backup Testing Tool

This script validates PostgreSQL backups by:
1. Taking an existing logical backup file (.sql.gz). This is pulled from a local
   or remote file or directory/prefix (newest file by modified time) using
   rclone. Therefore all of the rclone environment variables and configuration
   options apply to control how the backup file is retrieved. See the rclone
   documentation. E.x. https://rclone.org/docs/#environment-variables
2. Spinning up a temporary PostgreSQL container using podman
3. Restoring the backup
4. Running validation queries from SQL scripts organized by database
5. Ensuring all queries return 0 (success)

Query directory structure:
  queries/
  ├── giteadb/
  │   ├── test1.sql
  │   └── test2.sql
  ├── shioridb/
  │   └── test3.sql
  └── anotherdb/
      └── test4.sql

Each SQL script should return a single integer:
  - 0 indicates success/test passed
  - Any other value indicates failure

options:
  -h, --help            show this help message and exit
  -q, --queries-dir QUERIES_DIR
                        Directory containing database subdirectories with SQL
                        test scripts
  -v, --verbose         Enable verbose debug logging

Rclone options:
  Rclone options for retrieving the backup file

  -r, --rclone-source RCLONE_SOURCE
                        Can be a path to gzipped PostgreSQL backup file
                        (.sql.gz), a directory with backups (uses the most
                        recent by modified time), or
                        `rclone_profile:bucket/prefix/pg_backup.sql.gz` or a
                        prefix with many such backups like
                        `rclone_profile:bucket/prefix/`. This can also use an
                        rclone connection string. See
                        https://rclone.org/docs/#connection-strings for more
                        information. E.x. `:s3,env_auth:bucket/path` or
                        `:s3,env_auth,profile=my_profile_name_in_aws_config_file,region=us-east-2:mybucket/prefix/`
  --max-age MAX_AGE     Rclone max age. This is helpful because if no backups
                        were taken in, for example, the last 2 days, this
                        command will fail. This can help detect broken backup
                        procedures. Though if you don't have good alerting for
                        broken backup procedures, you probably don't have good
                        alerting for broken restore testing, let alone any
                        restore testing :) . Examples: `2d`, `1w`, `2025-01-01`.
                        See https://rclone.org/docs/#time-options for more
                        information.

container options:
  Options for configuring the PostgreSQL test container

  --container-name CONTAINER_NAME
                        Name for the test container (default: pg-backup-test)
  --postgres-version POSTGRES_VERSION
                        PostgreSQL version tag for container (default: 16)
  --postgres-password POSTGRES_PASSWORD
                        Password for PostgreSQL superuser (default: testpass123)
  --port PORT           Host port to map to container port 5432 (default: 5432)
  --max-wait MAX_WAIT   Maximum seconds to wait for PostgreSQL to be
                        ready (default: 30)

We will focus mainly on the part that involves retrieving the backup either from a local or remote source.

import logging
from pathlib import Path
import subprocess
from typing import Final

_RCLONE_MAX_TRANSFER_SIZE_REACHED_RC: Final[int] = 8
_RCLONE_NO_FILES_TRANSFERED_RC: Final[int] = 9
_RCLONE_SUCCESS_RC: Final[int] = 0

def _download_most_recent_postgres_backup_sqlgz_archive(
    rclone_source: str,
    workdir: Path,
    rclone_config: Path | None = None,
    max_age: str | None = None,
) -> Path:
    """Download the most recently modified '.sql.gz' file from the rclone source
    into `workdir` and return the Path, which is _guaranteed_ to be in
    `workdir`. `max_age` is used to error if there were no sufficiently recent
    backups (e.x. the backup process is broken). `rclone_source` _can_ refer to
    a local path, `profile_from_rclone_config:path` or an inline rclone
    connection string. Raises `FileNotFoundError` if not files matching the
    criterion were found, or `RuntimeError` if there are any other rclone errors
    (in which case, inspect the logs in stderr)"""
    _cmd = (
        "rclone",
        *(("--config", str(rclone_config)) if rclone_config else ()),
        "copy",
        *(("--max-age", str(max_age)) if max_age else ()),
        "--include",
        "**.sql.gz",
        "--check-first",
        "--order-by",
        "modtime,descending",
        # Max transfer of 1 byte after sorting and 'completing the sort before
        # downloading anything' collectively have the effect of downloading only
        # the 'most recently modified file
        "--max-transfer",
        "1B",
        "--cutoff-mode",
        "soft",
        "--transfers",
        "1",
        "--inplace",
        "--error-on-no-transfer",
        "--use-json-log",
        "-vv",
        rclone_source,
        str(workdir),
    )
    _LOGGER.info("Running cmd %s", shlex.join(_cmd))

    proc = subprocess.run(_cmd)
    rc = proc.returncode
    if rc == _RCLONE_NO_FILES_TRANSFERED_RC:
        _msg = f"No files found in {rclone_source}"
        raise FileNotFoundError(_msg)
    if rc not in (_RCLONE_MAX_TRANSFER_SIZE_REACHED_RC, _RCLONE_SUCCESS_RC):
        _msg = f"Rclone failed and exited with the unexpected code {rc}"
        raise RuntimeError(_msg)
    downloaded_file = next((x for x in workdir.rglob("**/*") if x != workdir), None)
    if not downloaded_file:
        _msg = f"No files downloaded to {workdir}??"
        raise FileNotFoundError(_msg)
    return downloaded_file

I can guarantee that every server that I manage will have rclone installed, so this script is guaranteed to work. One might argue that “if you can guarantee that rclone will be available on your servers, you can just as easily apt install python3-boto3 or something and use the Python AWS SDK.” Fair enough, but in my opinion, boto3 is atrocious. Moreover, packaging Python applications with actual library dependencies is almost always more complex that the statically-linked single binary that can represent golang programs. Also matching the performance and concurrency of rclone is very tough with boto3, especially when many files are involved. This single function handles retrieving the .sql.gz logical backup file from the local filesystem, SMB, NFS, S3, GCS, SFTP (if you retrieve backups from there for whatever reason), Google Drive (why are you doing this to yourself??), and dozens of more storage backends. It does so performantly, atleast as correctly as you are likely to accomplish in Python, and with a single command.

This script is only ~500 lines (depending on how one counts), which is not bad considering it retrieves the logical backup file from (basically) anywhere, spins up a database, runs SQL queries against that database, reports on the results, error handles, exposes a nice CLI, and does all of this in a maintainable way that humans (and LLMs) can easily grok/modify as needed. A fair amount of those lines are simply type annotations, comments, and parameter passing.

Of course, the script itself doesn’t know that rclone is installed in the way that a fully packaged Python application might be able to know confidently. A simple guard early on the in CLI’s execution as follows can generally make sure expectations are met.

import shutil

if not shutil.which("rclone"):
    raise FileNotFoundError("Rclone _must_ be installed to run this script")

Parallelizing Python With xargs

I manage a fair amount of parquet files and hive-partitioned parquet datasets at work and in my personal life.

Yes I know there are other options like delta tables or iceberg tables, but I want to give those maybe 3-5 more years to cook before I use them in my personal life for my self-hosted data lake.

Sometimes I need ‘bulk’ modifications of these datasets. For example, maybe I need to sort each file, or change the schema by casting some column(s) to some data type, or add a column to each file. I might write a Python script that can operate on one or more files and do the transformation so I have the option to manage parallelism within Python or outside of Python. I have done something like the following on many occasions.

find /path/to/parquet/dataset -print0 -type f -name '*.parquet' \
    xargs -0 -I{} -P8 sh -c \
    'python3 my_parquet_transformation_script.py -o /path/to/parquet/dataset {}'
Command explanation

The -print0 means to separate filenames with a “null terminator” which is a character that never appears in file names so that we can safely split on that in xargs while never worrying about “what if the filenames have a newline or some other weird character.” The -0 part in the xargs means to accept inputs from stdin delimited by null terminators.

So the find part means “find all of the files (as opposed to directories or symlinks) in /path/to/parquet/dataset that end with .parquet and stream them as they are found to stdout delimited by a null terminator.”

In the xargs, -P8 means to multiplex stdin to 8 workers so 8 commands can process at once. I might modify 8 depending on the memory requirements of the Python script and how much parallelism I can get while processing each file. -I{} means that {} in the Python script invocation will be replaced with the null-terminator-delimited input segment from stdin.

While using sh -c involves invoking extra subprocesses, I find it useful because I can easily replace it with echo as my “dry-run” and see the exact commands that will be executed before replacing echo with sh -c and running it for real.

This might be helpful because I explicitly want to keep the same file names and update them “inplace.” Moreover, imagine that my parquet transformation script uses the polars dataframe library or pyarrow . Those might be able to use 2-4 cores for processing a single file because parquet typically parallelizes across row groups and/or column chunks, while the actual writing back to disk is typically single-threaded. Processing many files in parallel is often required to actually use 80-90% of my CPU. Setting the xargs parallelism requires some care, as the underlying script has some parallelism already (though not as much as I would like) and just setting -P0 (as many parallel xargs tasks as my computer has CPU cores) is not optimal or might blow up RAM usage.

Since this is frequently a 1-off task, I will just run this in a virtual environment or slap a quick pyproject.toml on this and use uv run --spec /path/to/package .... At some point, I might make a slightly dedicated tool for these “keep the same rows and don’t really modify data, but do this slight isomorphism” kinds of operations, but for now I do them with dedicated scripts as needed. Sometimes Python libraries that release the GIL have some “gotchas” when used in multiprocessing contexts. Sometimes certain coreutils are simply faster than anything that is easily doable in the standard library of Python. find is one such example. Certain filesystem operations are shockingly slow in Python, especially over NFS or SMB-mounted filesystems.

Asynchronous Subprocess Juggling

Since spawning a subprocess already dispatches work outside the scope of the (C)Python interpreter, subprocesses can be “naturally” handled in an asynchronous way.

A common building-block might look something like the following.

import asyncio
import asyncio.subprocess

class _CommandResult(NamedTuple):
    """Result of a command execution."""

    returncode: int
    stdout: bytes
    stderr: bytes

async def do_stuff(cmd: list[str], timeout_seconds: float) -> _CommandResult
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise
    if process.returncode is None:
        _msg = f"Why is returncode not set? {_cmd_str=}"
        raise RuntimeError(_msg)
    return _CommandResult(
        returncode=process.returncode,
        stdout=stdout if capture_output else b"",
        stderr=stderr if capture_output else b"",
    )

The core CLI entrypoint might look something like the following.

import asyncio
import sys
from collections.abc import Sequence
import logging

_LOGGER = logging.getLogger(__name__)

async def async_main(argv: Sequence[str] | None = None) -> int:
    """
    Async main entry point for the CLI.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    _opts = _parse_arguments(argv if argv is not None else sys.argv[1:])

    logging.basicConfig(
        level=logging.DEBUG if _opts.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    try:
        return await _actual_core_logic(opts=_opts)
    except Exception as e:
        _LOGGER.exception("Test execution failed")
        return 1


def main(argv: Sequence[str] | None = None) -> int:
    """
    Main entry point for the CLI.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    return asyncio.run(async_main(argv))


if __name__ == "__main__":
    sys.exit(main())

Note the one invocation of asyncio.run to bridge synchronous ans asynchronous execution as close to the script entrypoint as possible. Imagine in our core logic, we juggle many invocations of rclone or even rsync, possibly reaching different remote file stores. We might also have a long running task and want to periodically report on its progress, especially if the tool doesn’t natively support progress reporting. Asynchronous programming helps greatly with this.

import time

async def poll_long_running_task_and_log_progress(
    cmd_args: tuple[str, ...],
    poll_interval_seconds: float,
) -> None:
    cmd = ("long-running-cli", *cmd_args)
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    while True:
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=poll_interval_seconds,
            )
        except asyncio.TimeoutError:
            await _report_on_progress(["cmd", "args"])

I almost always find myself wrapping asyncio.subprocess.create_subprocess_exec in a context manager so that the caller can decide on “entire-command-jugging” timeouts and operate with the expectation that “the function is in charge of spawning and cleaning up the process.”

There aren’t a huge number of situations that require juggling many subprocesses, but Python’s ability to do that asynchronously can really come in handy when that is what I actually want to do. In many cases, we can modify the underlying tool invoked by the Python subprocess to behave more as we want and doing so is generally preferable. For example, if one wants to search for files in many filesystem locations, a more dedicated tools like fd will likely do the job better than jugging many invocations of the more standard core utility find . However modifying existing tools or using other tools designed for concurrency and/or parallelism isn’t always viable. In those situations, Python can do some seriously heavy lifting – or at the very least coordinate 2+ workers that actually do the heavy lifting.