PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pgp_session_data.py
Go to the documentation of this file.
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3#
4# Generate PGP data to check the session key length of the input data provided
5# to pgp_pub_decrypt_bytea().
6#
7# First, the crafted data is generated from valid RSA data, freshly generated
8# by this script each time it is run, see generate_rsa_keypair().
9# Second, the crafted PGP data is built, see build_message_data() and
10# build_key_data(). Finally, the resulting SQL script is generated.
11#
12# This script writes to stdout the SQL file that is used in the regression
13# tests of pgcrypto. The following command can be used to regenerate the file
14# which should never be manually manipulated:
15# python3 scripts/pgp_session_data.py > sql/pgp-pubkey-session.sql
16
17import os
18import re
19import struct
20import secrets
21import sys
22import time
23
24# pwn for binary manipulation (p32, p64)
25from pwn import *
26
27# Cryptographic libraries, to craft the PGP data.
28from Crypto.Cipher import AES
29from Crypto.PublicKey import RSA
30from Crypto.Util.number import inverse
31
# AES-128 key (16 bytes, each set to 0x01).  It is used both as the leading
# part of the crafted session key and as the key encrypting the data packet.
AES_KEY = bytes([0x01]) * 16
34
def generate_rsa_keypair(key_size: int = 2048) -> dict:
    """
    Generate a fresh RSA key pair and return its components.

    The returned dictionary holds everything needed to build the PGP key
    and message packets:
    - n: public modulus (p * q)
    - e: public exponent, as chosen by RSA.generate()
    - d: private exponent (e^-1 mod phi(n))
    - p, q: prime factors of n
    - u: coefficient (p^-1 mod q), as stored in OpenPGP secret keys

    The wanted key size is given in *bits*, for a default of 2048.  The
    components are checked with validate_rsa_key() before being returned,
    hence a ValueError is raised if they are not consistent.
    """

    # Generate RSA key
    key = RSA.generate(key_size)

    # Extract all key components
    rsa_components = {
        'n': key.n,                   # Public modulus (p * q)
        'e': key.e,                   # Public exponent
        'd': key.d,                   # Private exponent (e^-1 mod phi(n))
        'p': key.p,                   # First prime factor
        'q': key.q,                   # Second prime factor
        'u': inverse(key.p, key.q),   # Coefficient: p^-1 mod q
    }

    # Validate key components for correctness
    validate_rsa_key(rsa_components)

    return rsa_components
70
def validate_rsa_key(rsa: dict) -> None:
    """
    Validate a generated RSA key.

    This function performs basic validation to ensure that the RSA key
    components are mathematically consistent with each other.

    Validations performed, in this order:
    1. n = p * q (modulus is the product of the primes)
    2. p <> q (the two primes are distinct)
    3. gcd(e, phi(n)) = 1 (public exponent is coprime to phi(n))
    4. (d * e) mod phi(n) = 1 (private exponent is the inverse of e)
    5. (u * p) mod q = 1 (coefficient is the inverse of p modulo q)

    The key is given as a dictionary with the keys 'n', 'e', 'd', 'p', 'q'
    and 'u'.  Nothing is returned; a ValueError describing the first
    failed check is raised if the key is not valid.
    """
    # Imported locally so that this function does not depend on the set of
    # modules imported at the top of the file.
    from math import gcd

    n, e, d, p, q, u = rsa['n'], rsa['e'], rsa['d'], rsa['p'], rsa['q'], rsa['u']

    # Check that n = p * q
    if n != p * q:
        raise ValueError("RSA validation failed: n <> p * q")

    # Check that p and q are different
    if p == q:
        raise ValueError("RSA validation failed: p = q (not allowed)")

    # Calculate phi(n) = (p-1)(q-1)
    phi_n = (p - 1) * (q - 1)

    # Check that gcd(e, phi(n)) = 1
    if gcd(e, phi_n) != 1:
        raise ValueError("RSA validation failed: gcd(e, phi(n)) <> 1")

    # Check that (d * e) mod(phi(n)) = 1
    if (d * e) % phi_n != 1:
        raise ValueError("RSA validation failed: d * e <> 1 (mod phi(n))")

    # Check that (u * p) (mod q) = 1
    if (u * p) % q != 1:
        raise ValueError("RSA validation failed: u * p <> 1 (mod q)")
