|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from __future__ import unicode_literals

import os
import re
import hashlib
import datetime
import pathlib

from six.moves.urllib.parse import urlsplit  # pylint: disable=import-error,relative-import
from six.moves.urllib.parse import quote  # pylint: disable=import-error,no-name-in-module,relative-import

from azure.storage.blob import BlobPermissions, BlockBlobService
from . import models


def construct_sas_url(blob, uri):
    """Build a blob URL from its container URL."""
    # SplitResult is an immutable namedtuple with no 'pathname' attribute,
    # so derive the new URL with _replace rather than mutating a copy.
    newuri = uri._replace(path='{}/{}'.format(uri.path, quote(blob.name.encode('utf-8'))))
    return newuri.geturl()
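
# Example (illustrative): given uri = urlsplit('https://acct.blob.core.windows.net/data?sv=...')
# and a blob named 'dir/file.txt', construct_sas_url returns
# 'https://acct.blob.core.windows.net/data/dir/file.txt?sv=...', preserving the SAS query string.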


def convert_blobs_to_resource_files(blobs, resource_properties):
    """Convert a list of blobs to a list of ResourceFiles."""
    resource_files = []
    try:
        prefix = resource_properties.source.prefix
    except AttributeError:
        prefix = None
    if not blobs:
        raise ValueError('No input data found with reference {}'.format(prefix))
    if len(blobs) == 1 and blobs[0]['filePath'] == prefix:
        # Single file reference: filePath should be treated as a file path
        file_path = resource_properties.file_path or blobs[0]['filePath']
        resource_files.append(models.ExtendedResourceFile(
            http_url=blobs[0]['url'],
            file_path=file_path))
    else:
        # Multiple file reference: filePath should be treated as a directory
        base_file_path = ''
        if resource_properties.file_path:
            base_file_path = '{}/'.format(
                FileUtils.STRIP_PATH.sub('', resource_properties.file_path))

        for blob in blobs:
            file_path = '{}{}'.format(base_file_path, blob['filePath'])
            resource_files.append(models.ExtendedResourceFile(
                http_url=blob['url'],
                file_path=file_path))

    # Apply any requested file mode to every ResourceFile
    if resource_properties.file_mode:
        for f in resource_files:
            f.file_mode = resource_properties.file_mode
    return resource_files
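
# Example (illustrative): a source prefix 'inputs/a.txt' that matches exactly one
# blob maps to a single ResourceFile at its own filePath (or the given file_path),
# while a prefix matching several blobs with file_path 'data' maps each blob to
# 'data/<blob path>'.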


def resolve_file_paths(local_path):
    """Generate a list of files to upload and the relative directory."""
    local_path = os.path.abspath(local_path)
    files = []
    if local_path.find('*') > -1:
        # Supplied path is a pattern - the relative directory is the
        # path up to the first wildcard
        ref_dir_str = local_path.split('*')[0].rstrip('/\\')
        if not os.path.isdir(ref_dir_str):
            ref_dir_str = os.path.dirname(ref_dir_str)
        ref_dir = pathlib.Path(ref_dir_str)
        # Use os.sep here: os.pathsep is the PATH-list separator (':' or ';'),
        # not the directory separator.
        pattern = local_path[len(ref_dir_str + os.sep):]
        files = [str(f) for f in ref_dir.glob(pattern) if f.is_file()]
        local_path = ref_dir_str
    else:
        if os.path.isdir(local_path):
            # Supplied path is a directory
            files = [os.path.join(local_path, f) for f in os.listdir(local_path)
                     if os.path.isfile(os.path.join(local_path, f))]
        elif os.path.isfile(local_path):
            # Supplied path is a file
            files.append(local_path)
            local_path = os.path.dirname(local_path)
    return local_path, files
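
# Example (illustrative): resolve_file_paths('/data/logs/*.txt') returns
# ('/data/logs', [...]) where the list holds every .txt file directly under
# /data/logs; a recursive pattern such as '/data/**/*.txt' is expanded by
# pathlib.Path.glob relative to '/data'.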


def resolve_remote_paths(blob_service, file_group, remote_path):
    """List blobs in the file group's container whose names start with remote_path."""
    blobs = blob_service.list_blobs(get_container_name(file_group), prefix=remote_path)
    return list(blobs)


def generate_container_name(file_group):
    """Generate a valid container name from a file group name."""
    file_group = file_group.lower()
    # Check for any chars that aren't 'a-z', '0-9' or '-'
    valid_chars = r'^[a-z0-9][-a-z0-9]*$'
    # Replace any run of underscores or hyphens with a single hyphen
    underscores_and_hyphens = r'[_-]+'

    clean_group = re.sub(underscores_and_hyphens, '-', file_group)
    clean_group = clean_group.rstrip('-')
    if not re.match(valid_chars, clean_group):
        raise ValueError('File group name \'{}\' contains illegal characters. '
                         'File group names only support alphanumeric characters, '
                         'underscores and hyphens.'.format(file_group))

    if clean_group == file_group and len(file_group) <= FileUtils.MAX_GROUP_LENGTH:
        # If the specified group name is already clean, no need to add a hash
        return file_group
    # If we had to transform the group name, append a hash of the original name
    hash_str = hashlib.sha1(file_group.encode()).hexdigest()
    new_group = '{}-{}'.format(clean_group, hash_str)
    if len(new_group) > FileUtils.MAX_GROUP_LENGTH:
        return '{}-{}'.format(clean_group[0:15], hash_str)
    return new_group
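
# Example (illustrative): 'data1' passes through unchanged, while 'My_Group' is
# lowercased to 'my_group', cleaned to 'my-group', and suffixed with the SHA1
# hex digest of 'my_group' because the name had to be transformed.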


def get_container_name(file_group):
    """Get a valid, prefixed container name from a file group name."""
    return '{}{}'.format(FileUtils.GROUP_PREFIX, generate_container_name(file_group))


def generate_blob_sas_token(blob, container, blob_service, permission=BlobPermissions.READ):
    """Generate a blob URL with a SAS token."""
    # Backdate the start time slightly to tolerate clock skew between the
    # client and the storage service.
    sas_token = blob_service.generate_blob_shared_access_signature(
        container, blob.name,
        permission=permission,
        start=datetime.datetime.utcnow() - datetime.timedelta(minutes=15),
        expiry=datetime.datetime.utcnow() + datetime.timedelta(days=FileUtils.SAS_EXPIRY_DAYS))
    return blob_service.make_blob_url(container, quote(blob.name.encode('utf-8')), sas_token=sas_token)


def generate_container_sas_token(container, blob_service, permission=BlobPermissions.WRITE):
    """Generate a container URL with a SAS token."""
    blob_service.create_container(container)
    sas_token = blob_service.generate_container_shared_access_signature(
        container,
        permission=permission,
        start=datetime.datetime.utcnow() - datetime.timedelta(minutes=15),
        expiry=datetime.datetime.utcnow() + datetime.timedelta(days=FileUtils.SAS_EXPIRY_DAYS))
    url = '{}://{}/{}?{}'.format(
        blob_service.protocol,
        blob_service.primary_endpoint,
        container,
        sas_token)
    return url
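
# Example (illustrative): for account 'acct' and container 'fgrp-data', the
# resulting URL has the shape
# 'https://acct.blob.core.windows.net/fgrp-data?<sas token>'.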


def download_blob(blob, file_group, destination, blob_service, progress_callback):
    """Download the specified blob from the file group's container to a local destination."""

    def _wrap_callback(curr, total):
        if progress_callback:
            progress_callback(curr, total, destination)

    blob_service.get_blob_to_path(
        get_container_name(file_group), blob, destination,
        progress_callback=_wrap_callback)


def upload_blob(source, destination, file_name,  # pylint: disable=too-many-arguments
                blob_service, remote_path=None, flatten=None, progress_callback=None):
    """Upload the specified file to the specified container."""

    def _wrap_callback(curr, total):
        if progress_callback:
            progress_callback(curr, total, file_name)

    if not os.path.isfile(source):
        raise ValueError('Failed to locate file {}'.format(source))

    statinfo = os.stat(source)
    if statinfo.st_size > FileUtils.MAX_FILE_SIZE:
        raise ValueError('The local file size {} exceeds the Azure blob size limit'.
                         format(statinfo.st_size))
    if flatten:
        # Flatten local directory structure
        file_name = os.path.basename(file_name)

    # Create upload container with sanitized file group name
    container_name = get_container_name(destination)
    blob_service.create_container(container_name)

    blob_name = file_name
    if remote_path:
        # Add any specified virtual directories
        blob_prefix = FileUtils.STRIP_PATH.sub('', remote_path)
        blob_name = '{}/{}'.format(blob_prefix, FileUtils.STRIP_PATH.sub('', file_name))
    blob_name = blob_name.replace('\\', '/')

    # We store the last-modified timestamp in order to prevent overwriting with
    # outdated or duplicate data. TODO: Investigate cleaner options for handling this.
    file_time = str(os.path.getmtime(source))
    metadata = None
    try:
        metadata = blob_service.get_blob_metadata(container_name, blob_name)
    except Exception:  # pylint: disable=broad-except
        # Blob does not exist yet (or its metadata is unreadable); proceed with upload
        pass
    else:
        # TODO: Check whether the blob metadata is more recent
        if metadata and metadata.get('lastmodified') == file_time:
            return

    # Upload block blob
    # TODO: Investigate compression + chunking performance enhancement proposal.
    blob_service.create_blob_from_path(
        container_name=container_name,
        blob_name=blob_name,
        file_path=source,
        progress_callback=_wrap_callback,
        metadata={'lastmodified': file_time},
        # We want to validate the file as we upload, and only complete the operation
        # if all the data transfers successfully
        validate_content=True,
        max_connections=FileUtils.PARALLEL_OPERATION_THREAD_COUNT)


def container_url_has_sas(container_url):
    """Return True if the container URL already carries a SAS query string."""
    return '?' in container_url


def get_container_name_from_url(container_url):
    """Extract the container name; assumes no trailing slash or query string."""
    return container_url.split("/")[-1]
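
# Example (illustrative): get_container_name_from_url(
#     'https://acct.blob.core.windows.net/fgrp-data') returns 'fgrp-data';
# callers should first check container_url_has_sas so a SAS query string is
# never mistaken for part of the container name.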


class FileUtils(object):

    STRIP_PATH = re.compile(r"^[\/\\]+|[\/\\]+$")
    GROUP_PREFIX = 'fgrp-'
    MAX_GROUP_LENGTH = 63 - len(GROUP_PREFIX)
    MAX_FILE_SIZE = 50000 * 4 * 1024 * 1024  # 50,000 blocks of 4 MiB each
    PARALLEL_OPERATION_THREAD_COUNT = 5
    SAS_EXPIRY_DAYS = 7
    ROUND_DATE = 2 * 60 * 1000  # Round to nearest 2 minutes, in milliseconds

    def __init__(self, get_storage_client):
        self.resource_file_cache = {}
        self.container_sas_cache = {}
        self.resolve_storage_account = get_storage_client

    def filter_resource_cache(self, container, prefix):
        """Return all blob references in a container cache that meet a prefix requirement."""
        filtered = []
        for blob in self.resource_file_cache[container]:
            if not prefix:
                filtered.append(blob)
            elif blob['filePath'].startswith(prefix):
                filtered.append(blob)
        return filtered

    def list_container_contents(self, source, container, blob_service):
        """List blob references in a container, populating the cache on first use."""
        if container not in self.resource_file_cache:
            self.resource_file_cache[container] = []
            blobs = blob_service.list_blobs(container)
            for blob in blobs:
                if source.file_group:
                    blob_sas = generate_blob_sas_token(blob, container, blob_service)
                elif source.container_url:
                    blob_sas = construct_sas_url(blob, urlsplit(source.container_url))
                elif source.url:
                    blob_sas = source.url
                else:
                    raise ValueError("FileSource must specify a file_group, container_url or url.")
                file_name = os.path.basename(blob.name)
                file_name_only = os.path.splitext(file_name)[0]
                self.resource_file_cache[container].append(
                    {'url': blob_sas,
                     'filePath': blob.name,
                     'fileName': file_name,
                     'fileNameWithoutExtension': file_name_only})
        return self.filter_resource_cache(container, source.prefix)

    def resolve_container_sas_if_needed(self, container_url):
        if container_url_has_sas(container_url):
            return container_url
        # The container URL has no SAS token, so generate one
        container_name = get_container_name_from_url(container_url)
        return self.get_container_sas(container_name, False)

    def get_container_sas(self, file_group_or_container_name, is_file_group=True):
        """Return a container SAS URL, generating and caching it on first use."""
        storage_client = self.resolve_storage_account()
        if is_file_group:
            container = get_container_name(file_group_or_container_name)
        else:
            container = file_group_or_container_name
        try:
            return self.container_sas_cache[container]
        except KeyError:
            self.container_sas_cache[container] = generate_container_sas_token(container, storage_client)
            return self.container_sas_cache[container]

    def get_container_list(self, source):
        """List blob references in a container."""
        if source.file_group:
            # Input data stored in auto-storage
            storage_client = self.resolve_storage_account()
            container = get_container_name(source.file_group)
        elif source.container_url:
            uri = urlsplit(source.container_url)
            if not uri.query:
                raise ValueError('Invalid container URL: a SAS token is required.')
            storage_account_name = uri.netloc.split('.')[0]
            sas_token = uri.query
            storage_client = BlockBlobService(account_name=storage_account_name,
                                              sas_token=sas_token)
            # SplitResult exposes the path via 'path', not 'pathname'
            container = uri.path.split('/')[1]
        else:
            raise ValueError('Unknown source.')

        return self.list_container_contents(source, container, storage_client)

    def resolve_resource_file(self, resource_file):
        """Convert a new-style ResourceFile reference to a server-supported reference."""
        if resource_file.http_url:
            # Support the original ResourceFile reference
            if not resource_file.file_path:
                raise ValueError('Malformed ResourceFile: \'httpUrl\' must '
                                 'be accompanied by a \'file_path\' attribute')
            return [resource_file]

        if resource_file.storage_container_url or resource_file.auto_storage_container_name:
            return [resource_file]

        if not hasattr(resource_file, 'source') or not resource_file.source:
            raise ValueError('Malformed ResourceFile: Must have either '
                             '\'source\' or \'httpUrl\'')

        storage_client = self.resolve_storage_account()
        container = None
        blobs = []

        if resource_file.source.file_group:
            # Input data stored in auto-storage
            container = get_container_name(resource_file.source.file_group)
            blobs = self.list_container_contents(resource_file.source, container, storage_client)
            return convert_blobs_to_resource_files(blobs, resource_file)
        if resource_file.source.container_url:
            # Input data stored in an arbitrary container
            uri = urlsplit(resource_file.source.container_url)
            container = uri.path.split('/')[1]
            blobs = self.list_container_contents(resource_file.source, container, storage_client)
            return convert_blobs_to_resource_files(blobs, resource_file)
        if resource_file.source.url:
            # TODO: Input data from an arbitrary HTTP GET source
            raise ValueError('Not implemented')
        raise ValueError('Malformed ResourceFile')
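
# Illustrative resolution order (a sketch of the intended flow, not executable
# module logic): a ResourceFile with 'http_url' or a container reference passes
# through unchanged; one with 'source.file_group' or 'source.container_url' is
# expanded into one ResourceFile per matching blob via
# convert_blobs_to_resource_files; 'source.url' is not yet implemented.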