-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy paths3_client.py
More file actions
93 lines (80 loc) · 3.22 KB
/
s3_client.py
File metadata and controls
93 lines (80 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import boto3
import json
import os
from botocore.exceptions import ClientError
from io import BytesIO
from nypl_py_utils.functions.log_helper import create_log
class S3Client:
"""
Client for fetching and setting an AWS S3 file.
Takes as input the name of the S3 bucket. If fetching/setting a cache, also
takes the cached resource.
"""
def __init__(self, bucket, resource=None):
self.logger = create_log('s3_client')
self.bucket = bucket
self.resource = resource
try:
self.s3_client = boto3.client(
's3', region_name=os.environ.get('AWS_REGION', 'us-east-1'))
except ClientError as e:
error_msg = f'Could not create S3 client: {e}'
self.logger.error(error_msg)
raise S3ClientError(error_msg) from None
def close(self):
self.s3_client.close()
def fetch_cache(self):
"""Fetches a JSON file from S3 and returns the resulting dictionary"""
self.logger.info(
f'Fetching {self.resource} from S3 bucket {self.bucket}')
try:
output_stream = BytesIO()
self.s3_client.download_fileobj(
self.bucket, self.resource, output_stream)
return json.loads(output_stream.getvalue())
except ClientError as e:
error_msg = (
f'Error retrieving {self.resource} from S3 bucket '
f'{self.bucket}: {e}')
self.logger.error(error_msg)
raise S3ClientError(error_msg) from None
def set_cache(self, state):
"""Writes a dictionary to JSON and uploads the resulting file to S3"""
self.logger.info(
f'Setting {self.resource} in S3 bucket {self.bucket} to {state}')
try:
input_stream = BytesIO(json.dumps(state).encode())
self.s3_client.upload_fileobj(
input_stream, self.bucket, self.resource)
except ClientError as e:
error_msg = (
f'Error uploading {self.resource} to S3 bucket '
f'{self.s3_bucket}: {e}')
self.logger.error(error_msg)
raise S3ClientError(error_msg) from None
def upload_file(self, content, file_path):
"""
Writes an arbitrary file to S3. Note that this will overwrite any
existing file with the same name.
Parameters
----------
content: str
The string that should be written to the file. Must be utf-8.
file_path: str
The full path of the file that should be written not including the
bucket. Example: "subdirectory/example_file.csv"
"""
self.logger.info(
f'Writing {file_path} in S3 bucket {self.bucket}')
try:
input_stream = BytesIO(content.encode())
self.s3_client.upload_fileobj(input_stream, self.bucket, file_path)
except ClientError as e:
error_msg = (
f'Error uploading {file_path} to S3 bucket '
f'{self.s3_bucket}: {e}')
self.logger.error(error_msg)
raise S3ClientError(error_msg) from None
class S3ClientError(Exception):
def __init__(self, message=None):
self.message = message