Many beginner-to-intermediate Python programmers seriously underestimate
Python’s ability to juggle subprocesses. The subprocess and asyncio.subprocess
modules can greatly extend what “pure”, dependency-free standalone Python
scripts can do, by delegating specialized work to more specialized tools and
leaving Python as the ‘glue’ layer. Let’s see a few examples.
Rclone and standard-library-only Python are shockingly powerful together
The fact that Python suffers greatly from the blue-function-red-function
problem (async functions can only be awaited from other async functions, which
splits code into two incompatible “colors”) contributes to the lack of good
“filesystem-object-store interfaces” that would allow applications to interact
with object stores and filesystems in a unified way. Go (golang), by contrast,
has a unified way of dealing with synchronous and asynchronous execution, and
the power and unreasonable effectiveness of rclone partly stem from Go’s
concurrency design.
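Here is a minimal standard-library sketch of the function-color split. The function names are hypothetical. A blocking directory listing is a “blue” (synchronous) function, and async code cannot await it directly. It has to be pushed onto a worker thread with asyncio.to_thread (Python 3.9+). Synchronous code, in turn, needs asyncio.run to call the async version. Libraries that want to serve both worlds often end up shipping, or wrapping, both variants.

```python
import asyncio
import os


def list_dir_blocking(path: str) -> list[str]:
    """A "blue" (synchronous) function: it blocks the calling thread."""
    with os.scandir(path) as entries:
        return sorted(entry.name for entry in entries)


async def list_dir_async(path: str) -> list[str]:
    """A "red" (async) wrapper: the blocking call runs in a worker thread so
    the event loop stays responsive. Sync callers cannot just call this; they
    need asyncio.run (or an already running loop) to get the result."""
    return await asyncio.to_thread(list_dir_blocking, path)


async def list_many(paths: list[str]) -> list[list[str]]:
    # The listings run concurrently in the default thread pool.
    return await asyncio.gather(*(list_dir_async(p) for p in paths))
```

From synchronous code, the bridge is a single call such as `asyncio.run(list_many(["/srv/a", "/srv/b"]))`.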
Rclone is a tremendously powerful tool, and the ability to write standalone
“zero-dependency” Python scripts that invoke rclone expands the kinds of “file
management/organization” tasks that those scripts can handle.
As an aside, I put “zero-dependency” in quotes because, of course, we still
depend on the program being invoked in the subprocess. However, that
dependency can be much easier to package and deploy. Consider the following
“zero-dependency” script that I run regularly on my home server to verify that
my PostgreSQL database backups work properly, at least insofar as I can restore
them into a PostgreSQL database containerized via podman and inspect/verify
some expected states. Here is a summary of what this script does.
$ ./verify_postgres_restore --help
usage: verify_postgres_restore [-h] -r RCLONE_SOURCE
                               [--max-age MAX_AGE] -q QUERIES_DIR
                               [--container-name CONTAINER_NAME] [--postgres-version POSTGRES_VERSION]
                               [--postgres-password POSTGRES_PASSWORD] [--port PORT] [--max-wait MAX_WAIT]
                               [-v]

PostgreSQL Backup Testing Tool

This script validates PostgreSQL backups by:
1. Taking an existing logical backup file (.sql.gz). This is pulled from a local
   or remote file or directory/prefix (newest file by modified time) using
   rclone. Therefore all of the rclone environment variables and configuration
   options apply to control how the backup file is retrieved. See the rclone
   documentation. E.g. https://rclone.org/docs/#environment-variables
2. Spinning up a temporary PostgreSQL container using podman
3. Restoring the backup
4. Running validation queries from SQL scripts organized by database
5. Ensuring all queries return 0 (success)

Query directory structure:
queries/
├── giteadb/
│   ├── test1.sql
│   └── test2.sql
├── shioridb/
│   └── test3.sql
└── anotherdb/
    └── test4.sql

Each SQL script should return a single integer:
- 0 indicates success/test passed
- Any other value indicates failure

options:
  -h, --help            show this help message and exit
  -q, --queries-dir QUERIES_DIR
                        Directory containing database subdirectories with SQL
                        test scripts
  -v, --verbose         Enable verbose debug logging

Rclone options:
  Rclone options for retrieving the backup file

  -r, --rclone-source RCLONE_SOURCE
                        Can be a path to gzipped PostgreSQL backup file
                        (.sql.gz), a directory with backups (uses the most
                        recent by modified time), or
                        `rclone_profile:bucket/prefix/pg_backup.sql.gz` or a
                        prefix with many such backups like
                        `rclone_profile:bucket/prefix/`. This can also use an
                        rclone connection string. See
                        https://rclone.org/docs/#connection-strings for more
                        information. E.g. `:s3,env_auth:bucket/path` or
                        `:s3,env_auth,profile=my_profile_name_in_aws_config_file,region=us-east-2:mybucket/prefix/`
  --max-age MAX_AGE     Rclone max age. This is helpful because if no backups
                        were taken in, for example, the last 2 days, this
                        command will fail. This can help detect broken backup
                        procedures. Though if you don't have good alerting for
                        broken backup procedures, you probably don't have good
                        alerting for broken restore testing, let alone any
                        restore testing :) . Examples: `2d`, `1w`, `2025-01-01`.
                        See https://rclone.org/docs/#time-options for more
                        information.

container options:
  Options for configuring the PostgreSQL test container

  --container-name CONTAINER_NAME
                        Name for the test container (default: pg-backup-test)
  --postgres-version POSTGRES_VERSION
                        PostgreSQL version tag for container (default: 16)
  --postgres-password POSTGRES_PASSWORD
                        Password for PostgreSQL superuser (default: testpass123)
  --port PORT           Host port to map to container port 5432 (default: 5432)
  --max-wait MAX_WAIT   Maximum seconds to wait for PostgreSQL to be
                        ready (default: 30)
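Steps 4 and 5 of the help text can be sketched as follows, assuming the queries/ layout shown above. These helpers are hypothetical and not necessarily how the actual script works. They discover the SQL files for each database and run each one through psql inside the container with podman exec. A test counts as passed only when it prints a single 0. Using the `postgres` superuser (the official image’s default) and feeding the SQL via stdin are my assumptions.

```python
import subprocess
from pathlib import Path


def discover_queries(queries_dir: Path) -> dict[str, list[Path]]:
    """Map each database name (one subdirectory per database) to its
    *.sql test scripts, sorted by file name for a stable run order."""
    return {
        db_dir.name: sorted(db_dir.glob("*.sql"))
        for db_dir in sorted(queries_dir.iterdir())
        if db_dir.is_dir()
    }


def build_psql_cmd(container_name: str, database: str) -> tuple[str, ...]:
    """psql inside the container, reading the SQL script from stdin (-f -).
    -t (tuples only) and -A (unaligned) reduce the output to the bare value."""
    return (
        "podman", "exec", "-i", container_name,
        "psql", "-U", "postgres", "-d", database,
        "-v", "ON_ERROR_STOP=1", "-t", "-A", "-f", "-",
    )


def query_passed(stdout: str) -> bool:
    """Pass only if the script printed exactly one integer, and it is 0."""
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if len(lines) != 1:
        return False
    try:
        return int(lines[0]) == 0
    except ValueError:
        return False


def run_query(container_name: str, database: str, sql_file: Path) -> bool:
    proc = subprocess.run(
        build_psql_cmd(container_name, database),
        input=sql_file.read_text(),
        capture_output=True,
        text=True,
    )
    # A SQL error (non-zero psql exit) counts as a failed test too.
    return proc.returncode == 0 and query_passed(proc.stdout)
```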
We will focus mainly on the part that involves retrieving the backup either from a local or remote source.
import logging
import shlex
import subprocess
from pathlib import Path
from typing import Final

_LOGGER = logging.getLogger(__name__)

# Documented rclone exit codes
_RCLONE_MAX_TRANSFER_SIZE_REACHED_RC: Final[int] = 8
_RCLONE_NO_FILES_TRANSFERRED_RC: Final[int] = 9
_RCLONE_SUCCESS_RC: Final[int] = 0


def _download_most_recent_postgres_backup_sqlgz_archive(
    rclone_source: str,
    workdir: Path,
    rclone_config: Path | None = None,
    max_age: str | None = None,
) -> Path:
    """Download the most recently modified '.sql.gz' file from the rclone source
    into `workdir` and return the Path, which is _guaranteed_ to be in
    `workdir`. `max_age` is used to error if there were no sufficiently recent
    backups (e.g. the backup process is broken). `rclone_source` _can_ refer to
    a local path, `profile_from_rclone_config:path` or an inline rclone
    connection string. Raises `FileNotFoundError` if no files matching the
    criteria were found, or `RuntimeError` if there are any other rclone errors
    (in which case, inspect the logs in stderr)"""
    _cmd = (
        "rclone",
        *(("--config", str(rclone_config)) if rclone_config else ()),
        "copy",
        *(("--max-age", str(max_age)) if max_age else ()),
        "--include",
        "**.sql.gz",
        "--check-first",
        "--order-by",
        "modtime,descending",
        # A max transfer of 1 byte after sorting, plus 'completing the sort
        # before downloading anything', collectively have the effect of
        # downloading only the most recently modified file
        "--max-transfer",
        "1B",
        "--cutoff-mode",
        "soft",
        "--transfers",
        "1",
        "--inplace",
        "--error-on-no-transfer",
        "--use-json-log",
        "-vv",
        rclone_source,
        str(workdir),
    )
    _LOGGER.info("Running cmd %s", shlex.join(_cmd))
    proc = subprocess.run(_cmd)
    rc = proc.returncode
    if rc == _RCLONE_NO_FILES_TRANSFERRED_RC:
        _msg = f"No files found in {rclone_source}"
        raise FileNotFoundError(_msg)
    if rc not in (_RCLONE_MAX_TRANSFER_SIZE_REACHED_RC, _RCLONE_SUCCESS_RC):
        _msg = f"Rclone failed and exited with the unexpected code {rc}"
        raise RuntimeError(_msg)
    # rclone recreates the source's directory structure, so the file may be
    # nested; only accept regular files that match the include pattern.
    downloaded_file = next(
        (x for x in workdir.rglob("*.sql.gz") if x.is_file()), None
    )
    if not downloaded_file:
        _msg = f"No files downloaded to {workdir}??"
        raise FileNotFoundError(_msg)
    return downloaded_file
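Once the function above returns a path, step 3 (restoring the backup) means decompressing the .sql.gz and feeding the SQL to psql. Below is a minimal standard-library sketch of one way to do that, assuming the consumer reads SQL on stdin. It decompresses incrementally with gzip and copies into the child’s stdin, so the uncompressed dump never touches the disk. `stream_gzip_to_command` is a hypothetical name. In practice `cmd` might be a `podman exec -i ... psql` invocation.

```python
import gzip
import shutil
import subprocess
from collections.abc import Sequence
from pathlib import Path


def stream_gzip_to_command(archive: Path, cmd: Sequence[str]) -> int:
    """Decompress `archive` in chunks into `cmd`'s stdin; return its exit code."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    assert proc.stdin is not None
    try:
        with gzip.open(archive, "rb") as src:
            # Copies in fixed-size chunks, so memory use stays bounded.
            shutil.copyfileobj(src, proc.stdin)
    except BrokenPipeError:
        # The consumer exited before reading everything; its exit code
        # (returned below) is the more useful signal.
        pass
    finally:
        try:
            # Closing stdin sends EOF; flushing may also hit a broken pipe.
            proc.stdin.close()
        except BrokenPipeError:
            pass
        # Always reap the child, even if decompression raised.
        returncode = proc.wait()
    return returncode
```

If psql stops early on an error, the resulting BrokenPipeError is swallowed and psql’s own exit code comes back instead.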
I can guarantee that every server I manage has rclone installed, so this script
will work on all of them. One might argue, “if you can guarantee that rclone will
be available on your servers, you can just as easily apt install python3-boto3 or
something and use the Python AWS SDK.” Fair enough, but in my opinion, boto3 is
atrocious. Moreover, packaging Python applications with actual library
dependencies is almost always more complex than shipping the single
statically-linked binary that Go programs typically compile to. Also, matching
the performance and concurrency of rclone is very tough with boto3, especially
when many files are involved. This single function handles retrieving the
.sql.gz logical backup file from the local filesystem, SMB, NFS, S3, GCS, SFTP
(if you retrieve backups from there for whatever reason), Google Drive (why are
you doing this to yourself??), and dozens more storage backends. It does so
performantly, at least as correctly as you are likely to manage in Python, and
with a single command.
This script is only ~500 lines (depending on how one counts), which is not bad considering it retrieves the logical backup file from (basically) anywhere, spins up a database, runs SQL queries against that database, reports on the results, handles errors, exposes a nice CLI, and does all of this in a maintainable way that humans (and LLMs) can easily grok/modify as needed. A fair number of those lines are simply type annotations, comments, and parameter passing.
Of course, the script itself doesn’t know that rclone is installed in the way
that a fully packaged Python application could know it confidently. A simple
guard early in the CLI’s execution, like the following, can generally make sure
expectations are met.
import shutil

if not shutil.which("rclone"):
    raise FileNotFoundError("Rclone _must_ be installed to run this script")
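The script also needs podman, so a slightly more general version of this guard can check every required executable at once. It then reports all missing ones in a single error. Here is a minimal sketch; `require_executables` is a hypothetical helper:

```python
import shutil
from collections.abc import Iterable


def require_executables(names: Iterable[str]) -> dict[str, str]:
    """Resolve each executable via PATH; fail once, listing every missing one."""
    resolved = {name: shutil.which(name) for name in names}
    missing = sorted(name for name, path in resolved.items() if path is None)
    if missing:
        raise FileNotFoundError(
            f"Required executable(s) not found on PATH: {', '.join(missing)}"
        )
    # Return the resolved absolute paths, which can be used in later commands.
    return {name: path for name, path in resolved.items() if path is not None}
```

Calling `require_executables(["rclone", "podman"])` at startup then fails fast with one message that lists everything missing.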
Parallelizing Python With xargs
I manage a fair amount of parquet files and hive-partitioned parquet datasets at work and in my personal life.
Sometimes I need ‘bulk’ modifications of these datasets. For example, maybe I need to sort each file, or change the schema by casting some column(s) to some data type, or add a column to each file. I might write a Python script that can operate on one or more files and do the transformation so I have the option to manage parallelism within Python or outside of Python. I have done something like the following on many occasions.
find /path/to/parquet/dataset -type f -name '*.parquet' -print0 | \
  xargs -0 -I{} -P8 sh -c \
  'python3 my_parquet_transformation_script.py -o /path/to/parquet/dataset {}'
Command explanation
The -print0 means to separate filenames with a “null terminator” which is a
character that never appears in file names so that we can safely split on
that in xargs while never worrying about “what if the filenames have a
newline or some other weird character.” The -0 part in the xargs means
to accept inputs from stdin delimited by null terminators.
So the find part means “find all of the files (as opposed to directories
or symlinks) in
/path/to/parquet/dataset that end with .parquet and stream them as they
are found to stdout delimited by a null terminator.”
In the xargs, -P8 means to multiplex stdin to 8 workers so 8 commands
can run at once. I might adjust the 8 depending on the memory requirements
of the Python script and how much parallelism I can get while processing
each file. -I{} means that {} in the Python script invocation will be
replaced with the null-terminator-delimited input segment from stdin.
While using sh -c involves invoking extra subprocesses, I find it useful
because I can easily replace it with echo as my “dry-run” and see the
exact commands that will be executed before replacing echo with sh -c
and running it for real.
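For comparison, the same fan-out can be managed from inside Python. The sketch below (`run_per_file` is a hypothetical helper) caps concurrency with a thread pool. Threads are sufficient here because each one only waits on its child process. Unlike the xargs version, it collects the file list up front rather than streaming it. It also passes each path as its own argument, so no shell quoting is involved. The `dry_run` flag mirrors the echo trick.

```python
import shlex
import subprocess
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def run_per_file(
    base_cmd: tuple[str, ...],
    files: Iterable[Path],
    max_workers: int = 8,
    dry_run: bool = False,
) -> dict[Path, int]:
    """Run `base_cmd + (file,)` for each file, at most `max_workers` at once
    (like xargs -P). Returns each file's exit code."""
    cmds = {f: (*base_cmd, str(f)) for f in files}
    if dry_run:
        # Equivalent of swapping `sh -c` for `echo`: print, don't execute.
        for cmd in cmds.values():
            print(shlex.join(cmd))
        return {}
    # Each thread just blocks on its child process; the heavy lifting
    # happens outside the Python interpreter.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {f: pool.submit(subprocess.run, cmd) for f, cmd in cmds.items()}
    return {f: fut.result().returncode for f, fut in futures.items()}
```

For the example above, that would be roughly `run_per_file(("python3", "my_parquet_transformation_script.py", "-o", "/path/to/parquet/dataset"), Path("/path/to/parquet/dataset").rglob("*.parquet"), max_workers=8)`.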
This approach helps because I explicitly want to keep the same file names and
update them “in place.” Moreover, imagine that my parquet transformation script
uses the polars dataframe library or pyarrow. Those might be able to use 2-4
cores for processing a single file, because parquet readers typically
parallelize across row groups and/or column chunks, while the actual writing
back to disk is typically single-threaded. Processing many files in parallel is
often required to actually use 80-90% of my CPU. Setting the xargs parallelism
requires some care, as the underlying script already has some parallelism
(though not as much as I would like), and just setting -P0 (which tells GNU
xargs to run as many processes at once as possible) is not optimal and might
blow up RAM usage.
Since this is frequently a one-off task, I will just run this in a virtual
environment or slap a quick pyproject.toml on this and use uv run --spec /path/to/package .... At some point, I might make a more dedicated tool for
these “keep the same rows and don’t really modify data, but do this slight
isomorphism” kinds of operations, but for now I write dedicated scripts as
needed. There are a few reasons I often manage the parallelism outside of
Python. Sometimes Python libraries that release the GIL have some “gotchas”
when used in multiprocessing contexts. Sometimes certain core utilities are
simply faster than anything that is easily doable in Python’s standard
library; find is one such example. Certain filesystem operations are
shockingly slow in Python, especially over NFS- or SMB-mounted filesystems.
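Delegating the directory walk to find from Python is straightforward, and the same -print0 trick keeps the parsing robust. Here is a minimal POSIX-only sketch; `find_files` is a hypothetical helper. Whether it actually beats os.walk or Path.rglob on a given NFS or SMB mount is something to measure rather than assume.

```python
import os
import subprocess
from pathlib import Path


def find_files(root: Path, pattern: str) -> list[Path]:
    """Regular files under `root` whose name matches the glob `pattern`,
    with the directory walk delegated to find(1)."""
    proc = subprocess.run(
        ["find", str(root), "-type", "f", "-name", pattern, "-print0"],
        capture_output=True,
        check=True,
    )
    # Output is NUL-separated with a trailing NUL, so skip the empty last
    # chunk; os.fsdecode maps raw bytes to str the same way the OS APIs do.
    return [Path(os.fsdecode(raw)) for raw in proc.stdout.split(b"\0") if raw]
```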
Asynchronous Subprocess Juggling
Since spawning a subprocess already dispatches work outside the scope of the (C)Python interpreter, subprocesses can be “naturally” handled in an asynchronous way.
A common building-block might look something like the following.
import asyncio
import asyncio.subprocess
from typing import NamedTuple


class _CommandResult(NamedTuple):
    """Result of a command execution."""

    returncode: int
    stdout: bytes
    stderr: bytes


async def do_stuff(
    cmd: list[str],
    timeout_seconds: float,
    capture_output: bool = True,
) -> _CommandResult:
    # With capture_output=False the child inherits our stdout/stderr.
    pipe = asyncio.subprocess.PIPE if capture_output else None
    process = await asyncio.create_subprocess_exec(*cmd, stdout=pipe, stderr=pipe)
    try:
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        # Don't leave an orphaned child behind: kill it and reap it.
        process.kill()
        await process.wait()
        raise
    if process.returncode is None:
        _msg = f"Why is returncode not set? {cmd=}"
        raise RuntimeError(_msg)
    return _CommandResult(
        returncode=process.returncode,
        # communicate() returns None for streams that were not piped.
        stdout=stdout or b"",
        stderr=stderr or b"",
    )
The core CLI entrypoint might look something like the following.
import asyncio
import logging
import sys
from collections.abc import Sequence

_LOGGER = logging.getLogger(__name__)


async def async_main(argv: Sequence[str] | None = None) -> int:
    """
    Async main entry point for the CLI.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    # _parse_arguments and _actual_core_logic are script helpers (not shown).
    _opts = _parse_arguments(argv if argv is not None else sys.argv[1:])
    logging.basicConfig(
        level=logging.DEBUG if _opts.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    try:
        return await _actual_core_logic(opts=_opts)
    except Exception:
        _LOGGER.exception("Test execution failed")
        return 1


def main(argv: Sequence[str] | None = None) -> int:
    """
    Main entry point for the CLI.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    return asyncio.run(async_main(argv))


if __name__ == "__main__":
    sys.exit(main())
Note the single invocation of asyncio.run, which bridges synchronous and
asynchronous execution as close to the script entrypoint as possible. Imagine
that in our core logic we juggle many invocations of rclone or even rsync,
possibly reaching different remote file stores. We might also have a
long-running task and want to periodically report on its progress, especially
if the tool doesn’t natively support progress reporting. Asynchronous
programming helps greatly with this.
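import asyncio
import asyncio.subprocess


async def poll_long_running_task_and_log_progress(
    cmd_args: tuple[str, ...],
    poll_interval_seconds: float,
) -> tuple[int, bytes, bytes]:
    cmd = ("long-running-cli", *cmd_args)
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Start communicate() once. Unlike asyncio.wait_for(), asyncio.wait()
    # with a timeout does not cancel the task when the timeout expires.
    communicate_task = asyncio.create_task(process.communicate())
    while True:
        done, _pending = await asyncio.wait(
            {communicate_task}, timeout=poll_interval_seconds
        )
        if done:
            break
        # _report_on_progress is a placeholder for whatever progress check
        # makes sense for the tool (not shown).
        await _report_on_progress(cmd)
    stdout, stderr = communicate_task.result()
    # communicate() only returns after the process exits, so this is set.
    assert process.returncode is not None
    return process.returncode, stdout, stderr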
I almost always find myself wrapping asyncio.subprocess.create_subprocess_exec
in a context manager so that the caller can decide on “entire-command-juggling”
timeouts and operate with the expectation that “the function is in charge of
spawning and cleaning up the process.”
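Here is a minimal sketch of such a wrapper, using contextlib.asynccontextmanager (`managed_process` is a hypothetical name). The caller chooses the overall timeout by wrapping the whole block in asyncio.wait_for (or asyncio.timeout on Python 3.11+). The finally clause kills and reaps the child if it is still running, whether the block finished, raised, or was cancelled.

```python
import asyncio
import asyncio.subprocess
import contextlib
from collections.abc import AsyncIterator, Sequence
from typing import Any


@contextlib.asynccontextmanager
async def managed_process(
    cmd: Sequence[str], **kwargs: Any
) -> AsyncIterator[asyncio.subprocess.Process]:
    """Spawn `cmd`; when the block exits, make sure the child is gone."""
    process = await asyncio.create_subprocess_exec(*cmd, **kwargs)
    try:
        yield process
    finally:
        if process.returncode is None:
            # The child may exit between the check and the kill.
            with contextlib.suppress(ProcessLookupError):
                process.kill()
        # Always reap the child so it does not linger as a zombie.
        await process.wait()
```

For example, `await asyncio.wait_for(body(), timeout=30)`, where `body` contains `async with managed_process(["rclone", ...]) as proc: ...`, bounds the whole interaction. The wrapper still guarantees cleanup.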
There aren’t a huge number of situations that require juggling many
subprocesses, but Python’s ability to do that asynchronously can really come in
handy when that is what I actually want to do. In many cases, we can modify the
underlying tool invoked by the Python subprocess to behave more as we want, and
doing so is generally preferable. For example, if one wants to search for files
in many filesystem locations, a more dedicated tool like fd will likely do the
job better than juggling many invocations of the more standard core utility
find. However, modifying existing tools or using other tools designed for
concurrency and/or parallelism isn’t always viable. In those situations, Python
can do some seriously heavy lifting, or at the very least coordinate 2+ workers
that actually do the heavy lifting.