-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmegolm_backup.py
executable file
·170 lines (140 loc) · 5.86 KB
/
megolm_backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/env python3
# megolm_export: operate on megolm session data
# Copyright (C) 2019 Aleksa Sarai <cyphar@cyphar.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
import hmac
import base64
import struct
import getpass
import hashlib
import argparse
from Crypto.Util import Counter
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
# This parsing is from the spec:
# <https://github.com/matrix-org/matrix-doc/blob/master/specification/modules/end_to_end_encryption.rst#key-exports>
#
# Given a passphrase, we have
# {K, K'} = PBKDF2(HMAC-SHA-512, passphrase, S, N, 512)
# where K is the first 256 bits and K' the last 256 bits.
#
# Size | Description
# -----+------------------------------------------
# 1 | Export format version, which must be 0x01.
# 16 | The salt S.
# 16 | The initialization vector IV.
# 4 | The number of rounds N, as a big-endian unsigned 32-bit integer.
# var | The encrypted JSON object.
# 32 | The HMAC-SHA-256 of all the above string concatenated together,
# | using K' as the key.
# Armor lines that open and close the base64-encoded export.
HEADER = b"-----BEGIN MEGOLM SESSION DATA-----"
FOOTER = b"-----END MEGOLM SESSION DATA-----"

# Length in bytes of the HMAC-SHA-256 tag appended to the payload.
MAC_SIZE = hashlib.sha256().digest_size

# Fixed-size prefix of the binary payload. The two 16-byte fields are handled
# as raw byte strings because struct has no format code for 128-bit integers.
CryptoParams = struct.Struct(
    ">"    # big-endian, standard sizes, no padding
    "c"    # export format version
    "16s"  # salt S
    "16s"  # initialization vector IV
    "L"    # number of rounds N (unsigned 32-bit)
)
def bail(*args):
    """Report a fatal error on stderr and terminate with exit status 1.

    The arguments are converted with str() and written space-separated on a
    single line, prefixed with the "[!]" marker.
    """
    message = " ".join(str(part) for part in ("[!]",) + args)
    print(message, file=sys.stderr)
    sys.exit(1)
# A bytes-friendly version of textwrap.fill.
def bytes_wrap(b, width):
    """Split *b* into chunks of at most *width* bytes joined by newlines.

    Args:
        b: The bytes-like sequence to wrap.
        width: Maximum number of bytes per output line; must be positive.

    Returns:
        The chunks joined with b"\\n". Empty input yields b"".

    Raises:
        ValueError: If *width* is not positive. A zero width would never
            consume any input, so it is rejected instead of looping forever.
    """
    if width <= 0:
        raise ValueError("width must be a positive integer")
    # Slice by offset rather than repeatedly re-slicing the remainder, which
    # would copy the tail of the buffer once per output line.
    return b"\n".join(b[start:start + width] for start in range(0, len(b), width))
# Derive K and K' from the passphrase with PBKDF2 and split the result.
def stretch_keys(passphrase, S, N):
    """Return the key pair (K, K') derived from *passphrase*.

    A str passphrase is encoded as UTF-8 first; bytes are used as given.
    64 bytes are derived with PBKDF2-HMAC-SHA-512 using salt *S* and *N*
    iterations. The first 32 bytes are K and the last 32 bytes are K'.
    """
    secret = passphrase if isinstance(passphrase, bytes) else passphrase.encode("utf-8")
    half = 256 // 8
    material = hashlib.pbkdf2_hmac("sha512", secret, S, N, dklen=2 * half)
    return (material[:half], material[half:])
def enc_session_data(passphrase, json_data, rounds=500000):
    """Encrypt *json_data* into an armored megolm session export.

    Args:
        passphrase: Passphrase (str or bytes) the keys are derived from.
        json_data: Plaintext to encrypt. A str is encoded as UTF-8 first,
            mirroring how the passphrase is handled.
        rounds: PBKDF2 iteration count N written into the header. It is
            stored as an unsigned 32-bit integer, so it must be in the
            range 1..2**32 - 1.

    Returns:
        bytes: HEADER, the base64 body wrapped at 128 characters, and FOOTER,
        joined by newlines.

    Raises:
        ValueError: If *rounds* does not fit the header field.
    """
    # Reject a bad round count before paying for the key stretch.
    if not 1 <= rounds <= 0xFFFFFFFF:
        raise ValueError("rounds must be between 1 and 2**32 - 1")
    if isinstance(json_data, str):
        json_data = json_data.encode("utf-8")

    # Figure out our parameters.
    version = b"\x01"
    salt = get_random_bytes(16)
    # Clear bit 63 of IV -- apparently this is required to work around a quirk
    # of the Android AES-CTR's counter implementation.
    iv = int.from_bytes(get_random_bytes(16), byteorder="big") & ~(1 << 63)

    # Get our keys: K encrypts, K' authenticates.
    enc_key, mac_key = stretch_keys(passphrase, salt, rounds)

    # Encrypt the JSON with AES-CTR, using the IV as the initial counter value.
    counter = Counter.new(128, initial_value=iv)
    ciphertext = AES.new(enc_key, AES.MODE_CTR, counter=counter).encrypt(json_data)

    # Prepend the crypto parameters, then append the MAC over everything so far.
    body = CryptoParams.pack(version, salt, iv.to_bytes(16, "big"), rounds) + ciphertext
    body += hmac.digest(mac_key, body, "sha256")

    # Base64 everything, wrap it at 128-chars, and add the header+footer.
    armored = bytes_wrap(base64.b64encode(body), 128)
    return b"\n".join([HEADER, armored, FOOTER])
