# -*- coding: utf-8 -*-
#
# Copyright (C) 2021-2026 by The Linux Foundation
# SPDX-License-Identifier: MIT-0
#
__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>'
import sys
import os
import re
import argparse
import hashlib
import base64
import subprocess
import logging
import tempfile
import time
import datetime
import warnings
import urllib.parse
import email.utils
import email.header
from pathlib import Path
from typing import Optional, List, Tuple, Dict, Union, Any
from io import BytesIO
GitConfigType = Dict[str, Union[str, List[str]]]
logger: logging.Logger = logging.getLogger(__name__)
# Overridable via [patatt] parameters
GPGBIN: Optional[str] = None
SSHKBIN: Optional[str] = None
# Hardcoded defaults
DEVSIG_HDR = b'X-Developer-Signature'
DEVKEY_HDR = b'X-Developer-Key'
# Result and severity levels
RES_VALID = 0
RES_NOSIG = 4
RES_NOKEY = 8
RES_ERROR = 16
RES_BADSIG = 32
REQ_HDRS: List[bytes] = [b'from', b'subject']
OPT_HDRS: List[bytes] = [b'message-id']
# Quick cache for key info
KEYCACHE: Dict[Union[str, bytes], Any] = dict()
# Quick cache for config settings
CONFIGCACHE: Dict[str, GitConfigType] = dict()
# My version
__VERSION__ = '0.7.0'
MAX_SUPPORTED_FORMAT_VERSION = 1
[docs]
class Error(Exception):
"""Base exception for patatt errors.
Args:
message: Error description.
errors: Optional list of detailed error messages.
"""
errors: Optional[List[str]]
[docs]
def __init__(self, message: str, errors: Optional[List[str]] = None):
super().__init__(message)
self.errors = errors
def __str__(self) -> str:
s = super().__str__()
if self.errors:
s = '%s: (%s)' % (s, ', '.join(self.errors))
return s
[docs]
class SigningError(Error):
"""Raised when message signing fails."""
[docs]
class ConfigurationError(Error):
"""Raised when configuration is invalid or missing."""
[docs]
class ValidationError(Error):
"""Raised when signature validation fails."""
[docs]
class NoKeyError(ValidationError):
"""Raised when the public key for validation cannot be found."""
[docs]
class BodyValidationError(ValidationError):
"""Raised when the message body hash does not match the signature."""
[docs]
class PatattMessage:
"""RFC2822 email message with patch attestation support.
Represents an email message that can be signed and validated using
DKIM-like developer signatures. Uses git-mailinfo for canonicalization
to ensure consistent signatures regardless of mail client formatting.
Args:
msgdata: Raw message bytes in RFC2822 format.
"""
headers: List[bytes]
body: bytes
lf: bytes
signed: bool
canon_headers: Optional[List[bytes]]
canon_body: Optional[bytes]
canon_identity: Optional[str]
sigs: Optional[List[DevsigHeader]]
[docs]
def __init__(self, msgdata: bytes):
self.headers = list()
self.body = b''
self.lf = b'\n'
self.signed = False
self.canon_headers = None
self.canon_body = None
self.canon_identity = None
self.sigs = None
self.load_from_bytes(msgdata)
[docs]
def git_canonicalize(self) -> None:
"""Canonicalize the message using git-mailinfo.
Normalizes headers and body for consistent signing/validation.
Results are cached in canon_headers, canon_body, and canon_identity.
"""
if self.canon_body is not None:
return
# Generate a new payload using m and p and canonicalize with \r\n endings,
# trimming any excess blank lines ("simple" DKIM canonicalization).
m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body)
self.canon_body = b''
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
idata = dict()
for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
left, right = line.split(b':', 1)
idata[left.lower()] = right.strip()
# Theoretically, we should always see an "Email" line
self.canon_identity = idata.get(b'email', b'').decode()
# Now substituting headers returned by mailinfo
self.canon_headers = list()
for header in self.headers:
try:
left, right = header.split(b':', 1)
lleft = left.lower()
if lleft == b'from':
right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
elif lleft == b'subject':
right = b' ' + idata.get(b'subject', b'')
self.canon_headers.append(left + b':' + right)
except ValueError:
self.canon_headers.append(header)
[docs]
def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
"""Sign the message and add signature headers.
Args:
algo: Signing algorithm ('ed25519', 'openpgp', or 'openssh').
keyinfo: Private key data or identifier.
identity: Signer identity (email). If None, uses canon_identity.
selector: Key selector for keyring lookup.
Raises:
SigningError: If signing fails or message cannot be canonicalized.
"""
# Remove any devsig headers
for header in list(self.headers):
if header.startswith(DEVSIG_HDR) or header.startswith(DEVKEY_HDR):
self.headers.remove(header)
self.git_canonicalize()
if not self.canon_headers or not self.canon_body:
# Should never happen
raise SigningError('Message could not be git-canonicalized')
ds = DevsigHeader()
ds.set_headers(self.canon_headers, mode='sign')
ds.set_body(self.canon_body)
ds.set_field('l', str(len(self.canon_body)))
if not identity:
if not self.canon_identity:
raise SigningError('No identity provided and no canonical identity available')
identity = self.canon_identity
ds.set_field('i', identity)
if selector:
ds.set_field('s', selector)
if algo not in ('ed25519', 'openpgp', 'openssh'):
raise SigningError('Unsupported algorithm: %s' % algo)
ds.set_field('a', '%s-sha256' % algo)
if algo in ('ed25519', 'openssh'):
# Set signing time for non-pgp sigs
ds.set_field('t', str(int(time.time())))
hv, pkinfo = ds.sign(keyinfo)
dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78)
self.headers.append(dshdr.encode().encode() + self.lf)
# Make informational header about the key used
idata = [
b'i=%s' % identity.encode(),
b'a=%s' % algo.encode(),
]
if algo == 'openpgp':
idata.append(b'fpr=%s' % pkinfo)
elif algo == 'openssh':
idata.append(b'fpr=%s' % pkinfo)
else:
idata.append(b'pk=%s' % pkinfo)
dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
self.headers.append(dkhdr.encode().encode() + self.lf)
[docs]
def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> Tuple[str, str]:
"""Validate the signature for a specific identity.
Args:
identity: The signer identity (email) to validate.
pkey: Public key data for validation. If None, signature is
checked against embedded key data (if available).
trim_body: If True, trim body to length specified in signature.
Returns:
Tuple of (signature_algorithm, public_key_info).
Raises:
ValidationError: If no matching signature or validation fails.
"""
vds = None
if not self.sigs:
raise ValidationError('No signatures found in message')
for ds in self.sigs:
if ds.get_field_as_str('i') == identity:
vds = ds
break
if vds is None:
raise ValidationError('No signatures matching identity %s' % identity)
self.git_canonicalize()
if not self.canon_headers or not self.canon_body:
raise ValidationError('Message must be git-canonicalized before validating')
vds.set_headers(self.canon_headers, mode='validate')
if trim_body:
lfield = vds.get_field_as_bytes('l')
if lfield:
try:
maxlen = int(lfield)
vds.set_body(self.canon_body, maxlen=maxlen)
except ValueError:
vds.set_body(self.canon_body)
else:
vds.set_body(self.canon_body)
return vds.validate(pkey)
[docs]
def as_bytes(self) -> bytes:
"""Return the message as bytes, including any signature headers."""
return b''.join(self.headers) + self.lf + self.body
[docs]
def as_string(self, encoding: str = 'utf-8') -> str:
"""Return the message as a string.
Args:
encoding: Character encoding to use. Defaults to 'utf-8'.
"""
return self.as_bytes().decode(encoding)
[docs]
def load_from_bytes(self, msgdata: bytes) -> None:
"""Parse message data and populate headers and body.
Args:
msgdata: Raw RFC2822 message bytes.
Raises:
RuntimeError: If the data is not a valid RFC2822 message.
"""
# We use simplest parsing -- using Python's email module would be overkill
ldshn = DEVSIG_HDR.lower()
with BytesIO(msgdata) as fh:
while True:
line = fh.readline()
if not len(line):
break
if not len(line.strip()):
self.lf = line
self.body = fh.read()
break
# is it a wrapped header?
if line[0] in ("\x09", "\x20", 0x09, 0x20):
if not len(self.headers):
raise RuntimeError('Not a valid RFC2822 message')
# attach it to the previous header
self.headers[-1] += line
continue
# Is it a signature header?
if line.lower().startswith(ldshn):
self.signed = True
self.headers.append(line)
if not len(self.headers) or not len(self.body):
raise RuntimeError('Not a valid RFC2822 message')
[docs]
def get_sigs(self) -> List[DevsigHeader]:
"""Extract and return all signature headers from the message.
Returns:
List of DevsigHeader objects parsed from X-Developer-Signature
headers. Results are cached after first call.
Raises:
RuntimeError: If headers cannot be parsed.
"""
if self.sigs is not None:
return self.sigs
ldshn = DEVSIG_HDR.lower()
self.sigs = list()
from_id = None
for header in self.headers:
try:
left, right = header.split(b':', 1)
hn = left.strip().lower()
hv = right
if hn == ldshn:
self.sigs.append(DevsigHeader(hv))
elif hn == b'from':
parts = email.utils.parseaddr(hv.decode().strip())
from_id = parts[1]
except ValueError:
raise RuntimeError('Error parsing headers')
if from_id:
for ds in self.sigs:
if 'i' not in ds.hdata:
ds.set_field('i', from_id)
return self.sigs
@staticmethod
def _get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
mf = os.path.join(td, 'm')
pf = os.path.join(td, 'p')
cmdargs = ['git', 'mailinfo', '--encoding=utf-8', '--no-scissors', mf, pf]
# normalize line endings in payload to single lf for consistent results
# from git-mailinfo.
ecode, i, err = _run_command(cmdargs, stdin=payload.replace(b'\r\n', b'\n'))
if ecode > 0:
logger.debug('FAILED : Failed running git-mailinfo:')
logger.debug(err.decode())
raise RuntimeError('Failed to run git-mailinfo: %s' % err.decode())
with open(mf, 'rb') as mfh:
m = mfh.read()
with open(pf, 'rb') as pfh:
p = pfh.read()
return m, p, i
[docs]
def get_data_dir() -> Path:
"""Get the patatt data directory, creating it if necessary.
Returns:
Path to $XDG_DATA_HOME/patatt or ~/.local/share/patatt.
"""
if 'XDG_DATA_HOME' in os.environ:
datahome = Path(os.environ['XDG_DATA_HOME'])
else:
datahome = Path.home() / '.local' / 'share'
datadir = datahome / 'patatt'
datadir.mkdir(parents=True, exist_ok=True)
return datadir
def _run_command(cmdargs: List[str],
stdin: Optional[bytes] = None,
env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
logger.debug('Running %s', ' '.join(cmdargs))
cp = subprocess.run(cmdargs, input=stdin, env=env, capture_output=True, text=False)
logger.debug('Completed %s', repr(cp))
return cp.returncode, cp.stdout, cp.stderr
def git_run_command(gitdir: Optional[str],
args: List[str],
stdin: Optional[bytes] = None,
env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
if gitdir:
args = ['git', '--git-dir', gitdir, '--no-pager'] + args
else:
args = ['git', '--no-pager'] + args
return _run_command(args, stdin=stdin, env=env)
def get_config_from_git(regexp: str,
section: Optional[str] = None,
defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
multivals: Optional[List[str]] = None) -> GitConfigType:
if multivals is None:
multivals = list()
args = ['config', '-z', '--get-regexp', regexp]
_, bout, _ = git_run_command(None, args)
if defaults is None:
defaults = dict()
if not len(bout):
return defaults
gitconfig = defaults
out = bout.decode()
for line in out.split('\x00'):
if not line:
continue
key, value = line.split('\n', 1)
try:
chunks = key.split('.')
# Drop the starting part
chunks.pop(0)
cfgkey = chunks.pop(-1).lower()
if len(chunks):
if not section:
# Ignore it
continue
# We're in a subsection
sname = '.'.join(chunks)
if sname != section:
# Not our section
continue
elif section:
# We want config from a subsection specifically
continue
if cfgkey in multivals:
if cfgkey not in gitconfig:
gitconfig[cfgkey] = list()
elif isinstance(gitconfig[cfgkey], str):
gitconfig[cfgkey] = [gitconfig[cfgkey]] # type: ignore[list-item]
else:
gitconfig[cfgkey] = list()
# We've made sure this is a list
gitconfig[cfgkey].append(value) # type: ignore[union-attr]
else:
gitconfig[cfgkey] = value
except ValueError:
logger.debug('Ignoring git config entry %s', line)
return gitconfig
def gpg_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
gpgbin, _ = set_bin_paths(None)
cmdargs = [gpgbin, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] + cmdargs
return _run_command(cmdargs, stdin)
def sshk_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
_, sshkbin = set_bin_paths(None)
cmdargs = [sshkbin] + cmdargs
return _run_command(cmdargs, stdin)
def get_git_toplevel(gitdir: Optional[str] = None) -> str:
cmdargs = ['git']
if gitdir:
cmdargs += ['--git-dir', gitdir]
cmdargs += ['rev-parse', '--show-toplevel']
ecode, out, err = _run_command(cmdargs)
if ecode == 0:
return out.decode().strip()
return ''
[docs]
def make_pkey_path(keytype: str, identity: str, selector: str) -> Path:
"""Construct the standard keyring path for a public key.
Args:
keytype: Key algorithm type ('ed25519', 'openpgp', 'openssh').
identity: Signer identity in email format (local@domain).
selector: Key selector for distinguishing multiple keys.
Returns:
Path in format: keytype/domain/local/selector
Raises:
ValidationError: If identity is not in valid email format.
"""
chunks = identity.split('@', 1)
if len(chunks) != 2:
raise ValidationError('identity must include both local and domain parts')
local = chunks[0].lower()
domain = chunks[1].lower()
selector = selector.lower()
# urlencode all potentially untrusted bits to make sure nobody tries path-based badness
return Path(urllib.parse.quote_plus(keytype), urllib.parse.quote_plus(domain),
urllib.parse.quote_plus(local), urllib.parse.quote_plus(selector))
[docs]
def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
"""Construct a privacy-preserving by-hash keyring path.
Computes SHA256 of the standard keypath to avoid exposing identity
information in directory structure.
Args:
keytype: Key algorithm type ('ed25519', 'openpgp', 'openssh').
identity: Signer identity in email format (local@domain).
selector: Key selector for distinguishing multiple keys.
Returns:
Path in format: by-hash/XX/YYY... where XX is first 2 hex chars
and YYY... is remaining 62 hex chars of SHA256 hash.
"""
keypath = make_pkey_path(keytype, identity, selector)
hash_digest = hashlib.sha256(str(keypath).encode('utf-8')).hexdigest()
prefix = hash_digest[:2]
remainder = hash_digest[2:]
return Path('by-hash', prefix, remainder)
[docs]
def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Tuple[bytes, str]:
"""Look up a public key from a keyring source.
Searches for the key at the standard path first, then falls back to
by-hash lookup if not found.
Args:
source: Keyring source. Either a filesystem path or a git ref
in format 'ref:repo:refspec:subpath'.
keytype: Key algorithm type ('ed25519', 'openpgp', 'openssh').
identity: Signer identity in email format (local@domain).
selector: Key selector for distinguishing multiple keys.
Returns:
Tuple of (key_data, key_source_description).
Raises:
KeyError: If key cannot be found in any location.
ConfigurationError: If ref source format is invalid.
"""
keypath = make_pkey_path(keytype, identity, selector)
logger.debug('Looking for %s in %s', keypath, source)
# ref:refs/heads/someref:in-repo/path
if source.startswith('ref:'):
# split by :
parts = source.split(':', 4)
if len(parts) < 4:
raise ConfigurationError('Invalid ref, must have at least 3 colons: %s' % source)
gitrepo = parts[1]
gitref = parts[2]
gitsub = parts[3]
if not gitrepo:
gitrepo = get_git_toplevel()
if not gitrepo:
raise KeyError('Not in a git tree, so cannot use a ref:: source')
gitrepo = os.path.expanduser(gitrepo)
if gitrepo.find('$') >= 0:
gitrepo = os.path.expandvars(gitrepo)
if os.path.isdir(os.path.join(gitrepo, '.git')):
gittop = os.path.join(gitrepo, '.git')
else:
gittop = gitrepo
# it could omit the refspec, meaning "whatever the current ref"
# grab the key from a fully ref'ed path
subpath = os.path.join(gitsub, keypath)
if not gitref:
# What is our current ref?
cmdargs = ['symbolic-ref', 'HEAD']
ecode, out, err = git_run_command(gittop, cmdargs)
if ecode == 0:
gitref = out.decode().strip()
if not gitref:
raise KeyError('Could not figure out current ref in %s' % gittop)
keysrc = f'{gitref}:{subpath}'
cmdargs = ['show', keysrc]
ecode, out, err = git_run_command(gittop, cmdargs)
if ecode == 0:
# Handle one level of symlinks
if out.find(b'\n') < 0 < out.find(b'/'):
# Check this path as well
linktgt = os.path.normpath(os.path.join(os.path.dirname(subpath), out.decode()))
keysrc = f'{gitref}:{linktgt}'
cmdargs = ['show', keysrc]
ecode, out, err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (symlinked)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
logger.debug('KEYSRC : %s', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
# Does it exist on disk but hasn't been committed yet?
fullpath = Path(gitrepo) / subpath
if fullpath.exists():
with open(fullpath, 'rb') as fh:
logger.debug('KEYSRC : %s', fullpath)
return fh.read(), str(fullpath)
# Try by-hash lookup
byhash_keypath = make_byhash_path(keytype, identity, selector)
byhash_subpath = Path(gitsub) / byhash_keypath
keysrc = f'{gitref}:{byhash_subpath}'
cmdargs = ['show', keysrc]
ecode, out, err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (by-hash)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
# Check disk for by-hash path
fullpath = Path(gitrepo) / byhash_subpath
if fullpath.exists():
with open(fullpath, 'rb') as fh:
logger.debug('KEYSRC : %s (by-hash)', fullpath)
return fh.read(), str(fullpath)
raise KeyError('Could not find %s in %s:%s' % (subpath, gittop, gitref))
# It's a disk path, then
# Expand ~ and env vars
source_path = Path(source).expanduser()
if '$' in source:
source_path = Path(os.path.expandvars(source))
fullpath = source_path / keypath
if fullpath.exists():
with open(fullpath, 'rb') as fh:
logger.debug('Loaded key from %s', fullpath)
return fh.read(), str(fullpath)
# Try by-hash lookup
byhash_keypath = make_byhash_path(keytype, identity, selector)
byhash_fullpath = source_path / byhash_keypath
if byhash_fullpath.exists():
with open(byhash_fullpath, 'rb') as fh:
logger.debug('Loaded key from %s (by-hash)', byhash_fullpath)
return fh.read(), str(byhash_fullpath)
raise KeyError('Could not find %s' % fullpath)
def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
import sys
if len(cmdargs.msgfile):
# Load all message from the files passed to make sure they all parse correctly
messages = dict()
for msgfile in cmdargs.msgfile:
with open(msgfile, 'rb') as fh:
messages[msgfile] = fh.read()
elif not sys.stdin.isatty():
messages = {'-': sys.stdin.buffer.read()}
else:
logger.critical('E: Pipe a message to sign or pass filenames with individual messages')
raise RuntimeError('Nothing to do')
return messages
[docs]
def sign_message(msgdata: bytes,
algo: str,
keyinfo: Union[str, bytes],
identity: Optional[str],
selector: Optional[str]) -> bytes:
"""Sign an RFC2822 message and return the signed message bytes.
Args:
msgdata: Raw RFC2822 message bytes.
algo: Signing algorithm ('ed25519', 'openpgp', 'openssh').
keyinfo: Private key data or identifier.
identity: Signer identity (email). If None, extracted from message.
selector: Key selector for keyring lookup.
Returns:
Signed message bytes with X-Developer-Signature header added.
"""
pm = PatattMessage(msgdata)
pm.sign(algo, keyinfo, identity=identity, selector=selector)
return pm.as_bytes()
def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
global GPGBIN, SSHKBIN
if GPGBIN is None:
if config and config.get('gpg-bin'):
_gpgbin = config.get('gpg-bin')
assert isinstance(GPGBIN, str), 'gpg-bin must be a string'
GPGBIN = _gpgbin
elif (_gpgbin := get_config_from_git(r'gpg\..*').get('program')) is not None:
assert isinstance(_gpgbin, str), 'gpg program must be a string'
GPGBIN = _gpgbin
else:
GPGBIN = 'gpg'
if SSHKBIN is None:
if config and config.get('ssh-keygen-bin'):
_sshkbin = config.get('ssh-keygen-bin')
assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
SSHKBIN = _sshkbin
elif (_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')) is not None:
assert isinstance(_sshkbin, str), 'program must be a string'
SSHKBIN = _sshkbin
else:
SSHKBIN = 'ssh-keygen'
return GPGBIN, SSHKBIN
def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
global KEYCACHE
# Do we have the signingkey defined?
usercfg = get_config_from_git(r'user\..*')
if not config.get('identity') and usercfg.get('email'):
# Use user.email
config['identity'] = str(usercfg.get('email'))
# Do we have this already looked up?
identity = config.get('identity')
if not isinstance(identity, str):
raise ConfigurationError('Identity must be a string, got %s' % type(identity).__name__)
if identity in KEYCACHE:
algo, keydata = KEYCACHE[identity]
return algo, keydata
if not config.get('signingkey'):
if usercfg.get('signingkey'):
logger.info('N: Using pgp key %s defined by user.signingkey', usercfg.get('signingkey'))
logger.info('N: Override by setting patatt.signingkey')
config['signingkey'] = 'openpgp:%s' % usercfg.get('signingkey')
else:
logger.critical('E: patatt.signingkey is not set')
logger.critical('E: Perhaps you need to run genkey first?')
raise NoKeyError('patatt.signingkey is not set')
sk = config.get('signingkey')
if not isinstance(sk, str):
raise ConfigurationError('Signing key must be a string, got %s' % type(sk).__name__)
if sk.startswith('ed25519:'):
algo = 'ed25519'
identifier = sk[8:]
keysrc = None
if identifier.startswith('/') and Path(identifier).exists():
keysrc = identifier
else:
# datadir/private/%s.key
ddir = get_data_dir()
skey = ddir / 'private' / f'{identifier}.key'
if skey.exists():
keysrc = str(skey)
else:
# finally, try .git/%s.key
if (gtdir := get_git_toplevel()):
skey = Path(gtdir) / '.git' / f'{identifier}.key'
if skey.exists():
keysrc = str(skey)
if not keysrc:
raise ConfigurationError('Could not find the key matching %s' % identifier)
logger.info('N: Using ed25519 key: %s', keysrc)
with open(keysrc, 'r') as fh:
keydata = fh.read()
elif sk.startswith('openpgp:'):
algo = 'openpgp'
keydata = sk[8:]
elif sk.startswith('openssh:'):
algo = 'openssh'
keydata = sk[8:]
else:
logger.critical('E: Unknown key type: %s', sk)
raise ConfigurationError('Unknown key type: %s' % sk)
KEYCACHE[identity] = (algo, keydata)
return algo, keydata
def rfc2822_sign(message: bytes, config: Optional[GitConfigType] = None) -> bytes:
if config is None:
config = get_main_config()
algo, keydata = get_algo_keydata(config)
pm = PatattMessage(message)
identity = config.get('identity')
assert not isinstance(identity, list)
selector = config.get('selector')
assert not isinstance(selector, list)
pm.sign(algo, keydata, identity=identity, selector=selector)
logger.debug('--- SIGNED MESSAGE STARTS ---')
logger.debug(pm.as_string())
return pm.as_bytes()
[docs]
def get_main_config(section: Optional[str] = None) -> GitConfigType:
"""Load patatt configuration from git config.
Args:
section: Optional subsection name for patatt config.
If None, loads base patatt.* settings.
Returns:
Configuration dictionary with keyring sources and settings.
Results are cached per section.
"""
global CONFIGCACHE
if section:
csection = section
else:
csection = 'default'
if csection in CONFIGCACHE:
return CONFIGCACHE[csection]
config = get_config_from_git(r'patatt\..*', section=section, multivals=['keyringsrc'])
# Append some extra keyring locations
if 'keyringsrc' not in config or not isinstance(config['keyringsrc'], list):
config['keyringsrc'] = list()
assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
config['keyringsrc'] += ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
set_bin_paths(config)
logger.debug('config: %s', config)
CONFIGCACHE[csection] = config
return config
def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
try:
messages = _load_messages(cmdargs)
except IOError as ex:
logger.critical('E: %s', ex)
sys.exit(1)
for fn, msgdata in messages.items():
try:
signed = rfc2822_sign(msgdata, config)
logger.debug('--- SIGNED MESSAGE STARTS ---')
logger.debug(signed.decode())
if fn == '-':
sys.stdout.buffer.write(signed)
else:
with open(fn, 'wb') as fh:
fh.write(signed)
logger.critical('SIGN | %s', os.path.basename(fn))
except (ConfigurationError, NoKeyError) as ex:
logger.debug('Exiting due to %s', ex)
sys.exit(1)
except SigningError as ex:
logger.critical('E: %s', ex)
sys.exit(1)
except RuntimeError as ex:
logger.critical('E: %s: %s' % (fn, ex))
sys.exit(1)
[docs]
def validate_message(msgdata: bytes,
sources: List[str],
trim_body: bool = False
) -> List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]]:
"""Validate all signatures in an RFC2822 message.
Args:
msgdata: Raw RFC2822 message bytes.
sources: List of keyring sources to search for public keys.
trim_body: If True, trim body to length specified in signatures.
Returns:
List of attestation tuples, one per signature found:
(result_code, identity, timestamp, key_source, algorithm, errors)
Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
"""
attestations: List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]] = list()
pm = PatattMessage(msgdata)
if not pm.signed:
logger.debug('message is not signed')
attestations.append((RES_NOSIG, None, None, None, None, ['no signatures found']))
return attestations
# Find all identities for which we have public keys
for ds in pm.get_sigs():
errors = list()
a = ds.get_field_as_str('a')
i = ds.get_field_as_str('i')
s = ds.get_field_as_str('s')
t = ds.get_field_as_str('t')
if not s:
s = 'default'
if not a:
errors.append('no algorithm specified in signature')
attestations.append((RES_ERROR, i, t, None, None, errors))
continue
if not i:
errors.append('no identity specified in signature')
attestations.append((RES_ERROR, i, t, None, None, errors))
continue
if a.startswith('ed25519'):
algo = 'ed25519'
elif a.startswith('openpgp'):
algo = 'openpgp'
elif a.startswith('openssh'):
algo = 'openssh'
else:
errors.append('%s/%s Unknown algorigthm: %s' % (i, s, a))
attestations.append((RES_ERROR, i, t, None, a, errors))
continue
pkey = keysrc = None
for source in sources:
try:
pkey, keysrc = get_public_key(source, algo, i, s)
break
except KeyError:
pass
if not pkey and algo in ('ed25519', 'openssh'):
errors.append('%s/%s no matching %s key found' % (i, s, algo))
attestations.append((RES_NOKEY, i, t, None, algo, errors))
continue
try:
signkey, signtime = pm.validate(i, pkey, trim_body=trim_body)
if keysrc is None:
# Default keyring used
keysrc = '(default keyring)/%s' % signkey
attestations.append((RES_VALID, i, signtime, keysrc, algo, errors))
except NoKeyError:
# Not in default keyring
errors.append('%s/%s no matching openpgp key found' % (i, s))
attestations.append((RES_NOKEY, i, t, None, algo, errors))
except ValidationError:
if keysrc is None:
errors.append('failed to validate using default keyring')
else:
errors.append('failed to validate using %s' % keysrc)
attestations.append((RES_BADSIG, i, t, keysrc, algo, errors))
return attestations
def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
import mailbox
if len(cmdargs.msgfile) == 1:
# Try to open as an mbox file
try:
mbox = mailbox.mbox(cmdargs.msgfile[0])
except IOError as ex:
logger.critical('E: %s', ex)
sys.exit(1)
messages = dict()
for msg in mbox:
subject = msg.get('Subject', 'No subject')
messages[subject] = msg.as_bytes()
else:
try:
messages = _load_messages(cmdargs)
except IOError as ex:
logger.critical('E: %s', ex)
sys.exit(1)
ddir = get_data_dir()
pdir = ddir / 'public'
sources = config.get('keyringsrc', list())
if not isinstance(sources, list):
sources = [sources]
if str(pdir) not in sources:
sources.append(str(pdir))
if config.get('trimbody', 'no') == 'yes':
trim_body = True
else:
trim_body = False
highest_err = 0
for fn, msgdata in messages.items():
try:
attestations = validate_message(msgdata, sources, trim_body=trim_body)
for result, identity, signtime, keysrc, algo, errors in attestations:
if result > highest_err:
highest_err = result
if result == RES_VALID:
logger.critical(' PASS | %s, %s', identity, fn)
if keysrc:
logger.info(' | key: %s', keysrc)
else:
logger.info(' | key: default GnuPG keyring')
elif result <= RES_NOSIG:
logger.critical(' NOSIG | %s', fn)
for error in errors:
logger.critical(' | %s', error)
elif result <= RES_NOKEY:
logger.critical(' NOKEY | %s, %s', identity, fn)
for error in errors:
logger.critical(' | %s', error)
elif result <= RES_ERROR:
logger.critical(' ERROR | %s, %s', identity, fn)
for error in errors:
logger.critical(' | %s', error)
else:
logger.critical('BADSIG | %s, %s', identity, fn)
for error in errors:
logger.critical(' | %s', error)
except RuntimeError as ex:
highest_err = RES_ERROR
logger.critical(' ERROR | err: %s | %s', ex, fn)
sys.exit(highest_err)
def cmd_genkey(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
try:
from nacl.signing import SigningKey
except ModuleNotFoundError:
raise RuntimeError('This operation requires PyNaCl libraries')
# Do we have the signingkey defined?
usercfg = get_config_from_git(r'user\..*')
if not config.get('identity'):
if not usercfg.get('email'):
logger.critical('This operation requires user.email to be set')
sys.exit(1)
# Use user.email
config['identity'] = str(usercfg.get('email'))
identity = str(config.get('identity'))
identifier = cmdargs.keyname
if not identifier:
identifier = datetime.datetime.today().strftime('%Y%m%d')
ddir = get_data_dir()
sdir = ddir / 'private'
pdir = ddir / 'public'
if not sdir.exists():
sdir.mkdir(mode=0o0700)
if not pdir.exists():
pdir.mkdir(mode=0o0755)
skey = sdir / f'{identifier}.key'
pkey = pdir / f'{identifier}.pub'
# Do we have a key with this identifier already present?
if skey.exists() and not cmdargs.force:
logger.critical('Key already exists: %s', skey)
logger.critical('Use a different -n or pass -f to overwrite it')
raise RuntimeError('Key already exists')
logger.critical('Generating a new ed25519 keypair')
newkey = SigningKey.generate()
# Make sure we write it as 0600
def priv_opener(path: str, flags: int) -> int:
return os.open(path, flags, 0o0600)
with open(skey, 'wb', opener=priv_opener) as fh:
fh.write(base64.b64encode(bytes(newkey)))
logger.critical('Wrote: %s', skey)
with open(pkey, 'wb') as fh:
fh.write(base64.b64encode(newkey.verify_key.encode()))
logger.critical('Wrote: %s', pkey)
# Also copy it into our local keyring
spkey = pdir / make_pkey_path('ed25519', identity, identifier)
spkey.parent.mkdir(parents=True, exist_ok=True)
with open(spkey, 'wb') as fh:
fh.write(base64.b64encode(newkey.verify_key.encode()))
logger.critical('Wrote: %s', spkey)
dpkey = pdir / make_pkey_path('ed25519', identity, 'default')
if not dpkey.exists():
# symlink our new key to be the default
dpkey.symlink_to(identifier)
logger.critical('Add the following to your .git/config (or global ~/.gitconfig):')
logger.critical('---')
if cmdargs.section:
logger.critical('[patatt "%s"]', cmdargs.section)
else:
logger.critical('[patatt]')
logger.critical(' signingkey = ed25519:%s', identifier)
logger.critical(' selector = %s', identifier)
logger.critical('---')
logger.critical('Next, communicate the contents of the following file to the')
logger.critical('repository keyring maintainers for inclusion into the project:')
logger.critical(pkey)
def cmd_install_hook(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
if not (gitrepo := get_git_toplevel()):
logger.critical('Not in a git tree, cannot install hook')
sys.exit(1)
hookfile = Path(gitrepo) / '.git' / 'hooks' / 'sendemail-validate'
if hookfile.exists():
logger.critical('Hook already exists: %s', hookfile)
sys.exit(1)
hookfile.parent.mkdir(parents=True, exist_ok=True)
with open(hookfile, 'w') as fh:
fh.write('#!/bin/sh\n')
fh.write('# installed by patatt install-hook\n')
fh.write('grep -q "^GIT: " "${1}" && exit 0\n')
fh.write('patatt sign --hook "${1}"\n')
hookfile.chmod(0o755)
logger.critical('Hook installed as %s', hookfile)
def command() -> None:
parser = argparse.ArgumentParser(
prog='patatt',
description='Cryptographically attest patches before sending out',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('-v', '--verbose', action='store_true', default=False,
help='Be a bit more verbose')
parser.add_argument('-d', '--debug', action='store_true', default=False,
help='Show debugging output')
parser.add_argument('-s', '--section', dest='section', default=None,
help='Use config section [patatt "sectionname"]')
parser.add_argument('--version', action='version', version=__VERSION__)
subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message')
sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False,
help='Git hook mode')
sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign')
sp_sign.set_defaults(func=cmd_sign)
sp_val = subparsers.add_parser('validate', help='Validate a devsig-signed message')
sp_val.add_argument('msgfile', nargs='*', help='Individual signed message files to validate or an mbox')
sp_val.set_defaults(func=cmd_validate)
sp_gen = subparsers.add_parser('genkey', help='Generate a new ed25519 keypair')
sp_gen.add_argument('-n', '--keyname', default=None,
help='Name to use for the key, e.g. "workstation", or "default"')
sp_gen.add_argument('-f', '--force', action='store_true', default=False,
help='Overwrite any existing keys, if found')
sp_gen.set_defaults(func=cmd_genkey)
sp_install = subparsers.add_parser('install-hook', help='Install sendmail-validate hook into the current repo')
sp_install.set_defaults(func=cmd_install_hook)
_args = parser.parse_args()
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
try:
if _args.hookmode:
formatter = logging.Formatter('patatt: %(message)s')
except AttributeError:
pass
ch.setFormatter(formatter)
if _args.verbose:
ch.setLevel(logging.INFO)
elif _args.debug:
ch.setLevel(logging.DEBUG)
else:
ch.setLevel(logging.CRITICAL)
logger.addHandler(ch)
config = get_main_config(section=_args.section)
if 'func' not in _args:
parser.print_help()
sys.exit(1)
try:
_args.func(_args, config)
except RuntimeError:
sys.exit(1)
if __name__ == '__main__':
command()