
Core

This section is for super users / developers only. These functions are subject to change as they are internal development functions. Use at your own risk.

Upload

synapseclient.core.upload.upload_functions

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity, path, synapseStore=True, md5=None, file_size=None, mimetype=None, max_threads=None)

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
parent_entity

Entity object or id of the parent entity.

TYPE: Union[str, Mapping, Number]

path

The file path to the file being uploaded

TYPE: str

synapseStore

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the entity_parent_id

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

A dictionary representing the new FileHandle for the uploaded file

Source code in synapseclient/core/upload/upload_functions.py
def upload_file_handle(
    syn: "Synapse",
    parent_entity: Union[str, collections.abc.Mapping, numbers.Number],
    path: str,
    synapseStore: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
    max_threads: int = None,
):
    """
    Uploads the file in the provided path (if necessary) to a storage location based on project settings.
    Returns a new FileHandle as a dict to represent the stored file.

    Arguments:
        parent_entity: Entity object or id of the parent entity.
        path:          The file path to the file being uploaded
        synapseStore:  If False, will not upload the file, but instead create an ExternalFileHandle that references
                       the file on the local machine.
                       If True, will upload the file based on StorageLocation determined by the entity_parent_id
        md5:           The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be
                       calculated automatically.
        file_size:     The size of the file, if known. Otherwise if the file is a local file, it will be calculated
                       automatically.
        mimetype:      The MIME type of the file, if known. Otherwise if the file is a local file, it will be
                       calculated automatically.

    Returns:
        A dictionary of a new FileHandle as a dict that represents the uploaded file
    """
    if path is None:
        raise ValueError("path can not be None")

    # if doing an external file handle with no actual upload
    if not synapseStore:
        return create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )

    # expand the path because past this point an upload is required and some upload functions require an absolute path
    expanded_upload_path = os.path.expandvars(os.path.expanduser(path))

    if md5 is None and os.path.isfile(expanded_upload_path):
        md5 = utils.md5_for_file(expanded_upload_path).hexdigest()

    entity_parent_id = id_of(parent_entity)

    # determine the upload function based on the UploadDestination
    location = syn._getDefaultUploadDestination(entity_parent_id)
    upload_destination_type = location["concreteType"]
    trace.get_current_span().set_attributes(
        {
            "synapse.parent_id": entity_parent_id,
            "synapse.upload_destination_type": upload_destination_type,
        }
    )

    if (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and sts_transfer.is_storage_location_sts_enabled(
            syn, entity_parent_id, location
        )
        and upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    ):
        log_upload_message(
            syn,
            "\n"
            + "#" * 50
            + "\n Uploading file to external S3 storage using boto3 \n"
            + "#" * 50
            + "\n",
        )

        return upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=entity_parent_id,
            upload_destination=location,
            local_path=expanded_upload_path,
            mimetype=mimetype,
            md5=md5,
        )

    elif upload_destination_type in (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    ):
        if upload_destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION:
            storage_str = "Synapse"
        elif upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION:
            storage_str = "your external S3"
        else:
            storage_str = "your external Google Bucket"

        log_upload_message(
            syn,
            "\n"
            + "#" * 50
            + "\n Uploading file to "
            + storage_str
            + " storage \n"
            + "#" * 50
            + "\n",
        )

        return upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storageLocationId=location["storageLocationId"],
            mimetype=mimetype,
            max_threads=max_threads,
            md5=md5,
        )
    # external file handle (sftp)
    elif upload_destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if location["uploadType"] == "SFTP":
            log_upload_message(
                syn,
                "\n%s\n%s\nUploading to: %s\n%s\n"
                % (
                    "#" * 50,
                    location.get("banner", ""),
                    urllib_parse.urlparse(location["url"]).netloc,
                    "#" * 50,
                ),
            )
            return upload_external_file_handle_sftp(
                syn=syn,
                file_path=expanded_upload_path,
                sftp_url=location["url"],
                mimetype=mimetype,
                md5=md5,
            )
        else:
            raise NotImplementedError("Can only handle SFTP upload locations.")
    # client authenticated S3
    elif (
        upload_destination_type
        == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION
    ):
        log_upload_message(
            syn,
            "\n%s\n%s\nUploading to endpoint: [%s] bucket: [%s]\n%s\n"
            % (
                "#" * 50,
                location.get("banner", ""),
                location.get("endpointUrl"),
                location.get("bucket"),
                "#" * 50,
            ),
        )
        return upload_client_auth_s3(
            syn=syn,
            file_path=expanded_upload_path,
            bucket=location["bucket"],
            endpoint_url=location["endpointUrl"],
            key_prefix=location["keyPrefixUUID"],
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
        )
    else:  # unknown storage location
        log_upload_message(
            syn,
            "\n%s\n%s\nUNKNOWN STORAGE LOCATION. Defaulting upload to Synapse.\n%s\n"
            % ("!" * 50, location.get("banner", ""), "!" * 50),
        )
        return upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storageLocationId=None,
            mimetype=mimetype,
            max_threads=max_threads,
            md5=md5,
        )
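
Example (a minimal usage sketch, not from the library's documentation; it assumes a configured/logged-in Synapse client, and the entity ID and file path are hypothetical):

import synapseclient
from synapseclient.core.upload.upload_functions import upload_file_handle

syn = synapseclient.login()  # assumes credentials are already configured

# Upload a local file to the storage location configured for the parent entity.
file_handle = upload_file_handle(
    syn,
    parent_entity="syn12345678",  # hypothetical parent project/folder ID
    path="/path/to/data.csv",     # hypothetical local file
    synapseStore=True,
)
print(file_handle["id"])  # ID of the newly created file handle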

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None)

When uploading to Synapse storage, the back end normally generates a random prefix for the uploaded object. Since in this case the client is responsible for the remote key, the client generates a random prefix instead. This both ensures we don't have a collision with an existing S3 object and mitigates potential performance issues, although key locality performance issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

local_path

The local path to the file to upload.

TYPE: str

mimetype

The mimetype of the file, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

A dictionary representing the external S3 file handle for the uploaded file.

Source code in synapseclient/core/upload/upload_functions.py
def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination,
    local_path: str,
    mimetype: str = None,
    md5: str = None,
):
    """
    When uploading to Synapse storage normally the back end will generate a random prefix
    for our uploaded object. Since in this case the client is responsible for the remote
    key, the client will instead generate a random prefix. This both ensures we don't have a collision
    with an existing S3 object and also mitigates potential performance issues, although
    key locality performance issues are likely resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination
        local_path: The local path to the file to upload.
        mimetype: The mimetype of the file, if known. Defaults to None.
        md5: MD5 checksum for the file, if known.

    Returns:
        A dictionary representing the external S3 file handle for the uploaded file
    """
    key_prefix = str(uuid.uuid4())

    bucket_name = upload_destination["bucket"]
    storage_location_id = upload_destination["storageLocationId"]
    remote_file_key = "/".join(
        [upload_destination["baseKey"], key_prefix, os.path.basename(local_path)]
    )

    def upload_fn(credentials):
        return S3ClientWrapper.upload_file(
            bucket=bucket_name,
            endpoint_url=None,
            remote_file_key=remote_file_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
        )

    sts_transfer.with_boto_sts_credentials(upload_fn, syn, parent_id, "read_write")
    return syn.create_external_s3_file_handle(
        bucket_name=bucket_name,
        s3_file_key=remote_file_key,
        file_path=local_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
    )
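
To illustrate how this function lays out objects in the external bucket, the sketch below mirrors the key construction from the source above; the destination values and local path are made up:

import os
import uuid

upload_destination = {"baseKey": "myBaseKey"}  # hypothetical external S3 upload destination
local_path = "/path/to/data.csv"               # hypothetical local file

key_prefix = str(uuid.uuid4())  # random prefix generated by the client
remote_file_key = "/".join(
    [upload_destination["baseKey"], key_prefix, os.path.basename(local_path)]
)
# e.g. "myBaseKey/9f1c2f3a-.../data.csv" -- the random prefix avoids collisions with existing objects
print(remote_file_key)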

Multipart Upload

synapseclient.core.upload.multipart_upload

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

Classes

UploadAttempt

Used to handle multi-threaded operations for uploading one or more parts of a file.

Source code in synapseclient/core/upload/multipart_upload.py
class UploadAttempt:
    """
    Used to handle multi-threaded operations for uploading one or more parts of a file.
    """

    def __init__(
        self,
        syn,
        dest_file_name,
        upload_request_payload,
        part_request_body_provider_fn,
        md5_fn,
        max_threads: int,
        force_restart: bool,
    ):
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._max_threads = max_threads
        self._force_restart = force_restart

        self._lock = threading.Lock()
        self._aborted = False

        # populated later
        self._upload_id: str = None
        self._pre_signed_part_urls: Mapping[int, str] = None

    @classmethod
    def _get_remaining_part_numbers(cls, upload_status):
        part_numbers = []
        parts_state = upload_status["partsState"]

        # parts are 1-based
        for i, part_status in enumerate(parts_state, 1):
            if part_status == "0":
                part_numbers.append(i)

        return len(parts_state), part_numbers

    @classmethod
    def _get_thread_session(cls):
        # get a lazily initialized requests.Session from the thread.
        # we want to share a requests.Session over the course of a thread
        # to take advantage of persistent http connection. we put it on a
        # thread local rather than in the task closure since a connection can
        # be reused across separate part uploads so no reason to restrict it
        # per worker task.
        session = getattr(_thread_local, "session", None)
        if not session:
            session = _thread_local.session = requests.Session()
        return session

    def _is_copy(self):
        # is this a copy or upload request
        return (
            self._upload_request_payload.get("concreteType")
            == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST
        )

    def _create_synapse_upload(self):
        return self._syn.restPOST(
            "/file/multipart?forceRestart={}".format(str(self._force_restart).lower()),
            json.dumps(self._upload_request_payload),
            endpoint=self._syn.fileHandleEndpoint,
        )

    def _fetch_pre_signed_part_urls(
        self,
        upload_id: str,
        part_numbers: List[int],
        requests_session: requests.Session = None,
    ) -> Mapping[int, str]:
        uri = "/file/multipart/{upload_id}/presigned/url/batch".format(
            upload_id=upload_id
        )
        body = {
            "uploadId": upload_id,
            "partNumbers": part_numbers,
        }

        response = self._syn.restPOST(
            uri,
            json.dumps(body),
            requests_session=requests_session,
            endpoint=self._syn.fileHandleEndpoint,
        )

        part_urls = {}
        for part in response["partPresignedUrls"]:
            part_urls[part["partNumber"]] = (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )

        return part_urls

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ):
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            refreshed URL

        """
        with self._lock:
            current_url = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                refreshed_url = current_url
            else:
                self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
                    self._upload_id,
                    list(self._pre_signed_part_urls.keys()),
                )

                refreshed_url = self._pre_signed_part_urls[part_number]

            return refreshed_url

    def _handle_part(self, part_number, otel_context: typing.Union[Context, None]):
        if otel_context:
            context.attach(otel_context)
        with tracer.start_as_current_span("UploadAttempt::_handle_part"):
            trace.get_current_span().set_attributes(
                {"thread.id": threading.get_ident()}
            )
            with self._lock:
                if self._aborted:
                    # this upload attempt has already been aborted
                    # so we short circuit the attempt to upload this part
                    raise SynapseUploadAbortedException(
                        "Upload aborted, skipping part {}".format(part_number)
                    )

                part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

            session = self._get_thread_session()

            # obtain the body (i.e. the upload bytes) for the given part number.
            body = (
                self._part_request_body_provider_fn(part_number)
                if self._part_request_body_provider_fn
                else None
            )
            part_size = len(body) if body else 0
            for retry in range(2):

                def put_fn():
                    with tracer.start_as_current_span("UploadAttempt::put_part"):
                        return session.put(part_url, body, headers=signed_headers)

                try:
                    # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls
                    response = with_retry(
                        put_fn, retry_exceptions=[requests.exceptions.ConnectionError]
                    )
                    _raise_for_status(response)

                    # completed upload part to s3 successfully
                    break

                except SynapseHTTPError as ex:
                    if ex.response.status_code == 403 and retry < 1:
                        # we interpret this to mean our pre_signed url expired.
                        self._syn.logger.debug(
                            "The pre-signed upload URL for part {} has expired."
                            "Refreshing urls and retrying.\n".format(part_number)
                        )

                        # we refresh all the urls and obtain this part's
                        # specific url for the retry
                        with tracer.start_as_current_span(
                            "UploadAttempt::refresh_pre_signed_part_urls"
                        ):
                            (
                                part_url,
                                signed_headers,
                            ) = self._refresh_pre_signed_part_urls(
                                part_number,
                                part_url,
                            )

                    else:
                        raise

            md5_hex = self._md5_fn(body, response)

            # now tell synapse that we uploaded that part successfully
            self._syn.restPUT(
                "/file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5}".format(
                    upload_id=self._upload_id,
                    part_number=part_number,
                    md5=md5_hex,
                ),
                requests_session=session,
                endpoint=self._syn.fileHandleEndpoint,
            )

            # remove so future batch pre_signed url fetches will exclude this part
            with self._lock:
                del self._pre_signed_part_urls[part_number]

            return part_number, part_size

    def _upload_parts(self, part_count, remaining_part_numbers):
        trace.get_current_span().set_attributes({"thread.id": threading.get_ident()})
        time_upload_started = time.time()
        completed_part_count = part_count - len(remaining_part_numbers)
        file_size = self._upload_request_payload.get("fileSizeBytes")

        if not self._is_copy():
            # we won't have bytes to measure during a copy so the byte oriented progress bar is not useful
            progress = previously_transferred = min(
                completed_part_count * self._part_size,
                file_size,
            )

            self._syn._print_transfer_progress(
                progress,
                file_size,
                prefix="Uploading",
                postfix=self._dest_file_name,
                previouslyTransferred=previously_transferred,
            )

        self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
            self._upload_id,
            remaining_part_numbers,
        )

        futures = []
        with _executor(self._max_threads, False) as executor:
            # we don't wait on the shutdown since we do so ourselves below

            for part_number in remaining_part_numbers:
                futures.append(
                    executor.submit(
                        self._handle_part,
                        part_number,
                        context.get_current(),
                    )
                )

        for result in concurrent.futures.as_completed(futures):
            try:
                _, part_size = result.result()

                if part_size and not self._is_copy():
                    progress += part_size
                    self._syn._print_transfer_progress(
                        min(progress, file_size),
                        file_size,
                        prefix="Uploading",
                        postfix=self._dest_file_name,
                        dt=time.time() - time_upload_started,
                        previouslyTransferred=previously_transferred,
                    )
            except (Exception, KeyboardInterrupt) as cause:
                with self._lock:
                    self._aborted = True

                # wait for all threads to complete before
                # raising the exception, we don't want to return
                # control while there are still threads from this
                # upload attempt running
                concurrent.futures.wait(futures)

                if isinstance(cause, KeyboardInterrupt):
                    raise SynapseUploadAbortedException("User interrupted upload")
                raise SynapseUploadFailedException("Part upload failed") from cause

    def _complete_upload(self):
        upload_status_response = self._syn.restPUT(
            "/file/multipart/{upload_id}/complete".format(
                upload_id=self._upload_id,
            ),
            requests_session=self._get_thread_session(),
            endpoint=self._syn.fileHandleEndpoint,
        )

        upload_state = upload_status_response.get("state")
        if upload_state != "COMPLETED":
            # at this point we think successfully uploaded all the parts
            # but the upload status isn't complete, we'll throw an error
            # and let a subsequent attempt try to reconcile
            raise SynapseUploadFailedException(
                "Upload status has an unexpected state {}".format(upload_state)
            )

        return upload_status_response

    def __call__(self):
        upload_status_response = self._create_synapse_upload()
        upload_state = upload_status_response.get("state")

        if upload_state != "COMPLETED":
            self._upload_id = upload_status_response["uploadId"]
            part_count, remaining_part_numbers = self._get_remaining_part_numbers(
                upload_status_response
            )

            # if no remaining part numbers then all the parts have been
            # uploaded but the upload has not been marked complete.
            if remaining_part_numbers:
                self._upload_parts(part_count, remaining_part_numbers)
            upload_status_response = self._complete_upload()

        return upload_status_response

Functions

multipart_upload_file(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None, md5=None)

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_upload_file(
    syn,
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
    md5: str = None,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload
        md5: The MD5 of the file. If not provided, it will be calculated.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    trace.get_current_span().set_attributes(
        {
            "synapse.storage_location_id": (
                storage_location_id if storage_location_id is not None else ""
            )
        }
    )

    if not os.path.exists(file_path):
        raise IOError('File "{}" not found.'.format(file_path))
    if os.path.isdir(file_path):
        raise IOError('File "{}" is a directory.'.format(file_path))

    file_size = os.path.getsize(file_path)
    if not dest_file_name:
        dest_file_name = os.path.basename(file_path)

    if content_type is None:
        mime_type, _ = mimetypes.guess_type(file_path, strict=False)
        content_type = mime_type or "application/octet-stream"

    callback_func = Spinner().print_tick if not syn.silent else None
    md5_hex = md5 or md5_for_file(file_path, callback=callback_func).hexdigest()

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number):
        return get_file_chunk(file_path, part_number, part_size)

    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )
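
Example (a sketch only; it assumes a logged-in Synapse client syn, and the file path and settings are hypothetical):

from synapseclient.core.upload.multipart_upload import multipart_upload_file

file_handle_id = multipart_upload_file(
    syn,                                  # assumed logged-in Synapse client
    file_path="/path/to/large_file.bam",  # hypothetical local file
    part_size=8 * 1024 * 1024,            # 8 MiB parts; the minimum part size is 5 MiB
    max_threads=4,
)
print(file_handle_id)  # the resulting File Handle ID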

multipart_upload_string(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_upload_string(
    syn,
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: number of bytes per part. Minimum 5MB.
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number):
        return get_data_chunk(data, part_number, part_size)

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )
    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )
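
Example (a sketch assuming a logged-in Synapse client syn; the text and filename are made up):

from synapseclient.core.upload.multipart_upload import multipart_upload_string

file_handle_id = multipart_upload_string(
    syn,
    text="id,value\n1,foo\n2,bar\n",
    dest_file_name="sample.csv",             # defaults to "message.txt" if omitted
    content_type="text/csv; charset=utf-8",  # defaults to "text/plain; charset=utf-8"
)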

multipart_copy(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

source_file_handle_association

Describes an association of a FileHandle with another object.

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

max_threads

Number of concurrent threads to devote to copy.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_copy(
    syn,
    source_file_handle_association,
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Makes a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html).
    This request performs a copy of an existing file handle without data transfer from the client.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes).
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.
        max_threads: Number of concurrent threads to devote to copy.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    part_size = part_size or DEFAULT_PART_SIZE

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )
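
Example (a sketch; the association below follows the Synapse FileHandleAssociation model, all IDs are hypothetical, and a logged-in client syn is assumed):

from synapseclient.core.upload.multipart_upload import multipart_copy

# Describes where the source file handle is currently associated
source_file_handle_association = {
    "fileHandleId": "1234567",           # hypothetical ID of the existing file handle
    "associateObjectId": "syn12345678",  # hypothetical entity the file handle belongs to
    "associateObjectType": "FileEntity",
}

new_file_handle_id = multipart_copy(
    syn,
    source_file_handle_association,
    dest_file_name="copied_file.csv",
    storage_location_id=None,  # None -> copy into default Synapse storage
)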

_multipart_upload(syn, dest_file_name, upload_request, part_fn, md5_fn, force_restart=False, max_threads=None)

Calls upon an UploadAttempt object to initiate and/or retry a multipart file upload or copy. This function is wrapped by multipart_upload_file, multipart_upload_string, and multipart_copy. Retries cannot exceed 7 per call.

PARAMETER DESCRIPTION
syn

A Synapse object

dest_file_name

upload as a different filename

upload_request

A dictionary object with the user-fed logistical details of the upload/copy request.

part_fn

Function that returns the request body (the bytes) for a given part number

md5_fn

Function to calculate the MD5 of the file-like object

max_threads

number of concurrent threads to devote to upload.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

A File Handle ID

Source code in synapseclient/core/upload/multipart_upload.py
def _multipart_upload(
    syn,
    dest_file_name,
    upload_request,
    part_fn,
    md5_fn,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Calls upon an [UploadAttempt][synapseclient.core.upload.multipart_upload.UploadAttempt]
    object to initiate and/or retry a multipart file upload or copy. This function is wrapped by
    [multipart_upload_file][synapseclient.core.upload.multipart_upload.multipart_upload_file],
    [multipart_upload_string][synapseclient.core.upload.multipart_upload.multipart_upload_string], and
    [multipart_copy][synapseclient.core.upload.multipart_upload.multipart_copy].
    Retries cannot exceed 7 per call.

    Arguments:
        syn: A Synapse object
        dest_file_name: upload as a different filename
        upload_request: A dictionary object with the user-fed logistical
                        details of the upload/copy request.
        part_fn: Function that returns the request body (bytes) for a given part number
        md5_fn: Function to calculate the MD5 of the file-like object
        max_threads: number of concurrent threads to devote to upload.

    Returns:
        A File Handle ID

    """
    if max_threads is None:
        max_threads = pool_provider.DEFAULT_NUM_THREADS

    max_threads = max(max_threads, 1)

    retry = 0
    while True:
        try:
            upload_status_response = UploadAttempt(
                syn,
                dest_file_name,
                upload_request,
                part_fn,
                md5_fn,
                max_threads,
                # only force_restart the first time through (if requested).
                # a retry after a caught exception will not restart the upload
                # from scratch.
                force_restart and retry == 0,
            )()

            # success
            return upload_status_response["resultFileHandleId"]

        except SynapseUploadFailedException:
            if retry < MAX_RETRIES:
                retry += 1
            else:
                raise

Upload Async

synapseclient.core.upload.upload_functions_async

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity_id, path, synapse_store=True, md5=None, file_size=None, mimetype=None) async

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_entity_id

The ID of the parent entity that the file will be attached to.

TYPE: str

path

The file path to the file being uploaded

TYPE: str

synapse_store

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the parent_entity_id.

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A dictionary representing the new FileHandle for the uploaded file

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_file_handle(
    syn: "Synapse",
    parent_entity_id: str,
    path: str,
    synapse_store: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
) -> Dict[str, Union[str, int]]:
    """
    Uploads the file in the provided path (if necessary) to a storage location based
    on project settings. Returns a new FileHandle as a dict to represent the
    stored file.

    Arguments:
        syn: The synapse client
        parent_entity_id: The ID of the parent entity that the file will be attached to.
        path: The file path to the file being uploaded
        synapse_store: If False, will not upload the file, but instead create an
            ExternalFileHandle that references the file on the local machine. If True,
            will upload the file based on StorageLocation determined by the
            parent_entity_id.
        md5: The MD5 checksum for the file, if known. Otherwise if the file is a
            local file, it will be calculated automatically.
        file_size: The size of the file, if known. Otherwise if the file is a local file,
            it will be calculated automatically.
        mimetype: The MIME type of the file, if known. Otherwise if the file is a local
            file, it will be calculated automatically.

    Returns:
        A dictionary of a new FileHandle as a dict that represents the uploaded file
    """
    if path is None:
        raise ValueError("path can not be None")

    # if doing an external file handle with no actual upload
    if not synapse_store:
        return await create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )

    # expand the path because past this point an upload is required and some upload functions require an absolute path
    expanded_upload_path = os.path.expandvars(os.path.expanduser(path))

    if md5 is None and os.path.isfile(expanded_upload_path):
        md5 = utils.md5_for_file_hex(filename=expanded_upload_path)

    entity_parent_id = id_of(parent_entity_id)

    # determine the upload function based on the UploadDestination
    location = await get_upload_destination(
        entity_id=entity_parent_id, synapse_client=syn
    )
    upload_destination_type = location.get("concreteType", None) if location else None
    trace.get_current_span().set_attributes(
        {
            "synapse.parent_id": entity_parent_id,
            "synapse.upload_destination_type": upload_destination_type,
        }
    )

    if (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and await sts_transfer.is_storage_location_sts_enabled_async(
            syn, entity_parent_id, location
        )
        and upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    ):
        return await upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=entity_parent_id,
            upload_destination=location,
            local_path=expanded_upload_path,
            mimetype=mimetype,
            md5=md5,
            storage_str="Uploading file to external S3 storage using boto3",
        )

    elif upload_destination_type in (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    ):
        if upload_destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION:
            storage_str = "Uploading to Synapse storage"
        elif upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION:
            storage_str = "Uploading to your external S3 storage"
        else:
            storage_str = "Uploading to your external Google Bucket storage"

        return await upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=storage_str,
        )
    # external file handle (sftp)
    elif upload_destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if location["uploadType"] == "SFTP":
            storage_str = (
                f"Uploading to: {urllib_parse.urlparse(location['url']).netloc}"
            )
            banner = location.get("banner", None)
            if banner:
                syn.logger.info(banner)
            return await upload_external_file_handle_sftp(
                syn=syn,
                file_path=expanded_upload_path,
                sftp_url=location["url"],
                mimetype=mimetype,
                md5=md5,
                storage_str=storage_str,
            )
        else:
            raise NotImplementedError("Can only handle SFTP upload locations.")
    # client authenticated S3
    elif (
        upload_destination_type
        == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION
    ):
        storage_str = f"Uploading to endpoint: [{location.get('endpointUrl')}] bucket: [{location.get('bucket')}]"
        banner = location.get("banner", None)
        if banner:
            syn.logger.info(banner)
        return await upload_client_auth_s3(
            syn=syn,
            file_path=expanded_upload_path,
            bucket=location["bucket"],
            endpoint_url=location["endpointUrl"],
            key_prefix=location["keyPrefixUUID"],
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=storage_str,
        )
    else:  # unknown storage location
        return await upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storage_location_id=None,
            mimetype=mimetype,
            md5=md5,
            storage_str="Uploading to Synapse storage",
        )
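
Example (a sketch of calling the async variant; it assumes a logged-in Synapse client syn, and the entity ID and path are hypothetical):

import asyncio

from synapseclient.core.upload.upload_functions_async import upload_file_handle

async def main():
    file_handle = await upload_file_handle(
        syn,                             # assumed logged-in Synapse client
        parent_entity_id="syn12345678",  # hypothetical parent entity ID
        path="/path/to/data.csv",        # hypothetical local file
    )
    print(file_handle["id"])

asyncio.run(main())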

create_external_file_handle(syn, path, mimetype=None, md5=None, file_size=None) async

Create a file handle in Synapse without uploading any files. This is used in cases where one wishes to store a reference to a file that is not in Synapse.

Source code in synapseclient/core/upload/upload_functions_async.py
async def create_external_file_handle(
    syn: "Synapse",
    path: str,
    mimetype: str = None,
    md5: str = None,
    file_size: int = None,
) -> Dict[str, Union[str, int]]:
    """Create a file handle in Synapse without uploading any files. This is used in
    cases where one wishes to store a reference to a file that is not in Synapse."""
    is_local_file = False  # defaults to false
    url = as_url(os.path.expandvars(os.path.expanduser(path)))
    if is_url(url):
        parsed_url = urllib_parse.urlparse(url)
        parsed_path = file_url_to_path(url)
        if parsed_url.scheme == "file" and os.path.isfile(parsed_path):
            actual_md5 = utils.md5_for_file_hex(filename=parsed_path)
            if md5 is not None and md5 != actual_md5:
                raise SynapseMd5MismatchError(
                    f"The specified md5 [{md5}] does not match the calculated md5 "
                    f"[{actual_md5}] for local file [{parsed_path}]",
                )
            md5 = actual_md5
            file_size = os.stat(parsed_path).st_size
            is_local_file = True
    else:
        raise ValueError(f"externalUrl [{url}] is not a valid url")

    # just creates the file handle because there is nothing to upload
    file_handle = await post_external_filehandle(
        external_url=url, mimetype=mimetype, md5=md5, file_size=file_size
    )
    if is_local_file:
        syn.cache.add(
            file_handle_id=file_handle["id"], path=file_url_to_path(url), md5=md5
        )
    trace.get_current_span().set_attributes(
        {"synapse.file_handle_id": file_handle["id"]}
    )
    return file_handle
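
Example (a sketch; it registers a reference to a local file without uploading it, assuming a logged-in client syn and a hypothetical path):

import asyncio

from synapseclient.core.upload.upload_functions_async import create_external_file_handle

async def main():
    file_handle = await create_external_file_handle(
        syn,
        path="/shared/archive/data.csv",  # hypothetical local path; stored as a file:// URL reference
    )
    print(file_handle["id"])

asyncio.run(main())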

upload_external_file_handle_sftp(syn, file_path, sftp_url, mimetype=None, md5=None, storage_str=None) async

Upload a file to an SFTP server and create a file handle in Synapse.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_external_file_handle_sftp(
    syn: "Synapse",
    file_path: str,
    sftp_url: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Upload a file to an SFTP server and create a file handle in Synapse."""
    username, password = syn._getUserCredentials(url=sftp_url)
    uploaded_url = SFTPWrapper.upload_file(
        file_path,
        urllib_parse.unquote(sftp_url),
        username,
        password,
        storage_str=storage_str,
    )

    file_md5 = md5 or utils.md5_for_file_hex(filename=file_path)
    file_handle = await post_external_filehandle(
        external_url=uploaded_url,
        mimetype=mimetype,
        md5=file_md5,
        file_size=os.stat(file_path).st_size,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=file_md5)
    return file_handle

upload_synapse_s3(syn, file_path, storage_location_id=None, mimetype=None, force_restart=False, md5=None, storage_str=None) async

Upload a file to Synapse storage and create a file handle in Synapse.

Arguments

syn: The synapse client
file_path: The path to the file to upload.
storage_location_id: The storage location ID.
mimetype: The mimetype of the file.
force_restart: If True, will force the upload to restart.
md5: The MD5 checksum for the file.
storage_str: The storage string.

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A dictionary of the file handle.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_synapse_s3(
    syn: "Synapse",
    file_path: str,
    storage_location_id: Optional[int] = None,
    mimetype: str = None,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Upload a file to Synapse storage and create a file handle in Synapse.

    Arguments:
        syn: The synapse client
        file_path: The path to the file to upload.
        storage_location_id: The storage location ID.
        mimetype: The mimetype of the file.
        force_restart: If True, will force the upload to restart.
        md5: The MD5 checksum for the file.
        storage_str: The storage string.

    Returns:
        A dictionary of the file handle.
    """
    file_handle_id = await multipart_upload_file_async(
        syn=syn,
        file_path=file_path,
        content_type=mimetype,
        storage_location_id=storage_location_id,
        md5=md5,
        force_restart=force_restart,
        storage_str=storage_str,
    )
    syn.cache.add(file_handle_id=file_handle_id, path=file_path, md5=md5)

    return await get_file_handle(file_handle_id=file_handle_id, synapse_client=syn)
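
Example (a sketch assuming a logged-in client syn; passing storage_location_id=None uploads to default Synapse storage, and the path is hypothetical):

import asyncio

from synapseclient.core.upload.upload_functions_async import upload_synapse_s3

async def main():
    file_handle = await upload_synapse_s3(
        syn,
        file_path="/path/to/data.csv",  # hypothetical local file
        storage_location_id=None,       # None -> default Synapse storage
    )
    print(file_handle["id"])

asyncio.run(main())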

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None, storage_str=None) async

When uploading to Synapse storage, the back end normally generates a random prefix for the uploaded object. Since in this case the client is responsible for the remote key, the client generates a random prefix instead. This both ensures we don't have a collision with an existing S3 object and mitigates potential performance issues, although key locality performance issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

TYPE: Dict[str, Union[str, int]]

local_path

The local path to the file to upload.

TYPE: str

mimetype

The mimetype of the file, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int, bool]]

A dictionary representing the external S3 file handle for the uploaded file.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination: Dict[str, Union[str, int]],
    local_path: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int, bool]]:
    """
    When uploading to Synapse storage normally the back end will generate a random prefix
    for our uploaded object. Since in this case the client is responsible for the remote
    key, the client will instead generate a random prefix. This both ensures we don't have a collision
    with an existing S3 object and also mitigates potential performance issues, although
    key locality performance issues are likely resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination
        local_path: The local path to the file to upload.
        mimetype: The mimetype of the file, if known. Defaults to None.
        md5: MD5 checksum for the file, if known.
        storage_str: Optional string to append to the upload message.

    Returns:
        A dictionary of the file handle created for the uploaded file.
    """
    key_prefix = str(uuid.uuid4())

    bucket_name = upload_destination["bucket"]
    storage_location_id = upload_destination["storageLocationId"]
    remote_file_key = "/".join(
        [
            upload_destination.get("baseKey", ""),
            key_prefix,
            os.path.basename(local_path),
        ]
    )

    def upload_fn(credentials: Dict[str, str]) -> str:
        """Wrapper for the upload function."""
        return S3ClientWrapper.upload_file(
            bucket=bucket_name,
            endpoint_url=None,
            remote_file_key=remote_file_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
            storage_str=storage_str,
        )

    loop = asyncio.get_event_loop()

    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: sts_transfer.with_boto_sts_credentials(
            upload_fn, syn, parent_id, "read_write"
        ),
    )

    return await post_external_s3_file_handle(
        bucket_name=bucket_name,
        s3_file_key=remote_file_key,
        file_path=local_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )
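
To make the key layout described above concrete, the remote key is assembled from the destination's optional baseKey, a freshly generated UUID prefix, and the file's basename. A small sketch with hypothetical values:

import os
import uuid

upload_destination = {
    "bucket": "my-bucket",          # hypothetical bucket
    "baseKey": "project-data",      # hypothetical base key
    "storageLocationId": 1234,      # hypothetical storage location
}
local_path = "/tmp/results.csv"

key_prefix = str(uuid.uuid4())
remote_file_key = "/".join(
    [upload_destination.get("baseKey", ""), key_prefix, os.path.basename(local_path)]
)
# e.g. "project-data/3f2c9a0e-.../results.csv"
print(remote_file_key)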

upload_client_auth_s3(syn, file_path, bucket, endpoint_url, key_prefix, storage_location_id, mimetype=None, md5=None, storage_str=None) async

Use the S3 client to upload a file to an S3 bucket.

Source code in synapseclient/core/upload/upload_functions_async.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
async def upload_client_auth_s3(
    syn: "Synapse",
    file_path: str,
    bucket: str,
    endpoint_url: str,
    key_prefix: str,
    storage_location_id: int,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Use the S3 client to upload a file to an S3 bucket."""
    profile = get_client_authenticated_s3_profile(
        endpoint=endpoint_url, bucket=bucket, config_path=syn.configPath
    )
    file_key = key_prefix + "/" + os.path.basename(file_path)
    loop = asyncio.get_event_loop()

    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: S3ClientWrapper.upload_file(
            bucket=bucket,
            endpoint_url=endpoint_url,
            remote_file_key=file_key,
            upload_file_path=file_path,
            profile_name=profile,
            credentials=_get_aws_credentials(),
            storage_str=storage_str,
        ),
    )

    file_handle = await post_external_object_store_filehandle(
        s3_file_key=file_key,
        file_path=file_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=md5)

    return file_handle
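
A minimal sketch of calling this coroutine for a client-authenticated external object store. The bucket, endpoint, key prefix, and storage location id are hypothetical, and a matching profile is assumed to exist in the Synapse configuration file.

import asyncio

import synapseclient
from synapseclient.core.upload.upload_functions_async import upload_client_auth_s3

syn = synapseclient.Synapse()
syn.login()

file_handle = asyncio.run(
    upload_client_auth_s3(
        syn=syn,
        file_path="/tmp/results.csv",            # hypothetical local file
        bucket="example-external-bucket",        # hypothetical bucket
        endpoint_url="https://s3.example.org",   # hypothetical endpoint
        key_prefix="uploads",
        storage_location_id=4321,                # hypothetical storage location
    )
)
print(file_handle["id"])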

Multipart Upload Async

synapseclient.core.upload.multipart_upload_async

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

This mermaid flowchart illustrates the process of uploading a file to Synapse using the multipart upload API.

flowchart  TD
    upload_file_handle --> before-upload
    subgraph before-upload
        subgraph Disk I/O & CPU
            subgraph Multi-Processing
                md5["Calculate MD5"]
            end
            mime["Guess mime type"]
            file_size["Get file size"]
            file_name["Get file name"]
        end

        subgraph HTTP
            upload_destination["Find where to Upload 
 GET /entity/{entity_id}/uploadDestination"]
            start_upload["Start upload with Synapse 
 POST /file/multipart"]
            presigned_urls["Get URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
        end
    end

    before-upload --> during-upload

    subgraph during-upload
        subgraph multi-threaded["multi-threaded for each part"]
            read_part["Read part to upload into Memory"]
            read_part --> put_part["HTTP PUT to storage provider"]

            subgraph thread_locked1["Lock thread"]
                refresh_check{"New URL available?"}
                refresh_check --> |no|refresh
                refresh["Refresh remaining URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
            end


            put_part --> |URL Expired|refresh_check
            refresh_check --> |yes|put_part
            refresh --> put_part
            put_part --> |Finished|md5_part["Calculate MD5 of part"]
        end
        complete_part["PUT /file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5_hex}"]
        multi-threaded -->|Upload finished| complete_part
    end

    during-upload --> post-upload

    subgraph post-upload
        post_upload_complete["PUT /file/multipart/{upload_id}/complete"]
        get_file_handle["GET /fileHandle/{file_handle_id}"]
    end

    post-upload --> entity["Create/Update Synapse entity"]

Classes

HandlePartResult dataclass

Result of a part upload.

ATTRIBUTE DESCRIPTION
part_number

The part number that was uploaded.

TYPE: int

part_size

The size of the part that was uploaded.

TYPE: int

md5_hex

The MD5 hash of the part that was uploaded.

TYPE: str

Source code in synapseclient/core/upload/multipart_upload_async.py
146
147
148
149
150
151
152
153
154
155
156
157
158
@dataclass
class HandlePartResult:
    """Result of a part upload.

    Attributes:
        part_number: The part number that was uploaded.
        part_size: The size of the part that was uploaded.
        md5_hex: The MD5 hash of the part that was uploaded.
    """

    part_number: int
    part_size: int
    md5_hex: str

UploadAttemptAsync

Used to handle multi-threaded operations for uploading one or more parts of a file.

Source code in synapseclient/core/upload/multipart_upload_async.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
class UploadAttemptAsync:
    """
    Used to handle multi-threaded operations for uploading one or more parts of a file.
    """

    def __init__(
        self,
        syn: "Synapse",
        dest_file_name: str,
        upload_request_payload: Dict[str, Any],
        part_request_body_provider_fn: Union[None, Callable[[int], bytes]],
        md5_fn: Callable[[bytes, httpx.Response], str],
        force_restart: bool,
        storage_str: str = None,
    ) -> None:
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._force_restart = force_restart

        self._lock = asyncio.Lock()
        self._thread_lock = threading.Lock()
        self._aborted = False
        self._storage_str = storage_str

        self._close_progress_bar = getattr(_thread_local, "progress_bar", None) is None
        # populated later
        self._upload_id: Optional[str] = None
        self._pre_signed_part_urls: Optional[Mapping[int, str]] = None
        self._progress_bar = None

    async def __call__(self) -> Dict[str, str]:
        """Orchestrate the upload of a file to Synapse."""
        upload_status_response = await post_file_multipart(
            upload_request_payload=self._upload_request_payload,
            force_restart=self._force_restart,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )
        upload_state = upload_status_response.get("state")

        if upload_state != "COMPLETED":
            self._upload_id = upload_status_response["uploadId"]
            part_count, remaining_part_numbers = self._get_remaining_part_numbers(
                upload_status_response
            )

            # if no remaining part numbers then all the parts have been
            # uploaded but the upload has not been marked complete.
            if remaining_part_numbers:
                await self._upload_parts(part_count, remaining_part_numbers)
            upload_status_response = await self._complete_upload()

        return upload_status_response

    @classmethod
    def _get_remaining_part_numbers(
        cls, upload_status: Dict[str, str]
    ) -> Tuple[int, List[int]]:
        part_numbers = []
        parts_state = upload_status["partsState"]

        # parts are 1-based
        for i, part_status in enumerate(parts_state, 1):
            if part_status == "0":
                part_numbers.append(i)

        return len(parts_state), part_numbers

    def _is_copy(self) -> bool:
        # is this a copy or upload request
        return (
            self._upload_request_payload.get("concreteType")
            == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST
        )

    async def _fetch_pre_signed_part_urls_async(
        self,
        upload_id: str,
        part_numbers: List[int],
    ) -> Mapping[int, str]:
        trace.get_current_span().set_attributes({"synapse.upload_id": upload_id})
        response = await post_file_multipart_presigned_urls(
            upload_id=upload_id,
            part_numbers=part_numbers,
            synapse_client=self._syn,
        )

        part_urls = {}
        for part in response["partPresignedUrls"]:
            part_urls[part["partNumber"]] = (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )

        return part_urls

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ) -> Tuple[str, Dict[str, str]]:
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            refreshed URL

        """
        with self._thread_lock:
            current_url, headers = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                refreshed_url = current_url, headers
            else:
                self._pre_signed_part_urls = wrap_async_to_sync(
                    self._fetch_pre_signed_part_urls_async(
                        self._upload_id,
                        list(self._pre_signed_part_urls.keys()),
                    ),
                    syn=self._syn,
                )

                refreshed_url = self._pre_signed_part_urls[part_number]

        return refreshed_url

    async def _handle_part_wrapper(self, part_number: int) -> HandlePartResult:
        loop = asyncio.get_running_loop()
        otel_context = context.get_current()

        mem_info = psutil.virtual_memory()

        if mem_info.available <= self._part_size * 2:
            gc.collect()

        return await loop.run_in_executor(
            self._syn._get_thread_pool_executor(asyncio_event_loop=loop),
            self._handle_part,
            part_number,
            otel_context,
        )

    async def _upload_parts(
        self, part_count: int, remaining_part_numbers: List[int]
    ) -> None:
        """Take a list of part numbers and upload them to the pre-signed URLs.

        Arguments:
            part_count: The total number of parts in the upload.
            remaining_part_numbers: The parts that still need to be uploaded.
        """
        completed_part_count = part_count - len(remaining_part_numbers)
        file_size = self._upload_request_payload.get("fileSizeBytes")

        self._pre_signed_part_urls = await self._fetch_pre_signed_part_urls_async(
            upload_id=self._upload_id,
            part_numbers=remaining_part_numbers,
        )

        async_tasks = []

        for part_number in remaining_part_numbers:
            async_tasks.append(
                asyncio.create_task(self._handle_part_wrapper(part_number=part_number))
            )

        if not self._syn.silent and not self._progress_bar:
            if self._is_copy():
                # we won't have bytes to measure during a copy so the byte oriented
                # progress bar is not useful
                self._progress_bar = getattr(
                    _thread_local, "progress_bar", None
                ) or tqdm(
                    total=part_count,
                    desc=self._storage_str or "Copying",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(completed_part_count)
            else:
                previously_transferred = min(
                    completed_part_count * self._part_size,
                    file_size,
                )

                self._progress_bar = getattr(
                    _thread_local, "progress_bar", None
                ) or tqdm(
                    total=file_size,
                    desc=self._storage_str or "Uploading",
                    unit="B",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(previously_transferred)

        raised_exception = await self._orchestrate_upload_part_tasks(async_tasks)

        if raised_exception is not None:
            if isinstance(raised_exception, KeyboardInterrupt):
                raise SynapseUploadAbortedException(
                    "User interrupted upload"
                ) from raised_exception
            raise SynapseUploadFailedException(
                "Part upload failed"
            ) from raised_exception

    def _update_progress_bar(self, part_size: int) -> None:
        """Update the progress bar with the given part size."""
        if self._syn.silent or not self._progress_bar:
            return
        self._progress_bar.update(1 if self._is_copy() else part_size)

    async def _orchestrate_upload_part_tasks(
        self, async_tasks: List[asyncio.Task]
    ) -> Union[Exception, KeyboardInterrupt, None]:
        """
        Orchestrate the result of the upload part tasks. If successful, send a
        request to the server to add the part to the upload.

        Arguments:
            async_tasks: A set of tasks to orchestrate.

        Returns:
            An exception if one was raised, otherwise None.
        """
        raised_exception = None

        while async_tasks:
            done_tasks, pending_tasks = await asyncio.wait(
                async_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            async_tasks = pending_tasks
            for completed_task in done_tasks:
                try:
                    task_result = completed_task.result()

                    if isinstance(task_result, HandlePartResult):
                        part_number = task_result.part_number
                        part_size = task_result.part_size
                        part_md5_hex = task_result.md5_hex
                    elif (
                        isinstance(task_result, AddPartResponse)
                        and task_result.add_part_state != "ADD_SUCCESS"
                    ):
                        # Restart the file upload process resuming where this left off.
                        # Rest docs state:
                        # "If add part fails for any reason, the client must re-upload
                        # the part and then re-attempt to add the part to the upload."
                        raise SynapseUploadFailedException(
                            (
                                "Adding individual part failed with unexpected state: "
                                f"{task_result.add_part_state}, for upload "
                                f"{task_result.upload_id} and part "
                                f"{task_result.part_number} with message: "
                                f"{task_result.error_message}"
                            )
                        )
                    else:
                        continue

                    async_tasks.add(
                        asyncio.create_task(
                            put_file_multipart_add(
                                upload_id=self._upload_id,
                                part_number=part_number,
                                md5_hex=part_md5_hex,
                                synapse_client=self._syn,
                            )
                        )
                    )

                    self._update_progress_bar(part_size=part_size)

                except (Exception, KeyboardInterrupt) as cause:
                    with self._thread_lock:
                        if self._aborted:
                            # we've already aborted, no need to raise
                            # another exception
                            continue
                        self._aborted = True
                    raised_exception = cause
                    continue
        return raised_exception

    async def _complete_upload(self) -> Dict[str, str]:
        """Close the upload and mark it as complete.

        Returns:
            The response from the server for the completed upload.
        """
        if not self._syn.silent and self._progress_bar and self._close_progress_bar:
            self._progress_bar.close()
        upload_status_response = await put_file_multipart_complete(
            upload_id=self._upload_id,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )

        upload_state = upload_status_response.get("state")
        if upload_state != "COMPLETED":
            # at this point we think successfully uploaded all the parts
            # but the upload status isn't complete, we'll throw an error
            # and let a subsequent attempt try to reconcile
            raise SynapseUploadFailedException(
                f"Upload status has an unexpected state {upload_state}"
            )

        return upload_status_response

    def _handle_part(
        self, part_number: int, otel_context: Union[Context, None]
    ) -> HandlePartResult:
        """Take an individual part number and upload it to the pre-signed URL.

        Arguments:
            part_number: The part number to upload.
            otel_context: The OpenTelemetry context to use for tracing.

        Returns:
            The result of the part upload.

        Raises:
            SynapseUploadAbortedException: If the upload has been aborted.
            ValueError: If the part body is None.
        """
        if otel_context:
            context.attach(otel_context)
        with self._thread_lock:
            if self._aborted:
                # this upload attempt has already been aborted
                # so we short circuit the attempt to upload this part
                raise SynapseUploadAbortedException(
                    f"Upload aborted, skipping part {part_number}"
                )

            part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

        session: httpx.Client = self._syn._requests_session_storage

        # obtain the body (i.e. the upload bytes) for the given part number.
        body = (
            self._part_request_body_provider_fn(part_number)
            if self._part_request_body_provider_fn
            else None
        )
        part_size = len(body) if body else 0
        self._syn.logger.debug(f"Uploading part {part_number} of size {part_size}")
        if not self._is_copy() and body is None:
            raise ValueError(f"No body for part {part_number}")

        response = self._put_part_with_retry(
            session=session,
            body=body,
            part_url=part_url,
            signed_headers=signed_headers,
            part_number=part_number,
        )

        md5_hex = self._md5_fn(body, response)
        del response
        del body

        # remove so future batch pre_signed url fetches will exclude this part
        with self._thread_lock:
            del self._pre_signed_part_urls[part_number]

        return HandlePartResult(part_number, part_size, md5_hex)

    def _put_part_with_retry(
        self,
        session: httpx.Client,
        body: bytes,
        part_url: str,
        signed_headers: Dict[str, str],
        part_number: int,
    ) -> Union[httpx.Response, None]:
        """Put a part to the storage provider with retries.

        Arguments:
            session: The requests session to use for the put.
            body: The body of the part to put.
            part_url: The URL to put the part to.
            signed_headers: The signed headers to use for the put.
            part_number: The part number being put.

        Returns:
            The response from the put.

        Raises:
            SynapseHTTPError: If the put fails.
        """
        response = None
        for retry in range(2):
            try:
                # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls

                response = with_retry_time_based(
                    lambda part_url=part_url, signed_headers=signed_headers: session.put(
                        url=part_url,
                        content=body,  # noqa: F821
                        headers=signed_headers,
                    ),
                    retry_exceptions=[requests.exceptions.ConnectionError],
                )

                _raise_for_status_httpx(response=response, logger=self._syn.logger)

                # completed upload part to s3 successfully
                break

            except SynapseHTTPError as ex:
                if ex.response.status_code == 403 and retry < 1:
                    # we interpret this to mean our pre_signed url expired.
                    self._syn.logger.debug(
                        f"The pre-signed upload URL for part {part_number} has expired. "
                        "Refreshing urls and retrying.\n"
                    )

                    # we refresh all the urls and obtain this part's
                    # specific url for the retry
                    (
                        part_url,
                        signed_headers,
                    ) = self._refresh_pre_signed_part_urls(
                        part_number,
                        part_url,
                    )

                else:
                    raise
        return response

Functions

shared_progress_bar(progress_bar)

An outside process that will eventually trigger an upload through this module can configure a shared Progress Bar by running its code within this context manager.

Source code in synapseclient/core/upload/multipart_upload_async.py
132
133
134
135
136
137
138
139
140
141
142
143
@contextmanager
def shared_progress_bar(progress_bar):
    """An outside process that will eventually trigger an upload through this module
    can configure a shared Progress Bar by running its code within this context manager.
    """
    _thread_local.progress_bar = progress_bar
    try:
        yield
    finally:
        _thread_local.progress_bar.close()
        _thread_local.progress_bar.refresh()
        del _thread_local.progress_bar
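
For example, a wrapping process that performs several uploads can route all of their progress into one bar. This is a sketch under stated assumptions: the file paths are hypothetical and the bar total is approximated as the summed file sizes, matching the byte-oriented updates used for non-copy uploads.

import asyncio
import os

from tqdm import tqdm

import synapseclient
from synapseclient.core.upload.multipart_upload_async import (
    multipart_upload_file_async,
    shared_progress_bar,
)

syn = synapseclient.Synapse()
syn.login()

async def upload_batch(paths):
    total_bytes = sum(os.path.getsize(p) for p in paths)
    # Every upload started inside the context manager reports into the same bar.
    with shared_progress_bar(
        tqdm(total=total_bytes, desc="Batch upload", unit="B", unit_scale=True)
    ):
        for path in paths:
            await multipart_upload_file_async(syn, path)

asyncio.run(upload_batch(["/tmp/a.csv", "/tmp/b.csv"]))  # hypothetical files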

multipart_upload_file_async(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, md5=None, storage_str=None) async

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

storage_str

Optional string to append to the upload message

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
async def multipart_upload_file_async(
    syn: "Synapse",
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        md5: The MD5 of the file. If not provided, it will be calculated.
        storage_str: Optional string to append to the upload message

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    trace.get_current_span().set_attributes(
        {
            "synapse.storage_location_id": (
                storage_location_id if storage_location_id is not None else ""
            )
        }
    )

    if not os.path.exists(file_path):
        raise IOError(f'File "{file_path}" not found.')
    if os.path.isdir(file_path):
        raise IOError(f'File "{file_path}" is a directory.')

    file_size = os.path.getsize(file_path)
    if not dest_file_name:
        dest_file_name = os.path.basename(file_path)

    if content_type is None:
        mime_type, _ = mimetypes.guess_type(file_path, strict=False)
        content_type = mime_type or "application/octet-stream"

    md5_hex = md5 or md5_for_file_hex(filename=file_path)

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number: int) -> bytes:
        """Return the nth chunk of a file."""
        return get_file_chunk(file_path, part_number, part_size)

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
        storage_str=storage_str,
    )
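
A minimal sketch of calling this coroutine directly; the file path is hypothetical and the part size is simply an example above the 5 MiB minimum.

import asyncio

import synapseclient
from synapseclient.core.upload.multipart_upload_async import multipart_upload_file_async

syn = synapseclient.Synapse()
syn.login()

file_handle_id = asyncio.run(
    multipart_upload_file_async(
        syn,
        "/path/to/large_file.vcf.gz",  # hypothetical local file
        part_size=8 * 1024 * 1024,     # 8 MiB parts; minimum is 5 MiB
        force_restart=False,           # resume a previous attempt if one exists
    )
)
print(file_handle_id)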

multipart_upload_string_async(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False) async

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
async def multipart_upload_string_async(
    syn: "Synapse",
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: number of bytes per part. Minimum 5MB.
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn_util(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE, file_size, MIN_PART_SIZE, MAX_NUMBER_OF_PARTS
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number: int) -> bytes:
        """Get the nth chunk of a buffer."""
        return get_data_chunk(data, part_number, part_size)

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE, file_size, MIN_PART_SIZE, MAX_NUMBER_OF_PARTS
    )
    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
    )
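
A minimal sketch of uploading an in-memory string; the destination file name and content type below are examples.

import asyncio

import synapseclient
from synapseclient.core.upload.multipart_upload_async import multipart_upload_string_async

syn = synapseclient.Synapse()
syn.login()

file_handle_id = asyncio.run(
    multipart_upload_string_async(
        syn,
        "sample_id\tvalue\nS1\t0.42\n",   # uploaded as a UTF-8 encoded file
        dest_file_name="summary.tsv",      # defaults to "message.txt" when omitted
        content_type="text/tab-separated-values",
    )
)
print(file_handle_id)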

multipart_copy_async(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False) async

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

TYPE: Synapse

source_file_handle_association

Describes an association of a FileHandle with another object.

TYPE: Dict[str, str]

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
async def multipart_copy_async(
    syn: "Synapse",
    source_file_handle_association: Dict[str, str],
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Makes a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html).
    This request performs a copy of an existing file handle without data transfer from the client.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes).
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    part_size = part_size or DEFAULT_PART_SIZE

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
    )
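
A minimal sketch of requesting a server-side copy. The association dictionary assumes the FileHandleAssociation fields (fileHandleId, associateObjectId, associateObjectType); all identifiers are hypothetical.

import asyncio

import synapseclient
from synapseclient.core.upload.multipart_upload_async import multipart_copy_async

syn = synapseclient.Synapse()
syn.login()

# Describes which existing file handle to copy and which object it is attached to.
source_file_handle_association = {
    "fileHandleId": "123456",          # hypothetical file handle id
    "associateObjectId": "syn987654",  # hypothetical entity id
    "associateObjectType": "FileEntity",
}

file_handle_id = asyncio.run(
    multipart_copy_async(
        syn,
        source_file_handle_association,
        dest_file_name="copied_data.csv",
        storage_location_id=None,  # copy to the default storage location
    )
)
print(file_handle_id)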

Multithreaded Downloading

synapseclient.core.multithread_download

Classes

DownloadRequest

Bases: NamedTuple

A request to download a file from Synapse

ATTRIBUTE DESCRIPTION
file_handle_id

The file handle ID to download.

object_id

The Synapse object this file is associated with.

object_type

The type of the associated Synapse object.

path

The local path to download the file to. This path can be either an absolute path or a relative path from where the code is executed to the download location.

debug

A boolean to specify if debug mode is on.

TYPE: bool

Source code in synapseclient/core/multithread_download/download_threads.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@deprecated(
    version="4.4.0",
    reason="To be removed in 5.0.0. "
    "Moved to synapseclient/core/download/download_async.py",
)
class DownloadRequest(NamedTuple):
    """
    A request to download a file from Synapse

    Attributes:
        file_handle_id : The file handle ID to download.
        object_id : The Synapse object this file is associated with.
        object_type : The type of the associated Synapse object.
        path : The local path to download the file to.
            This path can be either an absolute path or
            a relative path from where the code is executed to the download location.
        debug: A boolean to specify if debug mode is on.
    """

    file_handle_id: int
    object_id: str
    object_type: str
    path: str
    debug: bool = False

Functions

download_file(client, download_request, *, max_concurrent_parts=None)

Main driver for the multi-threaded download. Uses an ExecutorService, either set externally onto a thread local by an outside process, or created as needed otherwise.

PARAMETER DESCRIPTION
client

A synapseclient

download_request

A DownloadRequest object specifying what Synapse file to download

TYPE: DownloadRequest

max_concurrent_parts

The maximum number of parts to download concurrently when downloading this file

TYPE: int DEFAULT: None

Source code in synapseclient/core/multithread_download/download_threads.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
@deprecated(
    version="4.4.0",
    reason="To be removed in 5.0.0. "
    "Moved to synapseclient/core/download/download_async.py",
)
def download_file(
    client,
    download_request: DownloadRequest,
    *,
    max_concurrent_parts: int = None,
):
    """
    Main driver for the multi-threaded download. Uses an ExecutorService,
    either set externally onto a thread local by an outside process,
    or created as needed otherwise.

    Arguments:
        client: A synapseclient
        download_request: A DownloadRequest object specifying what
                            Synapse file to download
        max_concurrent_parts: The maximum number of parts to download
                                concurrently when downloading this file
    """

    # we obtain an executor from a thread local if we are in the context of a Synapse sync
    # and want to re-use the same threadpool as was created for that
    executor = getattr(_thread_local, "executor", None)
    shutdown_after = False
    if not executor:
        shutdown_after = True
        executor = get_executor(client.max_threads)

    max_concurrent_parts = max_concurrent_parts or client.max_threads
    try:
        downloader = _MultithreadedDownloader(client, executor, max_concurrent_parts)
        downloader.download_file(download_request)
    finally:
        # if we created the Executor for the purposes of processing this download we also
        # shut it down. if it was passed in from the outside then it's managed by the caller
        if shutdown_after:
            executor.shutdown()
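
A minimal sketch of the (deprecated) entry point; the file handle id, entity id, and destination path are hypothetical, and the import path follows the source file shown above.

import synapseclient
from synapseclient.core.multithread_download.download_threads import (
    DownloadRequest,
    download_file,
)

syn = synapseclient.Synapse()
syn.login()

request = DownloadRequest(
    file_handle_id=123456,           # hypothetical file handle id
    object_id="syn987654",           # hypothetical entity id
    object_type="FileEntity",
    path="/tmp/downloaded_data.csv",  # hypothetical destination
)
download_file(syn, request, max_concurrent_parts=4)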

shared_executor(executor)

An outside process that will eventually trigger a download through this module can configure a shared Executor by running its code within this context manager.

Source code in synapseclient/core/multithread_download/download_threads.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@contextmanager
@deprecated(
    version="4.4.0",
    reason="To be removed in 5.0.0. "
    "Moved to synapseclient/core/download/download_async.py",
)
def shared_executor(executor):
    """An outside process that will eventually trigger a download through the this module
    can configure a shared Executor by running its code within this context manager."""
    _thread_local.executor = executor
    try:
        yield
    finally:
        del _thread_local.executor
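
For example, an outside process can share one thread pool across several downloads instead of letting each call create its own. A sketch with hypothetical ids and paths:

from concurrent.futures import ThreadPoolExecutor

import synapseclient
from synapseclient.core.multithread_download.download_threads import (
    DownloadRequest,
    download_file,
    shared_executor,
)

syn = synapseclient.Synapse()
syn.login()

# Reuse a single thread pool across several downloads.
with ThreadPoolExecutor(max_workers=8) as executor:
    with shared_executor(executor):
        for request in [
            DownloadRequest(123456, "syn987654", "FileEntity", "/tmp/a.csv"),
            DownloadRequest(123457, "syn987655", "FileEntity", "/tmp/b.csv"),
        ]:
            download_file(syn, request)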

Download Functions

synapseclient.core.download.download_functions

This module handles the various ways that a user can download a file to Synapse.

Classes

Functions

download_file_entity(download_location, entity, if_collision, submission, *, synapse_client=None) async

Download file entity

PARAMETER DESCRIPTION
download_location

The location on disk where the entity will be downloaded. If there is a matching file at the location, the download collision will be handled according to the if_collision argument.

TYPE: str

entity

The Synapse Entity object

TYPE: Entity

if_collision

Determines how to handle file collisions. May be

  • overwrite.local
  • keep.local
  • keep.both

TYPE: str

submission

Access associated files through a submission rather than through an entity.

TYPE: str

synapse_client

If not passed in or None this will use the last client from the .login() method.

TYPE: Optional[Synapse] DEFAULT: None

Source code in synapseclient/core/download/download_functions.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
async def download_file_entity(
    download_location: str,
    entity: "Entity",
    if_collision: str,
    submission: str,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> None:
    """
    Download file entity

    Arguments:
        download_location: The location on disk where the entity will be downloaded. If
            there is a matching file at the location, the download collision will be
            handled according to the `if_collision` argument.
        entity:           The Synapse Entity object
        if_collision:      Determines how to handle file collisions.
                            May be

            - `overwrite.local`
            - `keep.local`
            - `keep.both`

        submission:       Access associated files through a submission rather than through an entity.
        synapse_client: If not passed in or None this will use the last client from
            the `.login()` method.
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)
    # set the initial local state
    entity.path = None
    entity.files = []
    entity.cacheDir = None

    # check to see if an UNMODIFIED version of the file (since it was last downloaded) already exists
    # this location could be either in .synapseCache or a user specified location to which the user previously
    # downloaded the file
    cached_file_path = client.cache.get(
        file_handle_id=entity.dataFileHandleId, path=download_location
    )

    # location in .synapseCache where the file would be corresponding to its FileHandleId
    synapse_cache_location = client.cache.get_cache_dir(
        file_handle_id=entity.dataFileHandleId
    )

    file_name = (
        entity._file_handle.fileName
        if cached_file_path is None
        else os.path.basename(cached_file_path)
    )

    # Decide the best download location for the file
    if download_location is not None:
        # Make sure the specified download location is a fully resolved directory
        download_location = ensure_download_location_is_directory(download_location)
    elif cached_file_path is not None:
        # file already cached so use that as the download location
        download_location = os.path.dirname(cached_file_path)
    else:
        # file not cached and no user-specified location so default to .synapseCache
        download_location = synapse_cache_location

    # resolve file path collisions by either overwriting, renaming, or not downloading, depending on the
    # ifcollision value
    download_path = resolve_download_path_collisions(
        download_location=download_location,
        file_name=file_name,
        if_collision=if_collision,
        synapse_cache_location=synapse_cache_location,
        cached_file_path=cached_file_path,
    )
    if download_path is None:
        return

    if cached_file_path is not None:  # copy from cache
        if download_path != cached_file_path:
            # create the folder if it does not exist already
            if not os.path.exists(download_location):
                os.makedirs(download_location)
            client.logger.info(
                f"Copying existing file from {cached_file_path} to {download_path}"
            )
            shutil.copy(cached_file_path, download_path)

    else:  # download the file from URL (could be a local file)
        object_type = "FileEntity" if submission is None else "SubmissionAttachment"
        object_id = entity["id"] if submission is None else submission

        # reassign downloadPath because if url points to local file (e.g. file://~/someLocalFile.txt)
        # it won't be "downloaded" and, instead, downloadPath will just point to '~/someLocalFile.txt'
        # _downloadFileHandle may also return None to indicate that the download failed
        with logging_redirect_tqdm(loggers=[client.logger]):
            download_path = await download_by_file_handle(
                file_handle_id=entity.dataFileHandleId,
                synapse_id=object_id,
                entity_type=object_type,
                destination=download_path,
                synapse_client=client,
            )

        if download_path is None or not os.path.exists(download_path):
            return

    # converts the path format from forward slashes back to backward slashes on Windows
    entity.path = os.path.normpath(download_path)
    entity.files = [os.path.basename(download_path)]
    entity.cacheDir = os.path.dirname(download_path)
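
A minimal sketch of invoking this coroutine for an entity fetched without its file; the entity id and download location are hypothetical, and the entity is assumed to carry its file handle metadata.

import asyncio

import synapseclient
from synapseclient.core.download.download_functions import download_file_entity

syn = synapseclient.Synapse()
syn.login()

# Fetch the entity metadata only; the file itself is downloaded below.
entity = syn.get("syn987654", downloadFile=False)  # hypothetical entity id

asyncio.run(
    download_file_entity(
        download_location="/tmp/downloads",  # hypothetical directory
        entity=entity,
        if_collision="keep.both",
        submission=None,
        synapse_client=syn,
    )
)
print(entity.path)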

download_file_entity_model(download_location, file, if_collision, submission, *, synapse_client=None) async

Download file entity

PARAMETER DESCRIPTION
download_location

The location on disk where the entity will be downloaded. If there is a matching file at the location, the download collision will be handled according to the if_collision argument.

TYPE: Union[str, None]

file

The File object

if_collision

Determines how to handle file collisions. May be

  • overwrite.local
  • keep.local
  • keep.both

TYPE: str

submission

Access associated files through a submission rather than through an entity.

TYPE: str

synapse_client

If not passed in or None this will use the last client from the .login() method.

TYPE: Optional[Synapse] DEFAULT: None

Source code in synapseclient/core/download/download_functions.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
async def download_file_entity_model(
    download_location: Union[str, None],
    file: "File",
    if_collision: str,
    submission: str,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> None:
    """
    Download file entity

    Arguments:
        download_location: The location on disk where the entity will be downloaded. If
            there is a matching file at the location, the download collision will be
            handled according to the `if_collision` argument.
        file:             The File object
        if_collision:      Determines how to handle file collisions.
                            May be

            - `overwrite.local`
            - `keep.local`
            - `keep.both`

        submission:       Access associated files through a submission rather than through an entity.
        synapse_client: If not passed in or None this will use the last client from
            the `.login()` method.
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)
    # set the initial local state
    file.path = None

    # check to see if an UNMODIFIED version of the file (since it was last downloaded) already exists
    # this location could be either in .synapseCache or a user specified location to which the user previously
    # downloaded the file
    cached_file_path = client.cache.get(
        file_handle_id=file.data_file_handle_id, path=download_location
    )

    # location in .synapseCache where the file would be corresponding to its FileHandleId
    synapse_cache_location = client.cache.get_cache_dir(
        file_handle_id=file.data_file_handle_id
    )

    file_name = (
        file.file_handle.file_name
        if cached_file_path is None
        else os.path.basename(cached_file_path)
    )

    # Decide the best download location for the file
    if download_location is not None:
        # Make sure the specified download location is a fully resolved directory
        download_location = ensure_download_location_is_directory(download_location)
    elif cached_file_path is not None:
        # file already cached so use that as the download location
        download_location = os.path.dirname(cached_file_path)
    else:
        # file not cached and no user-specified location so default to .synapseCache
        download_location = synapse_cache_location

    # resolve file path collisions by either overwriting, renaming, or not downloading, depending on the
    # ifcollision value
    download_path = resolve_download_path_collisions(
        download_location=download_location,
        file_name=file_name,
        if_collision=if_collision,
        synapse_cache_location=synapse_cache_location,
        cached_file_path=cached_file_path,
    )
    if download_path is None:
        return

    if cached_file_path is not None:  # copy from cache
        if download_path != cached_file_path:
            # create the folder if it does not exist already
            if not os.path.exists(download_location):
                os.makedirs(download_location)
            client.logger.info(
                f"Copying existing file from {cached_file_path} to {download_path}"
            )
            shutil.copy(cached_file_path, download_path)

    else:  # download the file from URL (could be a local file)
        object_type = "FileEntity" if submission is None else "SubmissionAttachment"
        object_id = file.id if submission is None else submission

        # reassign downloadPath because if url points to local file (e.g. file://~/someLocalFile.txt)
        # it won't be "downloaded" and, instead, downloadPath will just point to '~/someLocalFile.txt'
        # _downloadFileHandle may also return None to indicate that the download failed
        with logging_redirect_tqdm(loggers=[client.logger]):
            download_path = await download_by_file_handle(
                file_handle_id=file.data_file_handle_id,
                synapse_id=object_id,
                entity_type=object_type,
                destination=download_path,
                synapse_client=client,
            )

        if download_path is None or not os.path.exists(download_path):
            return

    # converts the path format from forward slashes back to backward slashes on Windows
    file.path = os.path.normpath(download_path)

download_by_file_handle(file_handle_id, synapse_id, entity_type, destination, retries=5, *, synapse_client=None) async

Download a file from the given URL to the local file system.

PARAMETER DESCRIPTION
file_handle_id

The id of the FileHandle to download

TYPE: str

synapse_id

The id of the Synapse object that uses the FileHandle e.g. "syn123"

TYPE: str

entity_type

The type of the Synapse object that uses the FileHandle e.g. "FileEntity"

TYPE: str

destination

The destination on local file system

TYPE: str

retries

The number of download retries attempted before throwing an exception.

TYPE: int DEFAULT: 5

synapse_client

If not passed in or None this will use the last client from the .login() method.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
str

The path to downloaded file

Source code in synapseclient/core/download/download_functions.py
async def download_by_file_handle(
    file_handle_id: str,
    synapse_id: str,
    entity_type: str,
    destination: str,
    retries: int = 5,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> str:
    """
    Download a file from the given URL to the local file system.

    Arguments:
        file_handle_id: The id of the FileHandle to download
        synapse_id: The id of the Synapse object that uses the FileHandle e.g. "syn123"
        entity_type: The type of the Synapse object that uses the FileHandle e.g. "FileEntity"
        destination: The destination on local file system
        retries: The number of download retries attempted before throwing an exception.
        synapse_client: If not passed in or None this will use the last client from
            the `.login()` method.

    Returns:
        The path to downloaded file


    ```mermaid
    sequenceDiagram
        title Multi-Threaded Download Process with Retry Mechanism

        actor Client as Client
        participant download_functions as download_functions
        participant download_async as download_async
        participant download_execution as download_execution
        participant multi_threaded_download as multi_threaded_download
        participant remote_storage_server as remote_storage_server
        participant file as file

        activate Client
        Client ->> download_functions: download_by_file_handle
        activate download_functions

        loop retryable

            alt Download type = multi_threaded
                note over download_functions: download_from_url_multi_threaded

                download_functions ->> download_async: download_file
                activate download_async

                download_async ->> download_async: _generate_stream_and_write_chunk_tasks


                loop for each download task
                    download_async ->> download_execution: _execute_download_tasks
                    activate download_execution

                    par MULTI-THREADED: Run in thread executor
                        download_execution ->> multi_threaded_download: _stream_and_write_chunk
                        activate multi_threaded_download

                        loop stream chunk into memory
                            multi_threaded_download ->> remote_storage_server: stream chunk from remote server
                            remote_storage_server -->> multi_threaded_download: Return partial range
                        end

                        note over multi_threaded_download: Chunk loaded into memory

                        alt obtain thread lock [Failed]
                            note over multi_threaded_download: Wait to obtain lock
                        else obtain thread lock [Success]
                            multi_threaded_download ->> file: write chunk to file
                            file -->> multi_threaded_download: .
                            note over multi_threaded_download: Update progress bar
                            note over multi_threaded_download: Release lock
                        end
                        multi_threaded_download -->> download_execution: .
                    end
                    download_execution -->> download_async: .
                    note over download_async: Run garbage collection every 100 iterations
                    deactivate multi_threaded_download
                    deactivate download_execution
                end

                download_async -->> download_functions: .
                deactivate download_async

                download_functions ->> download_functions: md5_for_file
                download_functions -->> Client: File downloaded
                deactivate download_functions
            else Download type = non multi_threaded
                note over download_functions: Execute `download_from_url`
            else Download type = external s3 object store
                note over download_functions: Execute `S3ClientWrapper.download_file`
            else Download type = aws s3 sts storage
                note over download_functions: Execute `S3ClientWrapper.download_file` with with_boto_sts_credentials
            end
        end

        deactivate Client
    ```
    """
    from synapseclient import Synapse

    syn = Synapse.get_client(synapse_client=synapse_client)
    os.makedirs(os.path.dirname(destination), exist_ok=True)

    while retries > 0:
        try:
            file_handle_result: Dict[
                str, str
            ] = await get_file_handle_for_download_async(
                file_handle_id=file_handle_id,
                synapse_id=synapse_id,
                entity_type=entity_type,
                synapse_client=syn,
            )
            file_handle = file_handle_result["fileHandle"]
            concrete_type = file_handle["concreteType"]
            storage_location_id = file_handle.get("storageLocationId")

            if concrete_type == concrete_types.EXTERNAL_OBJECT_STORE_FILE_HANDLE:
                profile = get_client_authenticated_s3_profile(
                    endpoint=file_handle["endpointUrl"],
                    bucket=file_handle["bucket"],
                    config_path=syn.configPath,
                )

                progress_bar = get_or_create_download_progress_bar(
                    file_size=1, postfix=synapse_id, synapse_client=syn
                )
                loop = asyncio.get_running_loop()
                downloaded_path = await loop.run_in_executor(
                    syn._get_thread_pool_executor(asyncio_event_loop=loop),
                    lambda: S3ClientWrapper.download_file(
                        bucket=file_handle["bucket"],
                        endpoint_url=file_handle["endpointUrl"],
                        remote_file_key=file_handle["fileKey"],
                        download_file_path=destination,
                        profile_name=profile,
                        credentials=_get_aws_credentials(),
                        progress_bar=progress_bar,
                    ),
                )

            elif (
                sts_transfer.is_boto_sts_transfer_enabled(syn=syn)
                and await sts_transfer.is_storage_location_sts_enabled_async(
                    syn=syn, entity_id=synapse_id, location=storage_location_id
                )
                and concrete_type == concrete_types.S3_FILE_HANDLE
            ):
                progress_bar = get_or_create_download_progress_bar(
                    file_size=1, postfix=synapse_id, synapse_client=syn
                )

                def download_fn(
                    credentials: Dict[str, str],
                    file_handle: Dict[str, str] = file_handle,
                ) -> str:
                    """Use the STS credentials to download the file from S3.

                    Arguments:
                        credentials: The STS credentials

                    Returns:
                        The path to the downloaded file
                    """
                    return S3ClientWrapper.download_file(
                        bucket=file_handle["bucketName"],
                        endpoint_url=None,
                        remote_file_key=file_handle["key"],
                        download_file_path=destination,
                        credentials=credentials,
                        progress_bar=progress_bar,
                        # pass through our synapse threading config to boto s3
                        transfer_config_kwargs={"max_concurrency": syn.max_threads},
                    )

                loop = asyncio.get_running_loop()
                downloaded_path = await loop.run_in_executor(
                    syn._get_thread_pool_executor(asyncio_event_loop=loop),
                    lambda: sts_transfer.with_boto_sts_credentials(
                        download_fn, syn, synapse_id, "read_only"
                    ),
                )

            elif (
                syn.multi_threaded
                and concrete_type == concrete_types.S3_FILE_HANDLE
                and file_handle.get("contentSize", 0)
                > SYNAPSE_DEFAULT_DOWNLOAD_PART_SIZE
            ):
                # run the download multi threaded if the file supports it, we're configured to do so,
                # and the file is large enough that it would be broken into parts to take advantage of
                # multiple downloading threads. otherwise it's more efficient to run the download as a simple
                # single threaded URL download.
                downloaded_path = await download_from_url_multi_threaded(
                    file_handle_id=file_handle_id,
                    object_id=synapse_id,
                    object_type=entity_type,
                    destination=destination,
                    expected_md5=file_handle.get("contentMd5"),
                    synapse_client=syn,
                )

            else:
                loop = asyncio.get_running_loop()
                progress_bar = get_or_create_download_progress_bar(
                    file_size=1, postfix=synapse_id, synapse_client=syn
                )
                downloaded_path = await loop.run_in_executor(
                    syn._get_thread_pool_executor(asyncio_event_loop=loop),
                    lambda: download_from_url(
                        url=file_handle_result["preSignedURL"],
                        destination=destination,
                        file_handle_id=file_handle["id"],
                        expected_md5=file_handle.get("contentMd5"),
                        progress_bar=progress_bar,
                        synapse_client=syn,
                    ),
                )

            syn.logger.info(f"Downloaded {synapse_id} to {downloaded_path}")
            syn.cache.add(
                file_handle["id"], downloaded_path, file_handle.get("contentMd5", None)
            )
            close_download_progress_bar()
            return downloaded_path

        except Exception as ex:
            if not is_retryable_download_error(ex):
                close_download_progress_bar()
                raise

            exc_info = sys.exc_info()
            ex.progress = 0 if not hasattr(ex, "progress") else ex.progress
            syn.logger.debug(
                f"\nRetrying download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
                exc_info=True,
            )  # this will include stack trace
            if ex.progress == 0:  # No progress was made; reduce remaining retries.
                retries -= 1
            if retries <= 0:
                close_download_progress_bar()
                # Re-raise exception
                raise

    close_download_progress_bar()
    raise RuntimeError("should not reach this line")
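
For illustration only, a minimal sketch of calling this internal coroutine directly. The file handle id, entity id, and destination path below are placeholders, and it assumes credentials are already configured so that `Synapse.login()` succeeds without arguments.

```python
import asyncio

from synapseclient import Synapse
from synapseclient.core.download.download_functions import download_by_file_handle


async def main() -> None:
    syn = Synapse()
    syn.login()  # assumes locally configured credentials (e.g. a personal access token)

    # All identifiers and paths below are placeholders.
    downloaded_path = await download_by_file_handle(
        file_handle_id="123456",
        synapse_id="syn123",
        entity_type="FileEntity",
        destination="/tmp/example/data.csv",
        synapse_client=syn,
    )
    print(downloaded_path)


asyncio.run(main())
```

The concrete download strategy (multi-threaded, STS-backed S3, external object store, or a plain URL download) is chosen internally from the file handle's concrete type, as shown in the diagram in the source above.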

download_from_url_multi_threaded(file_handle_id, object_id, object_type, destination, *, expected_md5=None, synapse_client=None) async

Download a file from the given URL using multiple threads.

PARAMETER DESCRIPTION
file_handle_id

The id of the FileHandle to download

TYPE: str

object_id

The id of the Synapse object that uses the FileHandle e.g. "syn123"

TYPE: str

object_type

The type of the Synapse object that uses the FileHandle e.g. "FileEntity". Any of https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/FileHandleAssociateType.html

TYPE: str

destination

The destination on local file system

TYPE: str

expected_md5

The expected MD5

TYPE: str DEFAULT: None

synapse_client

If not passed in or None this will use the last client from the .login() method.

TYPE: Optional[Synapse] DEFAULT: None

RAISES DESCRIPTION
SynapseMd5MismatchError

If the actual MD5 does not match expected MD5.

RETURNS DESCRIPTION
str

The path to downloaded file

Source code in synapseclient/core/download/download_functions.py
async def download_from_url_multi_threaded(
    file_handle_id: str,
    object_id: str,
    object_type: str,
    destination: str,
    *,
    expected_md5: str = None,
    synapse_client: Optional["Synapse"] = None,
) -> str:
    """
    Download a file from the given URL using multiple threads.

    Arguments:
        file_handle_id: The id of the FileHandle to download
        object_id:      The id of the Synapse object that uses the FileHandle
            e.g. "syn123"
        object_type:    The type of the Synapse object that uses the
            FileHandle e.g. "FileEntity". Any of
            <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/FileHandleAssociateType.html>
        destination:    The destination on local file system
        expected_md5:   The expected MD5
        content_size:   The size of the content
        synapse_client: If not passed in or None this will use the last client from
            the `.login()` method.
    Raises:
        SynapseMd5MismatchError: If the actual MD5 does not match expected MD5.

    Returns:
        The path to downloaded file
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)
    destination = os.path.abspath(destination)
    temp_destination = utils.temp_download_filename(
        destination=destination, file_handle_id=file_handle_id
    )

    request = DownloadRequest(
        file_handle_id=int(file_handle_id),
        object_id=object_id,
        object_type=object_type,
        path=temp_destination,
        debug=client.debug,
    )

    await download_file(client=client, download_request=request)

    if expected_md5:  # verify the md5 only when an expected value was provided
        actual_md5 = utils.md5_for_file_hex(filename=temp_destination)
        if actual_md5 != expected_md5:
            try:
                os.remove(temp_destination)
            except FileNotFoundError:
                # file already does not exist. nothing to do
                pass
            raise SynapseMd5MismatchError(
                f"Downloaded file {temp_destination}'s md5 {actual_md5} does not match expected MD5 of {expected_md5}"
            )
    # once download completed, rename to desired destination
    shutil.move(temp_destination, destination)

    return destination
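
This helper is normally invoked by `download_by_file_handle` for large S3-backed files, but it can be driven directly. A hedged sketch follows; the ids and destination path are placeholders and a logged-in client is assumed.

```python
import asyncio

from synapseclient import Synapse
from synapseclient.core.download.download_functions import download_from_url_multi_threaded


async def main() -> None:
    syn = Synapse()
    syn.login()
    # Placeholder ids/paths; expected_md5 is omitted, so no integrity check is performed.
    path = await download_from_url_multi_threaded(
        file_handle_id="123456",
        object_id="syn123",
        object_type="FileEntity",
        destination="/tmp/example/large_file.bin",
        synapse_client=syn,
    )
    print(path)


asyncio.run(main())
```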

download_from_url(url, destination, file_handle_id=None, expected_md5=None, progress_bar=None, *, synapse_client=None)

Download a file from the given URL to the local file system.

PARAMETER DESCRIPTION
url

The source of download

TYPE: str

destination

The destination on local file system

TYPE: str

file_handle_id

Optional. If given, the file will be given a temporary name that includes the file handle id which allows resuming partial downloads of the same file from previous sessions

TYPE: Optional[str] DEFAULT: None

expected_md5

Optional. If given, check that the MD5 of the downloaded file matches the expected MD5

TYPE: Optional[str] DEFAULT: None

synapse_client

If not passed in or None this will use the last client from the .login() method.

TYPE: Optional[Synapse] DEFAULT: None

RAISES DESCRIPTION
IOError

If the local file does not exist.

SynapseError

If fail to download the file.

SynapseHTTPError

If there are too many redirects.

SynapseMd5MismatchError

If the actual MD5 does not match expected MD5.

RETURNS DESCRIPTION
Union[str, None]

The path to downloaded file or None if the download failed

Source code in synapseclient/core/download/download_functions.py
def download_from_url(
    url: str,
    destination: str,
    file_handle_id: Optional[str] = None,
    expected_md5: Optional[str] = None,
    progress_bar: Optional[tqdm] = None,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> Union[str, None]:
    """
    Download a file from the given URL to the local file system.

    Arguments:
        url:           The source of download
        destination:   The destination on local file system
        file_handle_id:  Optional. If given, the file will be given a temporary name that includes the file
                                handle id which allows resuming partial downloads of the same file from previous
                                sessions
        expected_md5:  Optional. If given, check that the MD5 of the downloaded file matches the expected MD5
        synapse_client: If not passed in or None this will use the last client from
            the `.login()` method.

    Raises:
        IOError:                 If the local file does not exist.
        SynapseError:            If fail to download the file.
        SynapseHTTPError:        If there are too many redirects.
        SynapseMd5MismatchError: If the actual MD5 does not match expected MD5.

    Returns:
        The path to downloaded file or None if the download failed
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)

    destination = os.path.abspath(destination)
    actual_md5 = None
    redirect_count = 0
    delete_on_md5_mismatch = True
    client.logger.debug(f"Downloading from {url} to {destination}")
    while redirect_count < REDIRECT_LIMIT:
        redirect_count += 1
        scheme = urllib_urlparse.urlparse(url).scheme
        if scheme == "file":
            delete_on_md5_mismatch = False
            destination = utils.file_url_to_path(url, verify_exists=True)
            if destination is None:
                raise IOError(f"Local file ({url}) does not exist.")
            if progress_bar is not None:
                file_size = os.path.getsize(destination)
                increment_progress_bar_total(total=file_size, progress_bar=progress_bar)
                increment_progress_bar(n=progress_bar.total, progress_bar=progress_bar)
            break
        elif scheme == "sftp":
            username, password = client._getUserCredentials(url)
            destination = SFTPWrapper.download_file(
                url=url,
                localFilepath=destination,
                username=username,
                password=password,
                progress_bar=progress_bar,
            )
            break
        elif scheme == "ftp":
            updated_progress_bar_with_total = False

            def _ftp_report_hook(
                _: int,
                read_size: int,
                total_size: int,
            ) -> None:
                """Report hook for urllib.request.urlretrieve to show download progress.

                Arguments:
                    _: The number of blocks transferred so far
                    read_size: The size of each block
                    total_size: The total size of the file

                Returns:
                    None
                """
                nonlocal updated_progress_bar_with_total
                if progress_bar is not None:
                    if not updated_progress_bar_with_total:
                        updated_progress_bar_with_total = True
                        increment_progress_bar_total(
                            total=total_size, progress_bar=progress_bar
                        )
                    increment_progress_bar(n=read_size, progress_bar=progress_bar)

            urllib_request.urlretrieve(
                url=url, filename=destination, reporthook=_ftp_report_hook
            )
            break
        elif scheme in ["http", "https"]:
            # if a partial download exists with the temporary name, resume it by requesting the remaining bytes
            temp_destination = utils.temp_download_filename(
                destination=destination, file_handle_id=file_handle_id
            )
            range_header = (
                {"Range": f"bytes={os.path.getsize(filename=temp_destination)}-"}
                if os.path.exists(temp_destination)
                else {}
            )

            # pass along synapse auth credentials only if downloading directly from synapse
            auth = (
                client.credentials
                if is_synapse_uri(uri=url, synapse_client=client)
                else None
            )

            response = with_retry(
                lambda url=url, range_header=range_header, auth=auth: client._requests_session.get(
                    url=url,
                    headers=client._generate_headers(range_header),
                    stream=True,
                    allow_redirects=False,
                    auth=auth,
                ),
                verbose=client.debug,
                **STANDARD_RETRY_PARAMS,
            )
            try:
                exceptions._raise_for_status(response, verbose=client.debug)
            except SynapseHTTPError as err:
                if err.response.status_code == 404:
                    raise SynapseError(f"Could not download the file at {url}") from err
                elif (
                    err.response.status_code == 416
                ):  # Requested Range Not Satisfiable
                    # this is a weird error when the client already finished downloading but the loop continues
                    # When this exception occurs, the range we request is guaranteed to be >= file size so we
                    # assume that the file has been fully downloaded, rename it to destination file
                    # and break out of the loop to perform the MD5 check.
                    # If it fails the user can retry with another download.
                    shutil.move(temp_destination, destination)
                    break
                raise

            # handle redirects
            if response.status_code in [301, 302, 303, 307, 308]:
                url = response.headers["location"]
                # don't break, loop again
            else:
                # get filename from content-disposition, if we don't have it already
                if os.path.isdir(destination):
                    filename = utils.extract_filename(
                        content_disposition_header=response.headers.get(
                            "content-disposition", None
                        ),
                        default_filename=utils.guess_file_name(url),
                    )
                    destination = os.path.join(destination, filename)
                # Stream the file to disk
                if "content-length" in response.headers:
                    to_be_transferred = float(response.headers["content-length"])
                else:
                    to_be_transferred = -1
                transferred = 0

                # Servers that respect the Range header return 206 Partial Content
                if response.status_code == 206:
                    mode = "ab"
                    previously_transferred = os.path.getsize(filename=temp_destination)
                    to_be_transferred += previously_transferred
                    transferred += previously_transferred
                    increment_progress_bar_total(
                        total=to_be_transferred, progress_bar=progress_bar
                    )
                    increment_progress_bar(n=transferred, progress_bar=progress_bar)
                    client.logger.debug(
                        f"Resuming partial download to {temp_destination}. "
                        f"{previously_transferred}/{to_be_transferred} bytes already "
                        "transferred."
                    )
                    sig = utils.md5_for_file(filename=temp_destination)
                else:
                    mode = "wb"
                    previously_transferred = 0
                    increment_progress_bar_total(
                        total=to_be_transferred, progress_bar=progress_bar
                    )
                    sig = hashlib.new("md5", usedforsecurity=False)  # nosec

                try:
                    with open(temp_destination, mode) as fd:
                        for _, chunk in enumerate(
                            response.iter_content(FILE_BUFFER_SIZE)
                        ):
                            fd.write(chunk)
                            sig.update(chunk)

                            # the 'content-length' header gives the total number of bytes that will be transferred
                            # to us. len(chunk) cannot be used to track progress because iter_content automatically
                            # decodes the chunks if the response body is encoded, so len(chunk) could be
                            # different from the total number of bytes we've read from the response body.
                            # response.raw.tell() is the total number of response body bytes transferred over the
                            # wire so far
                            transferred = response.raw.tell() + previously_transferred
                            increment_progress_bar(
                                n=len(chunk), progress_bar=progress_bar
                            )
                except (
                    Exception
                ) as ex:  # We will add a progress parameter then push it back to retry.
                    ex.progress = transferred - previously_transferred
                    raise

                # verify that the file was completely downloaded and retry if it is not complete
                if to_be_transferred > 0 and transferred < to_be_transferred:
                    client.logger.warning(
                        "\nRetrying download because the connection ended early.\n"
                    )
                    continue

                actual_md5 = sig.hexdigest()
                # rename to final destination
                shutil.move(temp_destination, destination)
                break
        else:
            client.logger.error(f"Unable to download URLs of type {scheme}")
            return None

    else:  # didn't break out of loop
        raise SynapseHTTPError("Too many redirects")

    if (
        actual_md5 is None
    ):  # if md5 not set (should be the case for all except http download)
        actual_md5 = utils.md5_for_file_hex(filename=destination)

    # check md5 if given
    if expected_md5 and actual_md5 != expected_md5:
        if delete_on_md5_mismatch and os.path.exists(destination):
            os.remove(destination)
        raise SynapseMd5MismatchError(
            f"Downloaded file {destination}'s md5 {actual_md5} does not match expected MD5 of {expected_md5}"
        )

    return destination
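
The behavior depends on the URL scheme (file, sftp, ftp, http/https). As a hedged sketch, the call below uses a `file://` URL, which is not transferred over the network: the local path is resolved, verified to exist, and returned. Both paths are placeholders and a logged-in client is assumed.

```python
from synapseclient import Synapse
from synapseclient.core.download.download_functions import download_from_url

syn = Synapse()
syn.login()

# Placeholder paths; the file must already exist for a file:// URL.
path = download_from_url(
    url="file:///tmp/example/already_on_disk.csv",
    destination="/tmp/example/already_on_disk.csv",
    synapse_client=syn,
)
print(path)
```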

is_retryable_download_error(ex)

Check if the download error is retryable

PARAMETER DESCRIPTION
ex

An exception

TYPE: Exception

RETURNS DESCRIPTION
bool

Boolean value indicating whether the download error is retryable

Source code in synapseclient/core/download/download_functions.py
def is_retryable_download_error(ex: Exception) -> bool:
    """
    Check if the download error is retryable

    Arguments:
        ex: An exception

    Returns:
        Boolean value indicating whether the download error is retryable
    """
    # some exceptions caught during download indicate non-recoverable situations that
    # will not be remedied by a repeated download attempt.
    return not (
        (isinstance(ex, OSError) and ex.errno == errno.ENOSPC)  # out of disk space
        or isinstance(ex, SynapseMd5MismatchError)
    )
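
A small sketch of the distinction this helper draws. It assumes `SynapseMd5MismatchError` is importable from `synapseclient.core.exceptions`, where the other exceptions used by this module live.

```python
import errno

from synapseclient.core.download.download_functions import is_retryable_download_error
from synapseclient.core.exceptions import SynapseMd5MismatchError

# Transient network failures are retryable...
print(is_retryable_download_error(ConnectionError("connection reset")))      # True

# ...but running out of disk space or a corrupted download is not.
print(is_retryable_download_error(OSError(errno.ENOSPC, "No space left")))   # False
print(is_retryable_download_error(SynapseMd5MismatchError("md5 mismatch")))  # False
```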

resolve_download_path_collisions(download_location, file_name, if_collision, synapse_cache_location, cached_file_path, *, synapse_client=None)

Resolve file path collisions

PARAMETER DESCRIPTION
download_location

The location on disk where the entity will be downloaded. If there is a matching file at the location, the download collision will be handled according to the if_collision argument.

TYPE: str

file_name

The file name

TYPE: str

if_collision

Determines how to handle file collisions. May be "overwrite.local", "keep.local", or "keep.both".

TYPE: str

synapse_cache_location

The location in .synapseCache where the file would be corresponding to its FileHandleId.

TYPE: str

cached_file_path

The file path of the cached copy

TYPE: str

RAISES DESCRIPTION
ValueError

Invalid ifcollision. Should be "overwrite.local", "keep.local", or "keep.both".

RETURNS DESCRIPTION
Union[str, None]

The download file path with collisions resolved or None if the file should not be downloaded

Source code in synapseclient/core/download/download_functions.py
def resolve_download_path_collisions(
    download_location: str,
    file_name: str,
    if_collision: str,
    synapse_cache_location: str,
    cached_file_path: str,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> Union[str, None]:
    """
    Resolve file path collisions

    Arguments:
        download_location: The location on disk where the entity will be downloaded. If
            there is a matching file at the location, the download collision will be
            handled according to the `if_collision` argument.
        file_name:             The file name
        if_collision:           Determines how to handle file collisions.
                                May be "overwrite.local", "keep.local", or "keep.both".
        synapse_cache_location: The location in .synapseCache where the file would be
                                corresponding to its FileHandleId.
        cached_file_path:      The file path of the cached copy

    Raises:
        ValueError: Invalid ifcollision. Should be "overwrite.local", "keep.local", or "keep.both".

    Returns:
        The download file path with collisions resolved or None if the file should
        not be downloaded
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)

    # always overwrite if we are downloading to .synapseCache
    if utils.normalize_path(download_location) == synapse_cache_location:
        if if_collision is not None and if_collision != COLLISION_OVERWRITE_LOCAL:
            client.logger.warning(
                "\n"
                + "!" * 50
                + f"\nifcollision={if_collision} "
                + "is being IGNORED because the download destination is synapse's cache."
                f' Instead, the behavior is "{COLLISION_OVERWRITE_LOCAL}". \n'
                + "!" * 50
                + "\n"
            )
        if_collision = COLLISION_OVERWRITE_LOCAL
    # if ifcollision not specified, default to keep.both
    if_collision = if_collision or COLLISION_KEEP_BOTH

    download_path = utils.normalize_path(os.path.join(download_location, file_name))
    # resolve collision
    if os.path.exists(path=download_path):
        if if_collision == COLLISION_OVERWRITE_LOCAL:
            pass  # Let the download proceed and overwrite the local file.
        elif if_collision == COLLISION_KEEP_LOCAL:
            client.logger.info(
                f"Found existing file at {download_path}, skipping download."
            )

            # Don't want to overwrite the local file.
            download_path = None
        elif if_collision == COLLISION_KEEP_BOTH:
            if download_path != cached_file_path:
                download_path = utils.unique_filename(download_path)
        else:
            raise ValueError(
                f'Invalid parameter: "{if_collision}" is not a valid value for "ifcollision"'
            )
    return download_path
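
As an illustration of the collision policies, the sketch below assumes `./data/report.csv` already exists locally; all paths are placeholders and a logged-in client is assumed for the logging this helper performs.

```python
from synapseclient import Synapse
from synapseclient.core.download.download_functions import resolve_download_path_collisions

syn = Synapse()
syn.login()

# Placeholder paths: suppose ./data/report.csv already exists on disk.
resolved = resolve_download_path_collisions(
    download_location="./data",
    file_name="report.csv",
    if_collision="keep.both",
    synapse_cache_location="/home/user/.synapseCache/456/123456",
    cached_file_path="/home/user/.synapseCache/456/123456/report.csv",
    synapse_client=syn,
)
# "keep.both" yields a unique name such as ./data/report(1).csv,
# "keep.local" yields None (skip the download), and
# "overwrite.local" returns the original path unchanged.
print(resolved)
```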

ensure_download_location_is_directory(download_location)

Check if the download location is a directory

PARAMETER DESCRIPTION
download_location

The location on disk where the entity will be downloaded.

TYPE: str

RAISES DESCRIPTION
ValueError

If the download_location is not a directory

RETURNS DESCRIPTION
str

The download location

Source code in synapseclient/core/download/download_functions.py
def ensure_download_location_is_directory(download_location: str) -> str:
    """
    Check if the download location is a directory

    Arguments:
        download_location: The location on disk where the entity will be downloaded.

    Raises:
        ValueError: If the download_location is not a directory

    Returns:
        The download location
    """
    download_dir = os.path.expandvars(os.path.expanduser(download_location))
    if os.path.isfile(download_dir):
        raise ValueError(
            "Parameter 'download_location' should be a directory, not a file."
        )
    return download_dir
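
A brief usage sketch; the path is a placeholder.

```python
from synapseclient.core.download.download_functions import (
    ensure_download_location_is_directory,
)

# "~" and environment variables are expanded; a ValueError is raised if the
# resulting path points at an existing file rather than a directory.
download_dir = ensure_download_location_is_directory("~/Downloads/synapse")
print(download_dir)
```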

is_synapse_uri(uri, *, synapse_client=None)

Check whether the given uri is hosted at the configured Synapse repo endpoint

PARAMETER DESCRIPTION
uri

A given uri

TYPE: str

RETURNS DESCRIPTION
bool

A boolean value indicating whether the given uri is hosted at the configured Synapse repo endpoint

Source code in synapseclient/core/download/download_functions.py
def is_synapse_uri(
    uri: str,
    *,
    synapse_client: Optional["Synapse"] = None,
) -> bool:
    """
    Check whether the given uri is hosted at the configured Synapse repo endpoint

    Arguments:
        uri: A given uri

    Returns:
        A boolean value indicating whether the given uri is hosted at the configured Synapse repo endpoint
    """
    from synapseclient import Synapse

    client = Synapse.get_client(synapse_client=synapse_client)

    uri_domain = urllib_urlparse.urlparse(uri).netloc
    synapse_repo_domain = urllib_urlparse.urlparse(client.repoEndpoint).netloc
    return uri_domain.lower() == synapse_repo_domain.lower()
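
An illustrative sketch, assuming the client is configured against the default production repo endpoint (repo-prod.prod.sagebase.org); both URLs are placeholders.

```python
from synapseclient import Synapse
from synapseclient.core.download.download_functions import is_synapse_uri

syn = Synapse()
syn.login()

print(is_synapse_uri("https://repo-prod.prod.sagebase.org/repo/v1/entity/syn123", synapse_client=syn))  # True
print(is_synapse_uri("https://example.org/some/other/file", synapse_client=syn))                        # False
```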

Async managed Multithreaded Downloads

synapseclient.core.download.download_async

Logic required for the actual transferring of files.

Classes

DownloadRequest

Bases: NamedTuple

A request to download a file from Synapse

ATTRIBUTE DESCRIPTION
file_handle_id

The file handle ID to download.

object_id

The Synapse object this file is associated with.

object_type

The type of the associated Synapse object. Any of https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/FileHandleAssociateType.html

path

The local path to download the file to. This path can be either an absolute path or a relative path from where the code is executed to the download location.

debug

A boolean to specify if debug mode is on.

TYPE: bool

Source code in synapseclient/core/download/download_async.py
class DownloadRequest(NamedTuple):
    """
    A request to download a file from Synapse

    Attributes:
        file_handle_id : The file handle ID to download.
        object_id : The Synapse object this file associated to.
        object_type : The type of the associated Synapse object. Any of
            <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/FileHandleAssociateType.html>
        path : The local path to download the file to.
            This path can be either an absolute path or
            a relative path from where the code is executed to the download location.
        debug: A boolean to specify if debug mode is on.
    """

    file_handle_id: int
    object_id: str
    object_type: str
    path: str
    debug: bool = False

PresignedUrlInfo

Bases: NamedTuple

Information about a retrieved presigned-url

ATTRIBUTE DESCRIPTION
file_name

Name of the file for the presigned url

TYPE: str

url

The actual presigned url

TYPE: str

expiration_utc

datetime in UTC at which the url will expire

TYPE: datetime

Source code in synapseclient/core/download/download_async.py
class PresignedUrlInfo(NamedTuple):
    """
    Information about a retrieved presigned-url

    Attributes:
        file_name: Name of the file for the presigned url
        url: The actual presigned url
        expiration_utc: datetime in UTC at which the url will expire
    """

    file_name: str
    url: str
    expiration_utc: datetime.datetime

PresignedUrlProvider dataclass

Provides an unexpired pre-signed url to download a file

Source code in synapseclient/core/download/download_async.py
@dataclass
class PresignedUrlProvider:
    """
    Provides an unexpired pre-signed url to download a file
    """

    client: "Synapse"
    request: DownloadRequest
    _lock: _threading.Lock = _threading.Lock()
    _cached_info: Optional[PresignedUrlInfo] = None

    # offset parameter used to buffer url expiration checks, time in seconds
    _TIME_BUFFER: datetime.timedelta = datetime.timedelta(seconds=5)

    async def get_info_async(self) -> PresignedUrlInfo:
        """
        Using async, returns the cached info if it's not expired, otherwise
        retrieves a new pre-signed url and returns that.

        Returns:
            Information about a retrieved presigned-url from either the cache or a
            new request
        """
        if not self._cached_info or (
            datetime.datetime.now(tz=datetime.timezone.utc)
            + PresignedUrlProvider._TIME_BUFFER
            >= self._cached_info.expiration_utc
        ):
            self._cached_info = await self._get_pre_signed_info_async()

        return self._cached_info

    def get_info(self) -> PresignedUrlInfo:
        """
        Using a thread lock, returns the cached info if it's not expired, otherwise
        retrieves a new pre-signed url and returns that.

        Returns:
            Information about a retrieved presigned-url from either the cache or a
            new request
        """
        with self._lock:
            if not self._cached_info or (
                datetime.datetime.now(tz=datetime.timezone.utc)
                + PresignedUrlProvider._TIME_BUFFER
                >= self._cached_info.expiration_utc
            ):
                self._cached_info = self._get_pre_signed_info()

            return self._cached_info

    def _get_pre_signed_info(self) -> PresignedUrlInfo:
        """
        Make an HTTP request to get a pre-signed url to download a file.

        Returns:
            Information about a retrieved presigned-url from a new request.
        """
        response = get_file_handle_for_download(
            file_handle_id=self.request.file_handle_id,
            synapse_id=self.request.object_id,
            entity_type=self.request.object_type,
            synapse_client=self.client,
        )
        file_name = response["fileHandle"]["fileName"]
        pre_signed_url = response["preSignedURL"]
        return PresignedUrlInfo(
            file_name=file_name,
            url=pre_signed_url,
            expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
        )

    async def _get_pre_signed_info_async(self) -> PresignedUrlInfo:
        """
        Make an HTTP request to get a pre-signed url to download a file.

        Returns:
            Information about a retrieved presigned-url from a new request.
        """
        response = await get_file_handle_for_download_async(
            file_handle_id=self.request.file_handle_id,
            synapse_id=self.request.object_id,
            entity_type=self.request.object_type,
            synapse_client=self.client,
        )
        file_name = response["fileHandle"]["fileName"]
        pre_signed_url = response["preSignedURL"]
        return PresignedUrlInfo(
            file_name=file_name,
            url=pre_signed_url,
            expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
        )
Functions
get_info_async() async

Using async, returns the cached info if it's not expired, otherwise retrieves a new pre-signed url and returns that.

RETURNS DESCRIPTION
PresignedUrlInfo

Information about a retrieved presigned-url from either the cache or a new request

Source code in synapseclient/core/download/download_async.py
async def get_info_async(self) -> PresignedUrlInfo:
    """
    Using async, returns the cached info if it's not expired, otherwise
    retrieves a new pre-signed url and returns that.

    Returns:
        Information about a retrieved presigned-url from either the cache or a
        new request
    """
    if not self._cached_info or (
        datetime.datetime.now(tz=datetime.timezone.utc)
        + PresignedUrlProvider._TIME_BUFFER
        >= self._cached_info.expiration_utc
    ):
        self._cached_info = await self._get_pre_signed_info_async()

    return self._cached_info
get_info()

Using a thread lock, returns the cached info if it's not expired, otherwise retrieves a new pre-signed url and returns that.

RETURNS DESCRIPTION
PresignedUrlInfo

Information about a retrieved presigned-url from either the cache or a new request

Source code in synapseclient/core/download/download_async.py
def get_info(self) -> PresignedUrlInfo:
    """
    Using a thread lock, returns the cached info if it's not expired, otherwise
    retrieves a new pre-signed url and returns that.

    Returns:
        Information about a retrieved presigned-url from either the cache or a
        new request
    """
    with self._lock:
        if not self._cached_info or (
            datetime.datetime.now(tz=datetime.timezone.utc)
            + PresignedUrlProvider._TIME_BUFFER
            >= self._cached_info.expiration_utc
        ):
            self._cached_info = self._get_pre_signed_info()

        return self._cached_info
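
As an illustration of how the provider is typically paired with a DownloadRequest, the sketch below constructs one, fetches a pre-signed URL, and then reads it again from the cache. All ids and paths are placeholders and a logged-in client is assumed.

```python
from synapseclient import Synapse
from synapseclient.core.download.download_async import DownloadRequest, PresignedUrlProvider

syn = Synapse()
syn.login()

request = DownloadRequest(
    file_handle_id=123456,        # placeholder file handle id
    object_id="syn123",           # placeholder Synapse id
    object_type="FileEntity",
    path="/tmp/example/file.bin",
)
provider = PresignedUrlProvider(client=syn, request=request)

info = provider.get_info()        # fetches a pre-signed URL and caches it
print(info.file_name, info.expiration_utc)

info_again = provider.get_info()  # served from the cache until ~5 seconds before expiry
```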

Functions

download_file(client, download_request) async

Main driver for the multi-threaded download. Uses an ExecutorService, either set externally onto a thread local by an outside process, or created as needed otherwise.

PARAMETER DESCRIPTION
client

A synapseclient

TYPE: Synapse

download_request

A DownloadRequest object specifying which Synapse file to download

TYPE: DownloadRequest

Source code in synapseclient/core/download/download_async.py
async def download_file(
    client: "Synapse",
    download_request: DownloadRequest,
) -> None:
    """
    Main driver for the multi-threaded download. Uses an ExecutorService,
    either set externally onto a thread local by an outside process,
    or creating one as needed otherwise.

    Arguments:
        client: A synapseclient
        download_request: A batch of DownloadRequest objects specifying what
                            Synapse files to download
    """
    downloader = _MultithreadedDownloader(syn=client, download_request=download_request)
    await downloader.download_file()
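
A hedged sketch of driving the multi-threaded downloader directly; the ids and path are placeholders and a logged-in client is assumed.

```python
import asyncio

from synapseclient import Synapse
from synapseclient.core.download.download_async import DownloadRequest, download_file


async def main() -> None:
    syn = Synapse()
    syn.login()
    request = DownloadRequest(
        file_handle_id=123456,               # placeholder file handle id
        object_id="syn123",                  # placeholder Synapse id
        object_type="FileEntity",
        path="/tmp/example/large_file.bin",  # placeholder destination
    )
    await download_file(client=syn, download_request=request)


asyncio.run(main())
```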

Cache

synapseclient.core.cache


File Caching


Implements a cache on local disk for Synapse file entities and other objects with a FileHandle. This is part of the internal implementation of the client and should not be accessed directly by users of the client.

Classes

Cache

Represent a cache in which files are accessed by file handle ID.

Source code in synapseclient/core/cache.py
class Cache:
    """
    Represent a cache in which files are accessed by file handle ID.
    """

    def __setattr__(self, key, value):
        # expand out home shortcut ('~') and environment variables when setting cache_root_dir
        if key == "cache_root_dir":
            value = os.path.expandvars(os.path.expanduser(value))
            # create the cache_root_dir if it does not already exist
            if not os.path.exists(value):
                os.makedirs(value, exist_ok=True)
        self.__dict__[key] = value

    def __init__(self, cache_root_dir=CACHE_ROOT_DIR, fanout=1000):
        # set root dir of cache in which meta data will be stored and files
        # will be stored here by default, but other locations can be specified
        self.cache_root_dir = cache_root_dir
        self.fanout = fanout
        self.cache_map_file_name = ".cacheMap"

    def get_cache_dir(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str]
    ) -> str:
        if isinstance(file_handle_id, collections.abc.Mapping):
            if "dataFileHandleId" in file_handle_id:
                file_handle_id = file_handle_id["dataFileHandleId"]
            elif (
                "concreteType" in file_handle_id
                and "id" in file_handle_id
                and file_handle_id["concreteType"].startswith(
                    "org.sagebionetworks.repo.model.file"
                )
            ):
                file_handle_id = file_handle_id["id"]
        return os.path.join(
            self.cache_root_dir,
            str(int(file_handle_id) % self.fanout),
            str(file_handle_id),
        )

    def _read_cache_map(self, cache_dir: str) -> dict:
        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        if not os.path.exists(cache_map_file):
            return {}

        with open(cache_map_file, "r") as f:
            try:
                cache_map = json.load(f)
            except json.decoder.JSONDecodeError:
                # a corrupt cache map file that is not parseable as JSON is treated
                # as if it does not exist at all (will be overwritten).
                return {}

        return cache_map

    def _write_cache_map(self, cache_dir: str, cache_map: dict) -> None:
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)

        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        with open(cache_map_file, "w") as f:
            json.dump(cache_map, f)
            f.write("\n")  # For compatibility with R's JSON parser

    def _get_cache_modified_time(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `modified_time` from the `cache_map_entry`. This needs to be
        backwards-compatible with any cache entries that do not have the new JSON structure
        for data. That means that if the cache_map_entry has a `modified_time` key,
        then it is a new entry and we can return the value. If it does not, then it
        is an old entry and we should return the `cache_map_entry` itself.

        The caveat is that `cache_map_entry` needs to be a string to return the value
        otherwise it will return None.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The modified time if it exists, otherwise the cache_map_entry
        """
        if cache_map_entry is not None and "modified_time" in cache_map_entry:
            return cache_map_entry.get("modified_time", None)
        elif cache_map_entry is not None and isinstance(cache_map_entry, str):
            return cache_map_entry
        return None

    def _get_cache_content_md5(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `content_md5` from the cache_map_entry.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The content md5 if it exists, otherwise None
        """
        if cache_map_entry is not None and "content_md5" in cache_map_entry:
            return cache_map_entry.get("content_md5", None)
        else:
            return None

    def _cache_item_unmodified(
        self, cache_map_entry: typing.Union[str, dict], path: str
    ) -> bool:
        """
        Determine if the cache_map_entry is unmodified by comparing the modified_time
        and content_md5 to the file at the given path.

        Arguments:
            cache_map_entry: The entry from the cache map
            path:            The path to the file to compare to

        Returns:
            True if the cache_map_entry is unmodified, otherwise False
        """
        cached_time = self._get_cache_modified_time(cache_map_entry)
        cached_md5 = self._get_cache_content_md5(cache_map_entry)

        # compare_timestamps has an implicit check for whether the path exists
        return compare_timestamps(_get_modified_time(path), cached_time) and (
            cached_md5 is None or cached_md5 == utils.md5_for_file(path).hexdigest()
        )

    def contains(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
    ) -> bool:
        """
        Given a file and file_handle_id, return True if an unmodified cached
        copy of the file exists at the exact path given or False otherwise.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           The file path at which to look for a cached copy
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        if not os.path.exists(cache_dir):
            return False

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            cached_time = self._get_cache_modified_time(cache_map.get(path, None))

            if cached_time:
                return compare_timestamps(_get_modified_time(path), cached_time)
        return False

    @tracer.start_as_current_span("cache::get")
    def get(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
    ) -> typing.Union[str, None]:
        """
        Retrieve a file with the given file handle from the cache.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           If the given path is None, look for a cached copy of the
                            file in the cache directory. If the path is a directory,
                            look there for a cached copy. If a full file-path is
                            given, only check whether that exact file exists and is
                            unmodified since it was cached.

        Returns:
            Either a file path, if an unmodified cached copy of the file
            exists in the specified location or None if it does not
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        trace.get_current_span().set_attributes(
            {
                "synapse.cache.dir": cache_dir,
                "synapse.cache.file_handle_id": file_handle_id,
            }
        )
        if not os.path.exists(cache_dir):
            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            # If the caller specifies a path and that path exists in the cache
            # but has been modified, we need to indicate no match by returning
            # None. The logic for updating a synapse entity depends on this to
            # determine the need to upload a new file.

            if path is not None:
                # If we're given a path to a directory, look for a cached file in that directory
                if os.path.isdir(path):
                    matching_unmodified_directory = None
                    removed_entry_from_cache = (
                        False  # determines if cache_map needs to be rewritten to disk
                    )

                    # iterate a copy of cache_map to allow modifying original cache_map
                    for cached_file_path, cache_map_entry in dict(cache_map).items():
                        if path == os.path.dirname(cached_file_path):
                            if self._cache_item_unmodified(
                                cache_map_entry, cached_file_path
                            ):
                                # "break" instead of "return" to write removed invalid entries to disk if necessary
                                matching_unmodified_directory = cached_file_path
                                break
                            else:
                                # remove invalid cache entries pointing to files that no longer exist
                                # or have been modified
                                del cache_map[cached_file_path]
                                removed_entry_from_cache = True

                    if removed_entry_from_cache:
                        # write cache_map with non-existent entries removed
                        self._write_cache_map(cache_dir, cache_map)

                    if matching_unmodified_directory is not None:
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": True}
                        )
                        return matching_unmodified_directory

                # if we're given a full file path, look up a matching file in the cache
                else:
                    cache_map_entry = cache_map.get(path, None)
                    if cache_map_entry:
                        matching_file_path = (
                            path
                            if self._cache_item_unmodified(cache_map_entry, path)
                            else None
                        )
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": matching_file_path is not None}
                        )
                        return matching_file_path

            # return most recently cached and unmodified file OR
            # None if there are no unmodified files
            for cached_file_path, cache_map_entry in sorted(
                cache_map.items(),
                key=lambda item: (
                    item[1]["modified_time"] if isinstance(item[1], dict) else item[1]
                ),
                reverse=True,
            ):
                if self._cache_item_unmodified(cache_map_entry, cached_file_path):
                    trace.get_current_span().set_attributes({"synapse.cache.hit": True})
                    return cached_file_path

            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

    def add(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str,
        md5: str = None,
    ) -> dict:
        """
        Add a file to the cache
        """
        if not path or not os.path.exists(path):
            raise ValueError('Can\'t find file "%s"' % path)

        cache_dir = self.get_cache_dir(file_handle_id)
        content_md5 = md5 or utils.md5_for_file(path).hexdigest()
        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)
            # write .000 milliseconds for backward compatibility
            cache_map[path] = {
                "modified_time": epoch_time_to_iso(
                    math.floor(_get_modified_time(path))
                ),
                "content_md5": content_md5,
            }
            self._write_cache_map(cache_dir, cache_map)

        return cache_map

    def remove(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
        delete: bool = None,
    ) -> typing.List[str]:
        """
        Remove a file from the cache.

        Arguments:
            file_handle_id: Will also extract file handle id from either a File or file handle
            path:           If the given path is None, remove (and potentially delete)
                            all cached copies. If the path is that of a file in the
                            .cacheMap file, remove it.
            delete:         If True, delete the file from disk as well as removing it from the cache

        Returns:
            A list of files removed
        """
        removed = []
        cache_dir = self.get_cache_dir(file_handle_id)

        # if we've passed an entity and not a path, get path from entity
        if (
            path is None
            and isinstance(file_handle_id, collections.abc.Mapping)
            and "path" in file_handle_id
        ):
            path = file_handle_id["path"]

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            if path is None:
                for path in cache_map:
                    if delete is True and os.path.exists(path):
                        os.remove(path)
                    removed.append(path)
                cache_map = {}
            else:
                path = utils.normalize_path(path)
                if path in cache_map:
                    if delete is True and os.path.exists(path):
                        os.remove(path)
                    del cache_map[path]
                    removed.append(path)

            self._write_cache_map(cache_dir, cache_map)

        return removed

    def _cache_dirs(self):
        """
        Generate a list of all cache dirs, directories of the form:
        [cache.cache_root_dir]/949/59949
        """
        for item1 in os.listdir(self.cache_root_dir):
            path1 = os.path.join(self.cache_root_dir, item1)
            if os.path.isdir(path1) and re.match("\\d+", item1):
                for item2 in os.listdir(path1):
                    path2 = os.path.join(path1, item2)
                    if os.path.isdir(path2) and re.match("\\d+", item2):
                        yield path2

    def purge(
        self,
        before_date: typing.Union[datetime.datetime, int] = None,
        after_date: typing.Union[datetime.datetime, int] = None,
        dry_run: bool = False,
    ) -> int:
        """
        Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

        Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
        the cache.

        Arguments:
            before_date: If specified, all files before this date will be removed
            after_date:  If specified, all files after this date will be removed
            dry_run:     If dry_run is True, then the selected files are printed rather than removed

        Returns:
            The number of files selected for removal

        Example: Using this function
            Either the before_date or after_date must be specified. If both are passed, files between the two dates are
            selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
            of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
            following can be used::

            using offset naive datetime objects

                cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

            using seconds since the unix epoch

                cache.purge(after_date=1609459200, before_date=1612137600)
        """
        if before_date is None and after_date is None:
            raise ValueError("Either before date or after date should be provided")

        if isinstance(before_date, datetime.datetime):
            before_date = utils.to_unix_epoch_time_secs(before_date)
        if isinstance(after_date, datetime.datetime):
            after_date = utils.to_unix_epoch_time_secs(after_date)

        if before_date and after_date and before_date < after_date:
            raise ValueError("Before date should be larger than after date")

        count = 0
        for cache_dir in self._cache_dirs():
            # _get_modified_time returns None if the cache map file doesn't
            # exist and n > None evaluates to True in python 2.7(wtf?). I'm guessing it's
            # OK to purge directories in the cache that have no .cacheMap file

            last_modified_time = _get_modified_time(
                os.path.join(cache_dir, self.cache_map_file_name)
            )
            if last_modified_time is None or (
                (not before_date or before_date > last_modified_time)
                and (not after_date or after_date < last_modified_time)
            ):
                if dry_run:
                    print(cache_dir)
                else:
                    shutil.rmtree(cache_dir)
                count += 1
        return count
Functions
contains(file_handle_id, path)

Given a file and file_handle_id, return True if an unmodified cached copy of the file exists at the exact path given or False otherwise.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

The file path at which to look for a cached copy

TYPE: str

Source code in synapseclient/core/cache.py
def contains(
    self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
) -> bool:
    """
    Given a file and file_handle_id, return True if an unmodified cached
    copy of the file exists at the exact path given or False otherwise.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           The file path at which to look for a cached copy
    """
    cache_dir = self.get_cache_dir(file_handle_id)
    if not os.path.exists(cache_dir):
        return False

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)

        cached_time = self._get_cache_modified_time(cache_map.get(path, None))

        if cached_time:
            return compare_timestamps(_get_modified_time(path), cached_time)
    return False
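
For illustration, a minimal sketch of checking whether a local copy is still valid before re-downloading; the file handle id and path are hypothetical, and the Cache is assumed to be constructible with its defaults:

    from synapseclient.core.cache import Cache

    cache = Cache()  # assumes the default cache root directory
    if cache.contains("12345", "/data/downloads/my_file.csv"):
        print("cached copy is current; no download needed")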
get(file_handle_id, path=None)

Retrieve a file with the given file handle from the cache.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

If the given path is None, look for a cached copy of the file in the cache directory. If the path is a directory, look there for a cached copy. If a full file-path is given, only check whether that exact file exists and is unmodified since it was cached.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Union[str, None]

Either a file path, if an unmodified cached copy of the file

Union[str, None]

exists in the specified location or None if it does not

Source code in synapseclient/core/cache.py
@tracer.start_as_current_span("cache::get")
def get(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
) -> typing.Union[str, None]:
    """
    Retrieve a file with the given file handle from the cache.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           If the given path is None, look for a cached copy of the
                        file in the cache directory. If the path is a directory,
                        look there for a cached copy. If a full file-path is
                        given, only check whether that exact file exists and is
                        unmodified since it was cached.

    Returns:
        Either a file path, if an unmodified cached copy of the file
        exists in the specified location or None if it does not
    """
    cache_dir = self.get_cache_dir(file_handle_id)
    trace.get_current_span().set_attributes(
        {
            "synapse.cache.dir": cache_dir,
            "synapse.cache.file_handle_id": file_handle_id,
        }
    )
    if not os.path.exists(cache_dir):
        trace.get_current_span().set_attributes({"synapse.cache.hit": False})
        return None

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)

        # If the caller specifies a path and that path exists in the cache
        # but has been modified, we need to indicate no match by returning
        # None. The logic for updating a synapse entity depends on this to
        # determine the need to upload a new file.

        if path is not None:
            # If we're given a path to a directory, look for a cached file in that directory
            if os.path.isdir(path):
                matching_unmodified_directory = None
                removed_entry_from_cache = (
                    False  # determines if cache_map needs to be rewritten to disk
                )

                # iterate a copy of cache_map to allow modifying original cache_map
                for cached_file_path, cache_map_entry in dict(cache_map).items():
                    if path == os.path.dirname(cached_file_path):
                        if self._cache_item_unmodified(
                            cache_map_entry, cached_file_path
                        ):
                            # "break" instead of "return" to write removed invalid entries to disk if necessary
                            matching_unmodified_directory = cached_file_path
                            break
                        else:
                            # remove invalid cache entries pointing to files that no longer exist
                            # or have been modified
                            del cache_map[cached_file_path]
                            removed_entry_from_cache = True

                if removed_entry_from_cache:
                    # write cache_map with non-existent entries removed
                    self._write_cache_map(cache_dir, cache_map)

                if matching_unmodified_directory is not None:
                    trace.get_current_span().set_attributes(
                        {"synapse.cache.hit": True}
                    )
                    return matching_unmodified_directory

            # if we're given a full file path, look up a matching file in the cache
            else:
                cache_map_entry = cache_map.get(path, None)
                if cache_map_entry:
                    matching_file_path = (
                        path
                        if self._cache_item_unmodified(cache_map_entry, path)
                        else None
                    )
                    trace.get_current_span().set_attributes(
                        {"synapse.cache.hit": matching_file_path is not None}
                    )
                    return matching_file_path

        # return most recently cached and unmodified file OR
        # None if there are no unmodified files
        for cached_file_path, cache_map_entry in sorted(
            cache_map.items(),
            key=lambda item: (
                item[1]["modified_time"] if isinstance(item[1], dict) else item[1]
            ),
            reverse=True,
        ):
            if self._cache_item_unmodified(cache_map_entry, cached_file_path):
                trace.get_current_span().set_attributes({"synapse.cache.hit": True})
                return cached_file_path

        trace.get_current_span().set_attributes({"synapse.cache.hit": False})
        return None
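
As a sketch of the three lookup modes described above (hypothetical file handle id and paths, default Cache):

    from synapseclient.core.cache import Cache

    cache = Cache()
    fh_id = "12345"  # hypothetical file handle id

    cache.get(fh_id)                              # most recently cached, unmodified copy, or None
    cache.get(fh_id, "/data/downloads")           # an unmodified copy inside that directory, or None
    cache.get(fh_id, "/data/downloads/file.csv")  # that exact path, if cached and unmodified, or None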
add(file_handle_id, path, md5=None)

Add a file to the cache

Source code in synapseclient/core/cache.py
def add(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str,
    md5: str = None,
) -> dict:
    """
    Add a file to the cache
    """
    if not path or not os.path.exists(path):
        raise ValueError('Can\'t find file "%s"' % path)

    cache_dir = self.get_cache_dir(file_handle_id)
    content_md5 = md5 or utils.md5_for_file(path).hexdigest()
    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)
        # write .000 milliseconds for backward compatibility
        cache_map[path] = {
            "modified_time": epoch_time_to_iso(
                math.floor(_get_modified_time(path))
            ),
            "content_md5": content_md5,
        }
        self._write_cache_map(cache_dir, cache_map)

    return cache_map
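
A sketch of registering a freshly written file with the cache; the file handle id is hypothetical and the md5 is left for the cache to compute:

    import tempfile
    from synapseclient.core.cache import Cache

    cache = Cache()
    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
        f.write(b"example content")
        local_path = f.name

    # writes/updates the .cacheMap entry for this file handle and returns the map
    cache_map = cache.add("12345", local_path)
    print(cache_map)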
remove(file_handle_id, path=None, delete=None)

Remove a file from the cache.

PARAMETER DESCRIPTION
file_handle_id

Will also extract file handle id from either a File or file handle

TYPE: Union[Mapping, str]

path

If the given path is None, remove (and potentially delete) all cached copies. If the path is that of a file in the .cacheMap file, remove it.

TYPE: str DEFAULT: None

delete

If True, delete the file from disk as well as removing it from the cache

TYPE: bool DEFAULT: None

RETURNS DESCRIPTION
List[str]

A list of files removed

Source code in synapseclient/core/cache.py
def remove(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
    delete: bool = None,
) -> typing.List[str]:
    """
    Remove a file from the cache.

    Arguments:
        file_handle_id: Will also extract file handle id from either a File or file handle
        path:           If the given path is None, remove (and potentially delete)
                        all cached copies. If the path is that of a file in the
                        .cacheMap file, remove it.
        delete:         If True, delete the file from disk as well as removing it from the cache

    Returns:
        A list of files removed
    """
    removed = []
    cache_dir = self.get_cache_dir(file_handle_id)

    # if we've passed an entity and not a path, get path from entity
    if (
        path is None
        and isinstance(file_handle_id, collections.abc.Mapping)
        and "path" in file_handle_id
    ):
        path = file_handle_id["path"]

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        if path is None:
            for path in cache_map:
                if delete is True and os.path.exists(path):
                    os.remove(path)
                removed.append(path)
            cache_map = {}
        else:
            path = utils.normalize_path(path)
            if path in cache_map:
                if delete is True and os.path.exists(path):
                    os.remove(path)
                del cache_map[path]
                removed.append(path)

        self._write_cache_map(cache_dir, cache_map)

    return removed
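
A sketch of dropping a single cached copy and deleting the file on disk as well; the id and path are hypothetical:

    from synapseclient.core.cache import Cache

    cache = Cache()
    removed = cache.remove("12345", path="/data/downloads/file.csv", delete=True)
    print(removed)  # list of paths removed from the cache map (and disk)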
purge(before_date=None, after_date=None, dry_run=False)

Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside the cache.

PARAMETER DESCRIPTION
before_date

If specified, all files before this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

after_date

If specified, all files after this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

dry_run

If dry_run is True, then the selected files are printed rather than removed

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

The number of files selected for removal

Using this function

Either the before_date or after_date must be specified. If both are passed, files between the two dates are selected for removal. Dates must either be a timezone-naive Python datetime.datetime instance or the number of seconds since the unix epoch. For example, to delete all the files modified in January 2021, either of the following can be used:

using offset-naive datetime objects

    cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

using seconds since the unix epoch

    cache.purge(after_date=1609459200, before_date=1612137600)
Source code in synapseclient/core/cache.py
def purge(
    self,
    before_date: typing.Union[datetime.datetime, int] = None,
    after_date: typing.Union[datetime.datetime, int] = None,
    dry_run: bool = False,
) -> int:
    """
    Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

    Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
    the cache.

    Arguments:
        before_date: If specified, all files before this date will be removed
        after_date:  If specified, all files after this date will be removed
        dry_run:     If dry_run is True, then the selected files are printed rather than removed

    Returns:
        The number of files selected for removal

    Example: Using this function
        Either the before_date or after_date must be specified. If both are passed, files between the two dates are
        selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
        of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
        following can be used::

        using offset naive datetime objects

            cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

        using seconds since the unix epoch

            cache.purge(after_date=1609459200, before_date=1612137600)
    """
    if before_date is None and after_date is None:
        raise ValueError("Either before date or after date should be provided")

    if isinstance(before_date, datetime.datetime):
        before_date = utils.to_unix_epoch_time_secs(before_date)
    if isinstance(after_date, datetime.datetime):
        after_date = utils.to_unix_epoch_time_secs(after_date)

    if before_date and after_date and before_date < after_date:
        raise ValueError("Before date should be larger than after date")

    count = 0
    for cache_dir in self._cache_dirs():
        # _get_modified_time returns None if the cache map file doesn't
        # exist and n > None evaluates to True in python 2.7(wtf?). I'm guessing it's
        # OK to purge directories in the cache that have no .cacheMap file

        last_modified_time = _get_modified_time(
            os.path.join(cache_dir, self.cache_map_file_name)
        )
        if last_modified_time is None or (
            (not before_date or before_date > last_modified_time)
            and (not after_date or after_date < last_modified_time)
        ):
            if dry_run:
                print(cache_dir)
            else:
                shutil.rmtree(cache_dir)
            count += 1
    return count
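
Building on the docstring example above, a dry run can preview what would be removed before committing to it; the dates are illustrative and the default cache root is assumed to exist:

    import datetime
    from synapseclient.core.cache import Cache

    cache = Cache()
    n = cache.purge(
        after_date=datetime.datetime(2021, 1, 1),
        before_date=datetime.datetime(2021, 2, 1),
        dry_run=True,  # print the cache directories that would be deleted, delete nothing
    )
    print(f"{n} cache directories selected")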

Functions

epoch_time_to_iso(epoch_time)

Convert seconds since unix epoch to a string in ISO format

Source code in synapseclient/core/cache.py
def epoch_time_to_iso(epoch_time):
    """
    Convert seconds since unix epoch to a string in ISO format
    """
    return (
        None
        if epoch_time is None
        else utils.datetime_to_iso(utils.from_unix_epoch_time_secs(epoch_time))
    )

iso_time_to_epoch(iso_time)

Convert an ISO formatted time into seconds since unix epoch

Source code in synapseclient/core/cache.py
def iso_time_to_epoch(iso_time):
    """
    Convert an ISO formatted time into seconds since unix epoch
    """
    return (
        None
        if iso_time is None
        else utils.to_unix_epoch_time_secs(utils.iso_to_datetime(iso_time))
    )

compare_timestamps(modified_time, cached_time)

Compare two ISO formatted timestamps, with a special case when cached_time ends in .000Z.

For backward compatibility, we always write .000 for the milliseconds into the cache. A cached time ending in .000Z (zero milliseconds) is therefore treated as matching a modified time with any number of milliseconds.

PARAMETER DESCRIPTION
modified_time

The float representing seconds since unix epoch

cached_time

The string holding an ISO formatted time

Source code in synapseclient/core/cache.py
def compare_timestamps(modified_time, cached_time):
    """
    Compare two ISO formatted timestamps, with a special case when cached_time ends in .000Z.

    For backward compatibility, we always write .000 for milliseconds into the cache.
    We then match a cached time ending in .000Z, meaning zero milliseconds with a modified time with any number of
    milliseconds.

    Arguments:
        modified_time: The float representing seconds since unix epoch
        cached_time:   The string holding an ISO formatted time
    """
    if cached_time is None or modified_time is None:
        return False
    if cached_time.endswith(".000Z"):
        return cached_time == epoch_time_to_iso(math.floor(modified_time))
    else:
        return cached_time == epoch_time_to_iso(modified_time)
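
A small sketch of the .000Z special case described above; the epoch value is arbitrary and the helpers are the module-level functions shown here:

    import math

    from synapseclient.core.cache import compare_timestamps, epoch_time_to_iso

    modified = 1612137600.183  # seconds since the epoch, with milliseconds

    # a cached time written with .000 milliseconds matches regardless of the file's milliseconds
    print(compare_timestamps(modified, epoch_time_to_iso(math.floor(modified))))  # True
    # a cached time carrying real milliseconds must match exactly
    print(compare_timestamps(modified, epoch_time_to_iso(modified)))              # True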

Credentials

synapseclient.core.credentials.cred_data.SynapseCredentials

Bases: AuthBase, ABC

Source code in synapseclient/core/credentials/cred_data.py
class SynapseCredentials(requests.auth.AuthBase, abc.ABC):
    @property
    @abc.abstractmethod
    def username(self) -> None:
        """The username associated with these credentials."""

    @property
    @abc.abstractmethod
    def secret(self) -> None:
        """The secret associated with these credentials."""

Attributes

username: None abstractmethod property

The username associated with these credentials.

secret: None abstractmethod property

The secret associated with these credentials.

synapseclient.core.credentials.cred_data.SynapseAuthTokenCredentials

Bases: SynapseCredentials

Source code in synapseclient/core/credentials/cred_data.py
class SynapseAuthTokenCredentials(SynapseCredentials):
    @classmethod
    def get_keyring_service_name(
        cls,
    ) -> typing.Literal["SYNAPSE.ORG_CLIENT_AUTH_TOKEN"]:
        return "SYNAPSE.ORG_CLIENT_AUTH_TOKEN"

    @classmethod
    def _validate_token(cls, token):
        # decode the token to ensure it minimally has view scope.
        # if it doesn't raise an error, the client will not be useful without it.

        # if for any reason we are not able to decode the token and check its scopes
        # we do NOT raise an error. this is to accommodate the possibility of a changed
        # token format some day that this version of the client may still be able to
        # pass as a bearer token.
        try:
            token_body = json.loads(
                str(
                    base64.urlsafe_b64decode(
                        # we add padding to ensure that lack of padding won't prevent a decode error.
                        # the python base64 implementation will truncate extra padding so we can overpad
                        # rather than compute exactly how much padding we might need.
                        # https://stackoverflow.com/a/49459036
                        token.split(".")[1]
                        + "==="
                    ),
                    "utf-8",
                )
            )
            scopes = token_body.get("access", {}).get("scope")
            if scopes is not None and "view" not in scopes:
                raise SynapseAuthenticationError("A view scoped token is required")

        except (IndexError, ValueError):
            # possible errors if token is not encoded as expected:
            # IndexError if the token is not a '.' delimited base64 string with a header and body
            # ValueError if the split string is not base64 encoded or if the decoded base64 is not json
            pass

    def __init__(
        self, token: str, username: str = None, displayname: str = None
    ) -> None:
        self._validate_token(token)

        self._token = token
        self.username = username
        self.displayname = displayname

    @property
    def username(self) -> str:
        """The username associated with this token."""
        return self._username

    @username.setter
    def username(self, username: str) -> None:
        self._username = username

    @property
    def displayname(self) -> str:
        """The displayname associated with this token."""
        return self._displayname

    @displayname.setter
    def displayname(self, displayname: str) -> None:
        self._displayname = displayname

    @property
    def secret(self) -> str:
        """The bearer token."""
        return self._token

    def __call__(self, r):
        r.headers.update({"Authorization": f"Bearer {self.secret}"})
        return r

    def __repr__(self):
        return (
            f"SynapseAuthTokenCredentials("
            f"username='{self.username}', "
            f"displayname='{self.displayname}', "
            f"token='{self.secret}')"
        )

Attributes

username: str property writable

The username associated with this token.

displayname: str property writable

The displayname associated with this token.

secret: str property

The bearer token.
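
A sketch of how these credentials plug into requests as an AuthBase; the token is a placeholder rather than a real personal access token:

    import requests

    from synapseclient.core.credentials.cred_data import SynapseAuthTokenCredentials

    creds = SynapseAuthTokenCredentials("eyJ0eXAi...placeholder", username="jane.doe")
    request = requests.Request(
        "GET", "https://repo-prod.prod.sagebase.org/repo/v1/userProfile", auth=creds
    )
    prepared = request.prepare()  # __call__ adds the bearer token header
    print(prepared.headers["Authorization"])  # Bearer eyJ0eXAi...placeholder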

synapseclient.core.credentials.credential_provider

This module contains classes that are responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file).

Classes

SynapseCredentialsProvider

A credential provider is responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file), and using it to return a SynapseCredentials instance.

Source code in synapseclient/core/credentials/credential_provider.py
class SynapseCredentialsProvider(metaclass=abc.ABCMeta):
    """
    A credential provider is responsible for retrieving synapse authentication
    information (e.g. authToken) from a source (e.g. login args, config file),
    and use them to return a
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    instance.
    """

    @abc.abstractmethod
    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[None, None]:
        """
        Subclasses must implement this to decide how to obtain an authentication token.
        For any of these values, return None if it is not possible to get that value.

        Not all implementations will need to make use of the user_login_args parameter
        or syn. These parameters provide context about the Synapse client's configuration
        and login() arguments.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            Tuple of (username, bearer authentication token e.g. a personal access token),
            any of these values could be None if it is not available.
        """
        return None, None

    def get_synapse_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Returns
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        if this provider is able to get valid credentials, returns None otherwise.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                if valid credentials can be found by this provider, None otherwise.
        """
        return self._create_synapse_credential(
            syn, *self._get_auth_info(syn=syn, user_login_args=user_login_args)
        )

    def _create_synapse_credential(
        self, syn: "Synapse", username: str, auth_token: str
    ) -> Union[SynapseCredentials, None]:
        if auth_token is not None:
            credentials = SynapseAuthTokenCredentials(auth_token)
            profile = syn.restGET("/userProfile", auth=credentials)
            profile_username = profile.get("userName")
            profile_emails = profile.get("emails", [])
            profile_displayname = profile.get("displayName")

            if username and (
                username != profile_username and username not in profile_emails
            ):
                # a username/email is not required when logging in with an auth token
                # however if both are provided raise an error if they do not correspond
                # to avoid any ambiguity about what profile was logged in
                raise SynapseAuthenticationError(
                    "username/email and auth_token both provided but username does not "
                    "match token profile"
                )
            credentials.username = profile_username
            credentials.displayname = profile_displayname

            return credentials

        return None
Functions
get_synapse_credentials(syn, user_login_args)

Returns SynapseCredentials if this provider is able to get valid credentials, returns None otherwise.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials if valid credentials can be found by this provider, None otherwise.

Source code in synapseclient/core/credentials/credential_provider.py
def get_synapse_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Returns
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    if this provider is able to get valid credentials, returns None otherwise.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            if valid credentials can be found by this provider, None otherwise.
    """
    return self._create_synapse_credential(
        syn, *self._get_auth_info(syn=syn, user_login_args=user_login_args)
    )

UserArgsCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from user_login_args during a CLI session.

Source code in synapseclient/core/credentials/credential_provider.py
class UserArgsCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves auth info from user_login_args during a CLI session.
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[str, str]:
        return (
            user_login_args.username,
            user_login_args.auth_token,
        )

ConfigFileCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from the ~/.synapseConfig file

Source code in synapseclient/core/credentials/credential_provider.py
class ConfigFileCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves auth info from the `~/.synapseConfig` file
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        config_dict = get_config_authentication(config_path=syn.configPath)
        # check to make sure we didn't accidentally provide the wrong user

        username = config_dict.get("username")
        token = config_dict.get("authtoken")

        if user_login_args.username and username != user_login_args.username:
            # if the username is provided and there is a config file username but they
            # don't match then we don't use any of the values from the config
            # to prevent ambiguity
            username = None
            token = None
            syn.logger.warning(
                f"{user_login_args.username} was defined in the user login "
                "arguments, however, it is also defined in the `~/.synapseConfig` "
                "file. Becuase they do not match we will not use the `authtoken` "
                "in the `~/.synapseConfig` file.",
            )

        return username, token

AWSParameterStoreCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves user's authentication token from AWS SSM Parameter store

Source code in synapseclient/core/credentials/credential_provider.py
class AWSParameterStoreCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves user's authentication token from AWS SSM Parameter store
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_TOKEN_AWS_SSM_PARAMETER_NAME"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        ssm_param_name = os.environ.get(self.ENVIRONMENT_VAR_NAME)
        token = None
        if ssm_param_name:
            try:
                import boto3
                import botocore

                ssm_client = boto3.client("ssm")
                result = ssm_client.get_parameter(
                    Name=ssm_param_name,
                    WithDecryption=True,
                )
                token = result["Parameter"]["Value"]
            except ImportError:
                syn.logger.warning(
                    f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                    'but "boto3" could not be imported. The Synapse client uses "boto3" '
                    "in order to access Systems Manager Parameter Storage. Please ensure "
                    'that you have installed "boto3" to enable this feature.'
                )
            # this except block must be defined after the ImportError except block
            # otherwise, there's no guarantee "botocore" is already imported and defined
            except botocore.exceptions.ClientError:
                syn.logger.warning(
                    f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                    "but the matching parameter name could not be found in AWS Parameter "
                    "Store. Caused by AWS error:\n",
                    exc_info=True,
                )

        # if username is included in user's arguments, return it so that
        # it may be validated against the username authenticated by the token
        return user_login_args.username, token

EnvironmentVariableCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves the user's authentication token from an environment variable

Source code in synapseclient/core/credentials/credential_provider.py
class EnvironmentVariableCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves the user's authentication token from an environment variable
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_AUTH_TOKEN"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        return (
            user_login_args.username,
            os.environ.get(self.ENVIRONMENT_VAR_NAME),
        )
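
As a sketch, this provider only needs SYNAPSE_AUTH_TOKEN to be set before the client logs in; the token value below is a placeholder, so a real personal access token would be required for the login to actually succeed:

    import os

    import synapseclient

    os.environ["SYNAPSE_AUTH_TOKEN"] = "eyJ0eXAi...placeholder"
    syn = synapseclient.Synapse()
    syn.login()  # the default credential chain picks the token up from the environment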

SynapseCredentialsProviderChain

Bases: object

Class that has a list of SynapseCredentialsProvider from which this class attempts to retrieve SynapseCredentials.

By default this class uses the following providers in this order:

  1. UserArgsCredentialsProvider
  2. ConfigFileCredentialsProvider
  3. EnvironmentVariableCredentialsProvider
  4. AWSParameterStoreCredentialsProvider
ATTRIBUTE DESCRIPTION
cred_providers

list of (SynapseCredentialsProvider) credential providers

Source code in synapseclient/core/credentials/credential_provider.py
class SynapseCredentialsProviderChain(object):
    """
    Class that has a list of
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    from which this class attempts to retrieve
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].


    By default this class uses the following providers in this order:

    1. [UserArgsCredentialsProvider][synapseclient.core.credentials.credential_provider.UserArgsCredentialsProvider]
    2. [ConfigFileCredentialsProvider][synapseclient.core.credentials.credential_provider.ConfigFileCredentialsProvider]
    3. [EnvironmentVariableCredentialsProvider][synapseclient.core.credentials.credential_provider.EnvironmentVariableCredentialsProvider]
    4. [AWSParameterStoreCredentialsProvider][synapseclient.core.credentials.credential_provider.AWSParameterStoreCredentialsProvider]

    Attributes:
        cred_providers: list of
            ([SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider])
            credential providers
    """

    def __init__(self, cred_providers) -> None:
        self.cred_providers = list(cred_providers)

    def get_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Iterates its list of
        [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
        and returns the first non-None
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        returned by a provider. If no provider is able to provide a
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials],
        returns None.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                returned by the first non-None provider in its list, None otherwise
        """
        for provider in self.cred_providers:
            creds = provider.get_synapse_credentials(syn, user_login_args)
            if creds is not None:
                return creds
        return None
Functions
get_credentials(syn, user_login_args)

Iterates its list of SynapseCredentialsProvider and returns the first non-None SynapseCredentials returned by a provider. If no provider is able to provide a SynapseCredentials, returns None.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials returned by the first provider in its list able to supply them, None otherwise

Source code in synapseclient/core/credentials/credential_provider.py
def get_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Iterates its list of
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    and returns the first non-None
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    returned by a provider. If no provider is able to provide a
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials],
    returns None.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            returned by the first non-None provider in its list, None otherwise
    """
    for provider in self.cred_providers:
        creds = provider.get_synapse_credentials(syn, user_login_args)
        if creds is not None:
            return creds
    return None

Functions

get_default_credential_chain()

Returns the default credential chain used to retrieve SynapseCredentials. Providers in the chain are attempted in order until one can supply credentials.

RETURNS DESCRIPTION
SynapseCredentialsProviderChain

credential chain

Source code in synapseclient/core/credentials/credential_provider.py
def get_default_credential_chain() -> SynapseCredentialsProviderChain:
    """
    Creates and uses a default credential chain to retrieve
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].
    The order this is returned is the order in which the credential providers
    are attempted.

    Returns:
        credential chain
    """
    return DEFAULT_CREDENTIAL_PROVIDER_CHAIN
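
A sketch of driving the chain directly rather than through Synapse.login(); the LoginArgs namedtuple is an illustrative stand-in for the client's internal login-arguments object, which exposes at least username and auth_token attributes:

    from collections import namedtuple

    import synapseclient
    from synapseclient.core.credentials.credential_provider import (
        get_default_credential_chain,
    )

    LoginArgs = namedtuple("LoginArgs", ["username", "auth_token"])  # illustrative only

    syn = synapseclient.Synapse()
    chain = get_default_credential_chain()
    creds = chain.get_credentials(syn, LoginArgs(username=None, auth_token=None))
    print(creds)  # credentials from the first provider that can supply them, else None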

Remote File Storage Wrappers

synapseclient.core.remote_file_storage_wrappers

Wrappers for remote file storage clients like S3 and SFTP.

Classes

S3ClientWrapper

Wrapper class for S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
class S3ClientWrapper:
    """
    Wrapper class for S3 client.
    """

    # These methods are static because in our use case, we always have the bucket and
    # endpoint and usually only call the download/upload once so there is no need to instantiate multiple objects

    @staticmethod
    def _attempt_import_boto3():
        """
        Check if boto3 installed and give instructions if not.

        Returns:
            The boto3 module or instructions to install it if unavailable
        """
        return attempt_import(
            "boto3",
            "\n\nLibraries required for client authenticated S3 access are not installed!\n"
            "The Synapse client uses boto3 in order to access S3-like storage "
            "locations.\n",
        )

    @staticmethod
    def _create_progress_callback_func(
        progress_bar: tqdm,
    ) -> callable:
        """
        Creates a progress callback function for tracking the progress of a file transfer.

        Arguments:
            progress_bar: The progress bar to update as bytes are transferred.

        Returns:
            progress_callback: The progress callback function.
        """

        def progress_callback(transferred_bytes: int) -> None:
            """
            Update the progress of a transfer.

            Arguments:
                transferred_bytes: The number of bytes transferred since the last callback.
            """
            increment_progress_bar(n=transferred_bytes, progress_bar=progress_bar)

        return progress_callback

    @staticmethod
    def download_file(
        bucket: str,
        endpoint_url: Union[str, None],
        remote_file_key: str,
        download_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        progress_bar: Union[tqdm, None] = None,
        transfer_config_kwargs: dict = None,
    ) -> str:
        """
        Download a file from s3 using boto3.

        Arguments:
            bucket: name of bucket to upload to
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key to upload the file to
            download_file_path: local path to save the file to
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            progress_bar: The progress bar to update. Defaults to None.
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

        Returns:
            download_file_path: S3 path of the file

        Raises:
            ValueError: If the key does not exist in the bucket.
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        S3ClientWrapper._attempt_import_boto3()

        import boto3.s3.transfer
        import botocore

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        try:
            s3_obj = s3.Object(bucket, remote_file_key)

            progress_callback = None

            if progress_bar is not None:
                s3_obj.load()
                file_size = s3_obj.content_length
                increment_progress_bar_total(total=file_size, progress_bar=progress_bar)
                progress_callback = S3ClientWrapper._create_progress_callback_func(
                    progress_bar
                )

            s3_obj.download_file(
                download_file_path,
                Callback=progress_callback,
                Config=transfer_config,
            )

            return download_file_path

        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                raise ValueError(
                    "The key:%s does not exist in bucket:%s.", remote_file_key, bucket
                )
            else:
                raise

    @staticmethod
    def upload_file(
        bucket: str,
        endpoint_url: typing.Optional[str],
        remote_file_key: str,
        upload_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        show_progress: bool = True,
        transfer_config_kwargs: dict = None,
        storage_str: str = None,
    ) -> str:
        """
        Upload a file to s3 using boto3.

        Arguments:
            bucket: name of bucket to upload to
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key to upload the file to
            upload_file_path: local path of the file to upload
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            show_progress: whether to print progress indicator to console
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

        Returns:
            upload_file_path: S3 path of the file

        Raises:
            ValueError: If the path does not exist or is not a file
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        if not os.path.isfile(upload_file_path):
            raise ValueError(
                "The path: [%s] does not exist or is not a file", upload_file_path
            )

        S3ClientWrapper._attempt_import_boto3()
        import boto3.s3.transfer

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        progress_callback = None
        progress_bar = None
        if show_progress:
            file_size = os.stat(upload_file_path).st_size
            filename = os.path.basename(upload_file_path)
            progress_bar = tqdm(
                total=file_size,
                desc=storage_str,
                unit="B",
                unit_scale=True,
                postfix=filename,
                smoothing=0,
            )
            progress_callback = S3ClientWrapper._create_progress_callback_func(
                progress_bar
            )

        # automatically determines whether to perform multi-part upload
        s3.Bucket(bucket).upload_file(
            upload_file_path,
            remote_file_key,
            Callback=progress_callback,
            Config=transfer_config,
            ExtraArgs={"ACL": "bucket-owner-full-control"},
        )
        if progress_bar is not None:
            progress_bar.close()
        return upload_file_path
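
For illustration, a hedged sketch of pushing a local file into an S3-compatible bucket through the wrapper; the bucket, key, paths, and profile are placeholders, and boto3 must be installed:

    from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper

    S3ClientWrapper.upload_file(
        bucket="my-example-bucket",          # placeholder bucket name
        endpoint_url=None,                   # None means the default AWS endpoint
        remote_file_key="uploads/data.csv",  # placeholder object key
        upload_file_path="/data/data.csv",   # placeholder local file
        profile_name="my-aws-profile",       # or pass credentials={...} instead
        show_progress=False,
    )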
Functions
download_file(bucket, endpoint_url, remote_file_key, download_file_path, *, profile_name=None, credentials=None, progress_bar=None, transfer_config_kwargs=None) staticmethod

Download a file from s3 using boto3.

PARAMETER DESCRIPTION
bucket

name of the bucket to download from

TYPE: str

endpoint_url

a boto3 compatible endpoint url

TYPE: Union[str, None]

remote_file_key

object key of the file to download

TYPE: str

download_file_path

local path to save the file to

TYPE: str

profile_name

AWS profile name from local aws config, mutually exclusive with credentials

TYPE: str DEFAULT: None

credentials

a dictionary of AWS credentials to use, mutually exclusive with profile_name

Expected items:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token

TYPE: Dict[str, str] DEFAULT: None

progress_bar

The progress bar to update. Defaults to None.

TYPE: Union[tqdm, None] DEFAULT: None

transfer_config_kwargs

boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
download_file_path

the local path the file was downloaded to

TYPE: str

RAISES DESCRIPTION
ValueError

If the key does not exist in the bucket.

ClientError

If there is an error with the S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
@staticmethod
def download_file(
    bucket: str,
    endpoint_url: Union[str, None],
    remote_file_key: str,
    download_file_path: str,
    *,
    profile_name: str = None,
    credentials: typing.Dict[str, str] = None,
    progress_bar: Union[tqdm, None] = None,
    transfer_config_kwargs: dict = None,
) -> str:
    """
    Download a file from s3 using boto3.

    Arguments:
        bucket: name of bucket to upload to
        endpoint_url: a boto3 compatible endpoint url
        remote_file_key: object key to upload the file to
        download_file_path: local path to save the file to
        profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
        credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

            Expected items:

            - `aws_access_key_id`
            - `aws_secret_access_key`
            - `aws_session_token`
        progress_bar: The progress bar to update. Defaults to None.
        transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

    Returns:
        download_file_path: S3 path of the file

    Raises:
        ValueError: If the key does not exist in the bucket.
        botocore.exceptions.ClientError: If there is an error with the S3 client.
    """

    S3ClientWrapper._attempt_import_boto3()

    import boto3.s3.transfer
    import botocore

    transfer_config = boto3.s3.transfer.TransferConfig(
        **(transfer_config_kwargs or {})
    )

    session_args = credentials if credentials else {"profile_name": profile_name}
    boto_session = boto3.session.Session(**session_args)
    s3 = boto_session.resource("s3", endpoint_url=endpoint_