-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathapi-key-secret-manager-upload.py
137 lines (105 loc) · 4.2 KB
/
api-key-secret-manager-upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import argparse
import getpass
import logging
import time
import boto3
DEFAULT_LOG_LEVEL = logging.INFO
LOGGER = logging.getLogger(__name__)
LOGGING_FORMAT = "%(asctime)s %(levelname)-5.5s " \
"[%(name)s]:[%(threadName)s] " \
"%(message)s"
AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN"
def _check_missing_field(validation_dict, extraction_key):
"""Check if a field exists in a dictionary
:param validation_dict: Dictionary
:param extraction_key: String
:raises: Exception
"""
extracted_value = validation_dict.get(extraction_key)
if not extracted_value:
LOGGER.error(f"Missing '{extraction_key}' key in the dict")
raise Exception
def _validate_field(validation_dict, extraction_key, expected_value):
"""Validate the passed in field
:param validation_dict: Dictionary
:param extraction_key: String
:param expected_value: String
:raises: ValueError
"""
extracted_value = validation_dict.get(extraction_key)
_check_missing_field(validation_dict, extraction_key)
if extracted_value != expected_value:
LOGGER.error(f"Incorrect value found for '{extraction_key}' key")
raise ValueError
def _cli_args():
"""Parse CLI Args
:rtype: argparse.Namespace
"""
parser = argparse.ArgumentParser(description="api-key-secret-manager-upload")
parser.add_argument("-s",
"--secret-name",
type=str,
help="Secret Name",
required=True
)
parser.add_argument("-p",
"--aws-profile",
type=str,
default="default",
help="AWS profile to be used for the API calls")
parser.add_argument("-v",
"--verbose",
action="store_true",
help="debug log output")
parser.add_argument("-e",
"--env",
action="store_true",
help="Use environment variables for AWS credentials")
return parser.parse_args()
def _silence_noisy_loggers():
"""Silence chatty libraries for better logging"""
for logger in ['boto3', 'botocore',
'botocore.vendored.requests.packages.urllib3']:
logging.getLogger(logger).setLevel(logging.WARNING)
def main():
"""What executes when the script is run"""
start = time.time() # to capture elapsed time
args = _cli_args()
# logging configuration
log_level = DEFAULT_LOG_LEVEL
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(level=log_level, format=LOGGING_FORMAT)
# silence chatty libraries
_silence_noisy_loggers()
if args.env:
LOGGER.info(
"Attempting to fetch AWS credentials via environment variables")
aws_access_key_id = os.environ.get(AWS_ACCESS_KEY_ID)
aws_secret_access_key = os.environ.get(AWS_SECRET_ACCESS_KEY)
aws_session_token = os.environ.get(AWS_SESSION_TOKEN)
if not aws_secret_access_key or not aws_access_key_id or not aws_session_token:
raise Exception(
f"Missing one or more environment variables - "
f"'{AWS_ACCESS_KEY_ID}', '{AWS_SECRET_ACCESS_KEY}', "
f"'{AWS_SESSION_TOKEN}'"
)
else:
LOGGER.info(f"AWS Profile being used: {args.aws_profile}")
boto3.setup_default_session(profile_name=args.aws_profile)
sm_client = boto3.client("secretsmanager")
LOGGER.info(f"Updating Secret: {args.secret_name}")
resp = sm_client.update_secret(
SecretId=args.secret_name,
SecretString=getpass.getpass("Please enter the API Key: ")
)
_check_missing_field(resp, "ResponseMetadata")
_validate_field(resp["ResponseMetadata"], "HTTPStatusCode", 200)
LOGGER.info("Successfully updated secret value")
LOGGER.debug("Closing secretsmanager boto3 client")
sm_client.close()
LOGGER.info(f"Total time elapsed: {time.time() - start} seconds")
if __name__ == "__main__":
main()