class StorageService:
def __init__(self):
self.aws = AWSClient(
aws_access_key_id=myEnvironment.S3_ACCESS_KEY,
aws_secret_access_key=myEnvironment.S3_SECRET_KEY,
region_name="eu-central-1",
)
self.s3 = self.aws.session.client("s3")
def upload_file(self, file_bytes: bytes, bucket: str, key: str):
self.s3.put_object(Body=file_bytes, Bucket=bucket, Key=key)
def get_file_url(self, bucket: str, key: str, expires_in: int = 3600):
return self.s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=expires_in,
)
def delete_file(self, bucket: str, key: str):
self.s3.delete_object(Bucket=bucket, Key=key)