114
def mpi_encode(x: int) -> bytes:
    r"""
    Encode an integer as an OpenPGP Multi-Precision Integer (MPI).

    Format (RFC 4880, Section 3.2):
    - 2 bytes: bit length of the integer (big-endian)
    - N bytes: the integer in big-endian format, without leading zero bytes

    This is used to encode RSA key components (n, e, d, p, q, u) and the
    RSA-encrypted session key in PGP packets.

    The integer to encode is given in input, returning an MPI-encoded
    integer.  Zero is encoded as a bit length of 0 followed by no data.

    A ValueError is raised if the integer is negative, or if its bit
    length cannot be stored in the 2-byte length field (more than 65535
    bits).

    For example:
        mpi_encode(65537) -> b'\x00\x11\x01\x00\x01'
        (17 bits, value 0x010001)
    """
    if x < 0:
        raise ValueError("MPI cannot encode negative integers")

    bits = x.bit_length()

    # The length prefix is an unsigned 16-bit field.  Fail with the same
    # exception type as for other invalid inputs rather than letting
    # struct.pack() raise its own error.
    if bits > 0xFFFF:
        raise ValueError("MPI cannot encode integers larger than 65535 bits")

    # For zero, bit_length() is 0 and to_bytes() produces an empty string,
    # so no special case is needed.
    mag = x.to_bytes((bits + 7) // 8, 'big')

    # Pack: 2-byte bit length + magnitude bytes
    return struct.pack('>H', bits) + mag
147
def new_packet(tag: int, payload: bytes) -> bytes:
    r"""
    Wrap some payload into an OpenPGP packet using the new packet format.

    Header layout (RFC 4880, Section 4.2):
    - 1 byte: 0xC0 | tag, only the low six bits of "tag" being kept
    - Body length, whose encoding depends on the payload size:
      * 0-191: one byte holding the length
      * 192-8383: two bytes, (192 + ((length - 192) >> 8)) followed by
        ((length - 192) & 0xFF)
      * 8384 and more: 0xFF followed by the length as a 4-byte big-endian
        integer

    The value returned is the header immediately followed by the payload.

    For example:
        new_packet(1, b'data') -> b'\xC1\x04data'
        (Tag 1, length 4, payload 'data')
    """
    # Bits 7 and 6 set mark a new-format packet; the tag uses bits 0-5.
    header = bytearray([0xC0 | (tag & 0x3F)])
    size = len(payload)

    if size > 8383:
        # Large packet: marker byte and 32-bit length.
        header.append(255)
        header += struct.pack('>I', size)
    elif size > 191:
        # Medium packet: the length is stored relative to 192.
        high, low = divmod(size - 192, 256)
        header += bytes([192 + high, low])
    else:
        # Small packet: the length fits in one byte.
        header.append(size)

    return bytes(header) + payload
183
def build_key_data(rsa: dict) -> bytes:
    """
    Build a secret key packet (tag 7) holding an unencrypted RSA key.

    The RSA components (n, e, d, p, q, u) are expected to have been
    generated beforehand, and are given as a dictionary.

    Packet body (see RFC 4880, Section 5.5.3):
    - 1 byte: version (4)
    - 4 bytes: creation time (current Unix timestamp)
    - 1 byte: public key algorithm (2)
    - MPI: RSA public modulus n
    - MPI: RSA public exponent e
    - 1 byte: string-to-key usage (0, the secret data is not encrypted)
    - MPI: RSA private exponent d
    - MPI: RSA prime p
    - MPI: RSA prime q
    - MPI: RSA coefficient u
    - 2 bytes: checksum of the four secret MPIs

    As the creation time is the current time, the result of this function
    changes from one call to the next.
    """

    # Public part: version, creation time, algorithm, then n and e.
    public_part = b"".join([
        bytes([4]),
        struct.pack('>I', int(time.time())),
        bytes([2]),
        mpi_encode(rsa['n']),
        mpi_encode(rsa['e']),
    ])

    # Secret part: the four secret MPIs, in the order d, p, q, u.
    secret_mpis = b"".join(mpi_encode(rsa[name]) for name in ('d', 'p', 'q', 'u'))

    # The checksum is the sum of all the bytes of the secret MPIs, kept
    # on 16 bits.
    checksum = struct.pack('>H', sum(secret_mpis) & 0xFFFF)

    # A leading zero byte tells that the secret MPIs are stored in clear.
    secret_part = bytes([0]) + secret_mpis + checksum

    return new_packet(7, public_part + secret_part)
230
def pgp_cfb_encrypt_resync(key, plaintext):
    """
    Encrypt data with AES using the OpenPGP variant of CFB, with resync.

    The feedback register (FR) is handled as follows (RFC 4880,
    Section 13.9):
    1. FR starts as a block of zeros, and the first 16 bytes of the
       plaintext are encrypted with it.
    2. FR is set to the first ciphertext block, and only the next two
       bytes of the plaintext are encrypted.
    3. Resync: FR becomes the last 16 bytes of ciphertext produced so far,
       aka the first block without its first two bytes followed by the two
       bytes of step 2.
    4. The rest of the plaintext is processed as regular CFB, by chunks
       of up to 16 bytes.

    Arguments:
    - key: AES encryption key, only its first 16 bytes are used (AES-128)
    - plaintext: data to encrypt.  The caller in this file always begins
      it with the 18-byte OpenPGP prefix.

    The ciphertext is returned, with the same length as the plaintext.
    """
    block_size = 16  # AES block size
    ecb = AES.new(key[:16], AES.MODE_ECB)  # Raw block cipher, CFB done by hand

    def xor(keystream, data):
        # XOR byte by byte, stopping at the end of the shortest input.
        return bytes(k ^ d for k, d in zip(keystream, data))

    # Step 1: full block, encrypted from an all-zero feedback register.
    first = xor(ecb.encrypt(b'\x00' * block_size), plaintext[0:16])

    # Step 2: two bytes only, the first block being the feedback register.
    second = xor(ecb.encrypt(first)[0:2], plaintext[16:18])

    pieces = [first, second]

    # Step 3: resync.  This is what differs from standard CFB.
    feedback = first[2:] + second

    # Step 4: standard CFB for everything after the 18-byte prefix.
    for offset in range(18, len(plaintext), block_size):
        chunk = plaintext[offset:offset + block_size]
        encrypted = xor(ecb.encrypt(feedback), chunk)
        pieces.append(encrypted)

        # A full chunk replaces the feedback register entirely.  A partial
        # (final) chunk is completed with the tail of the previous register.
        feedback = encrypted + feedback[len(encrypted):]

    return b''.join(pieces)
286
def build_literal_data_packet(data: bytes) -> bytes:
    """
    Build a literal data packet (tag 11) around the data given in input.

    Packet body (RFC 4880, Section 5.9):
    - 1 byte: data format, always 'b' (binary) here
    - 1 byte: file name length, always 0 here, so no file name follows
    - 4 bytes: date (current Unix timestamp, big-endian)
    - M bytes: the literal data itself

    The complete packet is returned.  As the date is the current time, the
    result of this function changes from one call to the next.
    """
    # Binary format marker, followed by an empty file name.
    header = b'b' + b'\x00'
    timestamp = struct.pack('>I', int(time.time()))

    return new_packet(11, header + timestamp + data)
307
def build_symenc_data_packet(sess_key: bytes, cipher_algo: int, payload: bytes) -> bytes:
    """
    Build a symmetrically-encrypted data packet (tag 9), using AES-128
    with the OpenPGP CFB mode.

    The data encrypted is made of (see RFC 4880, Section 5.7):
    - A random prefix of 16 bytes (one AES block)
    - The last two bytes of this prefix, repeated
    - The payload, wrapped in a literal data packet

    Arguments:
    - sess_key: session key, only its first 16 bytes are used
    - cipher_algo: cipher algorithm identifier (7 = AES-128).  This value
      is not used by the function, AES-128 being always used.
    - payload: data to wrap in a literal data packet, then encrypt

    The complete packet is returned.  Its contents differ at each call as
    the prefix is random.
    """
    aes_block = 16  # AES block size
    aes_key = sess_key[:16]  # AES-128 needs a 16-byte key

    # Random block followed by a copy of its last two bytes, for a total
    # of 18 bytes.
    random_block = secrets.token_bytes(aes_block)
    check_bytes = random_block[-2:]

    # The data encrypted is the prefix followed by the literal data packet.
    cleartext = random_block + check_bytes + build_literal_data_packet(payload)

    # Encrypt using OpenPGP CFB mode with resync
    return new_packet(9, pgp_cfb_encrypt_resync(aes_key, cleartext))
343
def build_tag1_packet(rsa: dict, sess_key: bytes) -> bytes:
    """
    Build a public-key encrypted session key packet (tag 1).

    The session key given in input is embedded as-is, without any check
    on its length.  This is what makes this function able to produce the
    crafted packets of this test when given an oversized session key, as
    much as "legit" packets.

    Packet body (RFC 4880, Section 5.1):
    - 1 byte: version (3)
    - 8 bytes: key ID (all zeros)
    - 1 byte: public key algorithm (2)
    - MPI: RSA-encrypted session key message

    Arguments:
    - rsa: RSA key components, only 'n' and 'e' being used here
    - sess_key: session key bytes to embed in the packet

    The complete packet is returned.  A ValueError is raised if the
    session key message cannot be padded to the size of the modulus.
    """

    # Size of the RSA modulus, in bytes.
    modulus = rsa['n']
    modulus_len = (modulus.bit_length() + 7) // 8

    # Session key message:
    # - 1 byte: symmetric cipher algorithm (7 = AES-128)
    # - N bytes: session key
    # - 2 bytes: checksum, sum of the session key bytes kept on 16 bits
    message = bytes([7]) + sess_key + struct.pack('>H', sum(sess_key) & 0xFFFF)

    # The message is padded up to the size of the modulus, as of:
    #   0x02 || PS || 0x00 || message
    # Two bytes are taken by 0x02 and the 0x00 separator, PS fills the rest.
    filler_len = modulus_len - len(message) - 2

    if filler_len < 8:
        raise ValueError(f"Padding string too short ({filler_len} bytes); need at least 8 bytes. "
                         f"Message length: {len(message)}, Modulus size: {modulus_len} bytes")

    # PS is made only of 0xFF bytes, so it never contains a zero byte
    # before the separator.
    block = b'\x02' + b'\xff' * filler_len + b'\x00' + message

    # Sanity check of the length of the padded block.
    if len(block) != modulus_len:
        raise ValueError(f"Padded message length ({len(block)}) doesn't match RSA modulus size ({modulus_len})")

    # RSA works on integers smaller than the modulus.
    plain_int = int.from_bytes(block, 'big')
    if plain_int >= modulus:
        raise ValueError("Padded message is larger than RSA modulus")

    # RSA encryption, c = m^e mod n, with the result encoded as an MPI.
    encrypted_mpi = mpi_encode(pow(plain_int, rsa['e'], modulus))

    # Version 3, key ID set to zeros, algorithm 2, then the encrypted MPI.
    body = b'\x03' + b'\x00' * 8 + b'\x02' + encrypted_mpi

    return new_packet(1, body)
416
def build_message_data(rsa: dict) -> bytes:
    """
    Create the crafted message of this test, whose session key packets
    carry a long session key.

    This takes in input the RSA key components generated previously, and
    returns three PGP packets concatenated in this order:
    1. A public-key encrypted session key packet.
    2. A symmetrically-encrypted data packet, encrypted with AES_KEY.
    3. A second public-key encrypted session key packet, built from the
       same session key bytes as the first one.
    """

    # Session key material: AES key, 16 bytes of zeros, then a size of
    # 0x10 packed by p32(), for a total longer than the AES-128 key alone.
    # The crafted length is the important part for this test.
    # NOTE(review): p32() comes from the "pwn" star-import and presumably
    # packs a 32-bit little-endian integer, to be confirmed.
    crafted_key = AES_KEY + b"\x00" * 16 + p32(0x10)

    # Encrypted data packet, legit.  It is built first, as in the order of
    # the operations done originally.
    encrypted_data = build_symenc_data_packet(AES_KEY, cipher_algo=7, payload=b"\x0a\x00")

    # Session key packets, placed before and after the encrypted data.
    # These carry the long session key, which is the critical part of
    # this test.
    leading = build_tag1_packet(rsa, crafted_key)
    trailing = build_tag1_packet(rsa, crafted_key)

    return leading + encrypted_data + trailing
452
def main():
    """
    Generate the SQL test script and print it to stdout.

    A fresh RSA key pair is generated, then used to build the crafted
    message and the secret key.  Both are printed as hexadecimal bytea
    literals, arguments of a call to pgp_pub_decrypt_bytea().  Progress
    messages are printed to stderr, keeping stdout clean for the SQL.
    """
    # Default key size, in bits.
    # This number can be set to a higher number if wanted, like 4096.  We
    # just do not need to do that here.
    key_size = 2048

    # Generate fresh RSA key pair
    rsa = generate_rsa_keypair(key_size)

    # Generate the message data.
    print("### Building message data", file=sys.stderr)
    message_data = build_message_data(rsa)

    # Build the key containing the RSA private key
    print("### Building key data", file=sys.stderr)
    key_data = build_key_data(rsa)

    # Convert to hexadecimal, for the bytea used in the SQL file.
    message_data = message_data.hex()
    key_data = key_data.hex()

    # Split each value into lines of 72 characters, for readability.
    # "count" and "flags" are passed by keyword, as passing them as
    # positional arguments is deprecated.
    message_data = re.sub("(.{72})", "\\1\n", message_data, count=0, flags=re.DOTALL)
    key_data = re.sub("(.{72})", "\\1\n", key_data, count=0, flags=re.DOTALL)

    # Get the script filename for documentation
    file_basename = os.path.basename(__file__)

    # Output the SQL test case
    print(f'''-- Test for overflow with session key at decrypt.
-- Data automatically generated by scripts/{file_basename}.
-- See this file for details explaining how this data is generated.
SELECT pgp_pub_decrypt_bytea(
'\\x{message_data}'::bytea,
'\\x{key_data}'::bytea);''',
          file=sys.stdout)


if __name__ == "__main__":
    main()
void print(const void *obj)
Definition print.c:36
bytes build_tag1_packet(dict rsa, bytes sess_key)
pgp_cfb_encrypt_resync(key, plaintext)
bytes build_key_data(dict rsa)
dict generate_rsa_keypair(int key_size=2048)
bytes build_literal_data_packet(bytes data)
bytes build_message_data(dict rsa)
bytes new_packet(int tag, bytes payload)
bytes build_symenc_data_packet(bytes sess_key, int cipher_algo, bytes payload)
bytes mpi_encode(int x)
None validate_rsa_key(dict rsa)
const void size_t len
static int fb(int x)
static uint32 gcd(uint32 a, uint32 b)