def dec_session_data(passphrase, session_data):
    """Verify and decrypt an armored megolm session export.

    Args:
        passphrase: Passphrase (str or bytes) the export was encrypted with.
        session_data: The armored export as bytes, including the header and
            footer lines.

    Returns:
        bytes: The decrypted payload.

    Any validation failure (missing armor, malformed base64, truncated
    packet, unsupported version, zero round count, MAC mismatch) is reported
    through bail(), which terminates the process.
    """
    # Get rid of any surrounding whitespace, such as trailing newlines.
    session_data = session_data.strip()

    # Does it have the header and footer?
    if not session_data.startswith(HEADER):
        bail("session data invalid: missing header %r" % (HEADER,))
    if not session_data.endswith(FOOTER):
        bail("session data invalid: missing footer %r" % (FOOTER,))

    # Get the body and base64-decode it.
    try:
        body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
    except ValueError:
        # binascii.Error is a ValueError subclass; report it like every other
        # malformed-input case instead of letting a traceback escape.
        bail("session data invalid: body is not valid base64")
    if len(body) < CryptoParams.size + MAC_SIZE:
        bail("session data invalid: data packet too small")

    # Get the parameters (we need S and N to check the MAC).
    version, S, IV, N = CryptoParams.unpack(body[:CryptoParams.size])
    # Only format version 0x01 is defined; refuse anything else before
    # spending time on key stretching.
    if version != b"\x01":
        bail("session data invalid: unsupported export format version %r" % (version,))
    # PBKDF2 needs at least one iteration; a zero in the header would
    # otherwise surface as an exception from hashlib.
    if N < 1:
        bail("session data invalid: round count must be at least 1")
    IV = int.from_bytes(IV, byteorder="big")

    # Figure out the keys.
    K, Kp = stretch_keys(passphrase, S, N)

    # Check the MAC, which covers everything before it, in constant time.
    mac = body[-MAC_SIZE:]
    our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
    if not hmac.compare_digest(mac, our_mac):
        bail("session data corrupted or bad passphrase: mac check failed")

    # Okay, decrypt the JSON.
    ctr = Counter.new(128, initial_value=IV)
    cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
    ciphertext = body[CryptoParams.size:-MAC_SIZE]
    return cipher.decrypt(ciphertext)
def main(args):
    """Command-line entry point: encrypt or decrypt a megolm session backup.

    Args:
        args: Command-line arguments, without the program name.

    The input is read in full, the passphrase is prompted for, and the result
    is written followed by a newline. "-" selects stdin for the input and
    stdout for the output.
    """
    parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
    parser.add_argument("file", nargs="?", default="-", help="Input text file (- for stdin).")
    parser.add_argument("-o", "--output", default="-", required=False, help="Output text file (- for stdout).")
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument("--into", dest="mode", const="encrypt", action="store_const", help="Encrypt and represent file as a megolm session backup.")
    mode_group.add_argument("--from", dest="mode", const="decrypt", action="store_const", help="Decrypt the given megolm session and output the contents.")
    opts = parser.parse_args(args)

    action = {
        "encrypt": enc_session_data,
        "decrypt": dec_session_data,
    }[opts.mode]

    # Use the already-open binary standard streams for "-" rather than
    # opening /dev/stdin: the device paths only exist on Unix-like systems.
    if opts.file == "-":
        data = sys.stdin.buffer.read()
    else:
        with open(opts.file, "rb") as f:
            data = f.read()

    # Wait until after reading input to get the passphrase so pipelines work
    # properly. This results in slightly strange behaviour for interactive
    # uses, but most people will be using this in a pipeline.
    passphrase = getpass.getpass("Backup passphrase [mode=%s]: " % (opts.mode,))
    output = action(passphrase, data)

    if opts.output == "-":
        # Re-opening /dev/stdout with "wb" would truncate a file that stdout
        # was redirected to in append mode; writing to the existing stream
        # does not.
        sys.stdout.buffer.write(output + b"\n")
        sys.stdout.buffer.flush()
    else:
        with open(opts.output, "wb") as f:
            f.write(output + b"\n")
# Run the command-line interface only when executed as a script, not when
# imported as a module.
if __name__ == "__main__":
    cli_args = sys.argv[1:]  # drop the program name
    main(cli_args)