Core

This section is for super users / developers only. These functions are subject to change as they are internal development functions. Use at your own risk.

Upload

synapseclient.core.upload.upload_functions

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity, path, synapseStore=True, md5=None, file_size=None, mimetype=None, max_threads=None)

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
parent_entity

Entity object or id of the parent entity.

TYPE: Union[str, Mapping, Number]

path

The file path to the file being uploaded

TYPE: str

synapseStore

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the entity_parent_id

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

A new FileHandle, as a dict, representing the uploaded file
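
For example, a minimal usage sketch (the entity ID and path are illustrative; it assumes a Synapse client authenticated via the standard config file):

import synapseclient
from synapseclient.core.upload.upload_functions import upload_file_handle

syn = synapseclient.Synapse()
syn.login()

# Upload a local file to the storage location configured for the parent project/folder
file_handle = upload_file_handle(
    syn,
    parent_entity="syn123456",
    path="/path/to/data.csv",
    synapseStore=True,
)
print(file_handle["id"])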

Source code in synapseclient/core/upload/upload_functions.py
def upload_file_handle(
    syn: "Synapse",
    parent_entity: Union[str, collections.abc.Mapping, numbers.Number],
    path: str,
    synapseStore: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
    max_threads: int = None,
):
    """
    Uploads the file in the provided path (if necessary) to a storage location based on project settings.
    Returns a new FileHandle as a dict to represent the stored file.

    Arguments:
        parent_entity: Entity object or id of the parent entity.
        path:          The file path to the file being uploaded
        synapseStore:  If False, will not upload the file, but instead create an ExternalFileHandle that references
                       the file on the local machine.
                       If True, will upload the file based on StorageLocation determined by the entity_parent_id
        md5:           The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be
                       calculated automatically.
        file_size:     The size of the file, if known. Otherwise if the file is a local file, it will be calculated
                       automatically.
        mimetype:      The MIME type of the file, if known. Otherwise if the file is a local file, it will be
                       calculated automatically.

    Returns:
        A dictionary of a new FileHandle as a dict that represents the uploaded file
    """
    if path is None:
        raise ValueError("path can not be None")

    # if doing a external file handle with no actual upload
    if not synapseStore:
        return create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )

    # expand the path because past this point an upload is required and some upload functions require an absolute path
    expanded_upload_path = os.path.expandvars(os.path.expanduser(path))

    if md5 is None and os.path.isfile(expanded_upload_path):
        md5 = utils.md5_for_file(expanded_upload_path).hexdigest()

    entity_parent_id = id_of(parent_entity)

    # determine the upload function based on the UploadDestination
    location = syn._getDefaultUploadDestination(entity_parent_id)
    upload_destination_type = location["concreteType"]
    trace.get_current_span().set_attributes(
        {
            "synapse.parent_id": entity_parent_id,
            "synapse.upload_destination_type": upload_destination_type,
        }
    )

    if (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and sts_transfer.is_storage_location_sts_enabled(
            syn, entity_parent_id, location
        )
        and upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    ):
        log_upload_message(
            syn,
            "\n"
            + "#" * 50
            + "\n Uploading file to external S3 storage using boto3 \n"
            + "#" * 50
            + "\n",
        )

        return upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=entity_parent_id,
            upload_destination=location,
            local_path=expanded_upload_path,
            mimetype=mimetype,
            md5=md5,
        )

    elif upload_destination_type in (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    ):
        if upload_destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION:
            storage_str = "Synapse"
        elif upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION:
            storage_str = "your external S3"
        else:
            storage_str = "your external Google Bucket"

        log_upload_message(
            syn,
            "\n"
            + "#" * 50
            + "\n Uploading file to "
            + storage_str
            + " storage \n"
            + "#" * 50
            + "\n",
        )

        return upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storageLocationId=location["storageLocationId"],
            mimetype=mimetype,
            max_threads=max_threads,
            md5=md5,
        )
    # external file handle (sftp)
    elif upload_destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if location["uploadType"] == "SFTP":
            log_upload_message(
                syn,
                "\n%s\n%s\nUploading to: %s\n%s\n"
                % (
                    "#" * 50,
                    location.get("banner", ""),
                    urllib_parse.urlparse(location["url"]).netloc,
                    "#" * 50,
                ),
            )
            return upload_external_file_handle_sftp(
                syn=syn,
                file_path=expanded_upload_path,
                sftp_url=location["url"],
                mimetype=mimetype,
                md5=md5,
            )
        else:
            raise NotImplementedError("Can only handle SFTP upload locations.")
    # client authenticated S3
    elif (
        upload_destination_type
        == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION
    ):
        log_upload_message(
            syn,
            "\n%s\n%s\nUploading to endpoint: [%s] bucket: [%s]\n%s\n"
            % (
                "#" * 50,
                location.get("banner", ""),
                location.get("endpointUrl"),
                location.get("bucket"),
                "#" * 50,
            ),
        )
        return upload_client_auth_s3(
            syn=syn,
            file_path=expanded_upload_path,
            bucket=location["bucket"],
            endpoint_url=location["endpointUrl"],
            key_prefix=location["keyPrefixUUID"],
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
        )
    else:  # unknown storage location
        log_upload_message(
            syn,
            "\n%s\n%s\nUNKNOWN STORAGE LOCATION. Defaulting upload to Synapse.\n%s\n"
            % ("!" * 50, location.get("banner", ""), "!" * 50),
        )
        return upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storageLocationId=None,
            mimetype=mimetype,
            max_threads=max_threads,
            md5=md5,
        )

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None)

When uploading to Synapse storage, the back end normally generates a random prefix for the uploaded object. Since in this case the client is responsible for the remote key, the client instead generates a random prefix itself. This both ensures we don't collide with an existing S3 object and mitigates potential key locality performance issues, although those issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/
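
The key layout described above can be sketched as follows (the base key and local path are illustrative; the real values come from the upload destination):

import os
import uuid

base_key = "proj-base-key"        # taken from the upload destination (illustrative)
local_path = "/path/to/data.csv"  # file being uploaded (illustrative)

# A random UUID prefix ensures the uploaded object cannot collide with an existing key
remote_file_key = "/".join([base_key, str(uuid.uuid4()), os.path.basename(local_path)])
# e.g. "proj-base-key/1f6e3a9c-.../data.csv"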

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

local_path

The local path to the file to upload.

TYPE: str

mimetype

The mimetype, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

The new FileHandle, as a dict, created for the uploaded file.

Source code in synapseclient/core/upload/upload_functions.py
def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination,
    local_path: str,
    mimetype: str = None,
    md5: str = None,
):
    """
    When uploading to Synapse storage normally the back end will generate a random prefix
    for our uploaded object. Since in this case the client is responsible for the remote
    key, the client will instead generate a random prefix. this both ensures we don't have a collision
    with an existing S3 object and also mitigates potential performance issues, although
    key locality performance issues are likely resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination
        local_path: The local path to the file to upload.
        mimetype: The mimetype is known. Defaults to None.
        md5: MD5 checksum for the file, if known.

    Returns:
        _description_
    """
    key_prefix = str(uuid.uuid4())

    bucket_name = upload_destination["bucket"]
    storage_location_id = upload_destination["storageLocationId"]
    remote_file_key = "/".join(
        [upload_destination["baseKey"], key_prefix, os.path.basename(local_path)]
    )

    def upload_fn(credentials):
        return S3ClientWrapper.upload_file(
            bucket=bucket_name,
            endpoint_url=None,
            remote_file_key=remote_file_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
        )

    sts_transfer.with_boto_sts_credentials(upload_fn, syn, parent_id, "read_write")
    return syn.create_external_s3_file_handle(
        bucket_name=bucket_name,
        s3_file_key=remote_file_key,
        file_path=local_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
    )

Multipart Upload

synapseclient.core.upload.multipart_upload

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

Classes

UploadAttempt

Used to handle multi-threaded operations for uploading one or more parts of a file.
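
A minimal sketch of how _multipart_upload drives this class (see the source below); end users are not expected to construct it directly:

# Assumes `syn`, an upload request payload, a part body provider function and
# an MD5 function, as assembled by multipart_upload_file / multipart_upload_string.
attempt = UploadAttempt(
    syn,
    dest_file_name="data.csv",
    upload_request_payload=upload_request,
    part_request_body_provider_fn=part_fn,
    md5_fn=md5_fn,
    max_threads=8,
    force_restart=False,
)
upload_status = attempt()  # creates the upload, sends remaining parts, completes it
file_handle_id = upload_status["resultFileHandleId"]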

Source code in synapseclient/core/upload/multipart_upload.py
class UploadAttempt:
    """
    Used to handle multi-threaded operations for uploading one or parts of a file.
    """

    def __init__(
        self,
        syn,
        dest_file_name,
        upload_request_payload,
        part_request_body_provider_fn,
        md5_fn,
        max_threads: int,
        force_restart: bool,
    ):
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._max_threads = max_threads
        self._force_restart = force_restart

        self._lock = threading.Lock()
        self._aborted = False

        # populated later
        self._upload_id: str = None
        self._pre_signed_part_urls: Mapping[int, str] = None

    @classmethod
    def _get_remaining_part_numbers(cls, upload_status):
        part_numbers = []
        parts_state = upload_status["partsState"]

        # parts are 1-based
        for i, part_status in enumerate(parts_state, 1):
            if part_status == "0":
                part_numbers.append(i)

        return len(parts_state), part_numbers

    @classmethod
    def _get_thread_session(cls):
        # get a lazily initialized requests.Session from the thread.
        # we want to share a requests.Session over the course of a thread
        # to take advantage of persistent http connection. we put it on a
        # thread local rather that in the task closure since a connection can
        # be reused across separate part uploads so no reason to restrict it
        # per worker task.
        session = getattr(_thread_local, "session", None)
        if not session:
            session = _thread_local.session = requests.Session()
        return session

    def _is_copy(self):
        # is this a copy or upload request
        return (
            self._upload_request_payload.get("concreteType")
            == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST
        )

    def _create_synapse_upload(self):
        return self._syn.restPOST(
            "/file/multipart?forceRestart={}".format(str(self._force_restart).lower()),
            json.dumps(self._upload_request_payload),
            endpoint=self._syn.fileHandleEndpoint,
        )

    def _fetch_pre_signed_part_urls(
        self,
        upload_id: str,
        part_numbers: List[int],
        requests_session: requests.Session = None,
    ) -> Mapping[int, str]:
        uri = "/file/multipart/{upload_id}/presigned/url/batch".format(
            upload_id=upload_id
        )
        body = {
            "uploadId": upload_id,
            "partNumbers": part_numbers,
        }

        response = self._syn.restPOST(
            uri,
            json.dumps(body),
            requests_session=requests_session,
            endpoint=self._syn.fileHandleEndpoint,
        )

        part_urls = {}
        for part in response["partPresignedUrls"]:
            part_urls[part["partNumber"]] = (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )

        return part_urls

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ):
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            refreshed URL

        """
        with self._lock:
            current_url = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                refreshed_url = current_url
            else:
                self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
                    self._upload_id,
                    list(self._pre_signed_part_urls.keys()),
                )

                refreshed_url = self._pre_signed_part_urls[part_number]

            return refreshed_url

    def _handle_part(self, part_number, otel_context: typing.Union[Context, None]):
        if otel_context:
            context.attach(otel_context)
        with tracer.start_as_current_span("UploadAttempt::_handle_part"):
            trace.get_current_span().set_attributes(
                {"thread.id": threading.get_ident()}
            )
            with self._lock:
                if self._aborted:
                    # this upload attempt has already been aborted
                    # so we short circuit the attempt to upload this part
                    raise SynapseUploadAbortedException(
                        "Upload aborted, skipping part {}".format(part_number)
                    )

                part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

            session = self._get_thread_session()

            # obtain the body (i.e. the upload bytes) for the given part number.
            body = (
                self._part_request_body_provider_fn(part_number)
                if self._part_request_body_provider_fn
                else None
            )
            part_size = len(body) if body else 0
            for retry in range(2):

                def put_fn():
                    with tracer.start_as_current_span("UploadAttempt::put_part"):
                        return session.put(part_url, body, headers=signed_headers)

                try:
                    # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls
                    response = with_retry(
                        put_fn, retry_exceptions=[requests.exceptions.ConnectionError]
                    )
                    _raise_for_status(response)

                    # completed upload part to s3 successfully
                    break

                except SynapseHTTPError as ex:
                    if ex.response.status_code == 403 and retry < 1:
                        # we interpret this to mean our pre_signed url expired.
                        self._syn.logger.debug(
                            "The pre-signed upload URL for part {} has expired."
                            "Refreshing urls and retrying.\n".format(part_number)
                        )

                        # we refresh all the urls and obtain this part's
                        # specific url for the retry
                        with tracer.start_as_current_span(
                            "UploadAttempt::refresh_pre_signed_part_urls"
                        ):
                            (
                                part_url,
                                signed_headers,
                            ) = self._refresh_pre_signed_part_urls(
                                part_number,
                                part_url,
                            )

                    else:
                        raise

            md5_hex = self._md5_fn(body, response)

            # now tell synapse that we uploaded that part successfully
            self._syn.restPUT(
                "/file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5}".format(
                    upload_id=self._upload_id,
                    part_number=part_number,
                    md5=md5_hex,
                ),
                requests_session=session,
                endpoint=self._syn.fileHandleEndpoint,
            )

            # remove so future batch pre_signed url fetches will exclude this part
            with self._lock:
                del self._pre_signed_part_urls[part_number]

            return part_number, part_size

    def _upload_parts(self, part_count, remaining_part_numbers):
        trace.get_current_span().set_attributes({"thread.id": threading.get_ident()})
        time_upload_started = time.time()
        completed_part_count = part_count - len(remaining_part_numbers)
        file_size = self._upload_request_payload.get("fileSizeBytes")

        if not self._is_copy():
            # we won't have bytes to measure during a copy so the byte oriented progress bar is not useful
            progress = previously_transferred = min(
                completed_part_count * self._part_size,
                file_size,
            )

            self._syn._print_transfer_progress(
                progress,
                file_size,
                prefix="Uploading",
                postfix=self._dest_file_name,
                previouslyTransferred=previously_transferred,
            )

        self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
            self._upload_id,
            remaining_part_numbers,
        )

        futures = []
        with _executor(self._max_threads, False) as executor:
            # we don't wait on the shutdown since we do so ourselves below

            for part_number in remaining_part_numbers:
                futures.append(
                    executor.submit(
                        self._handle_part,
                        part_number,
                        context.get_current(),
                    )
                )

        for result in concurrent.futures.as_completed(futures):
            try:
                _, part_size = result.result()

                if part_size and not self._is_copy():
                    progress += part_size
                    self._syn._print_transfer_progress(
                        min(progress, file_size),
                        file_size,
                        prefix="Uploading",
                        postfix=self._dest_file_name,
                        dt=time.time() - time_upload_started,
                        previouslyTransferred=previously_transferred,
                    )
            except (Exception, KeyboardInterrupt) as cause:
                with self._lock:
                    self._aborted = True

                # wait for all threads to complete before
                # raising the exception, we don't want to return
                # control while there are still threads from this
                # upload attempt running
                concurrent.futures.wait(futures)

                if isinstance(cause, KeyboardInterrupt):
                    raise SynapseUploadAbortedException("User interrupted upload")
                raise SynapseUploadFailedException("Part upload failed") from cause

    def _complete_upload(self):
        upload_status_response = self._syn.restPUT(
            "/file/multipart/{upload_id}/complete".format(
                upload_id=self._upload_id,
            ),
            requests_session=self._get_thread_session(),
            endpoint=self._syn.fileHandleEndpoint,
        )

        upload_state = upload_status_response.get("state")
        if upload_state != "COMPLETED":
            # at this point we think successfully uploaded all the parts
            # but the upload status isn't complete, we'll throw an error
            # and let a subsequent attempt try to reconcile
            raise SynapseUploadFailedException(
                "Upload status has an unexpected state {}".format(upload_state)
            )

        return upload_status_response

    def __call__(self):
        upload_status_response = self._create_synapse_upload()
        upload_state = upload_status_response.get("state")

        if upload_state != "COMPLETED":
            self._upload_id = upload_status_response["uploadId"]
            part_count, remaining_part_numbers = self._get_remaining_part_numbers(
                upload_status_response
            )

            # if no remaining part numbers then all the parts have been
            # uploaded but the upload has not been marked complete.
            if remaining_part_numbers:
                self._upload_parts(part_count, remaining_part_numbers)
            upload_status_response = self._complete_upload()

        return upload_status_response

Functions

multipart_upload_file(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None, md5=None)

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().
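
For example (the path and sizes are illustrative; `syn` is an authenticated Synapse object):

from synapseclient.core.upload.multipart_upload import multipart_upload_file

file_handle_id = multipart_upload_file(
    syn,
    file_path="/path/to/large_file.vcf.gz",
    part_size=8 * 1024 * 1024,  # 8 MiB parts; the minimum is 5 MiB
    max_threads=8,
)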

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_upload_file(
    syn,
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
    md5: str = None,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload
        md5: The MD5 of the file. If not provided, it will be calculated.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    trace.get_current_span().set_attributes(
        {
            "synapse.storage_location_id": (
                storage_location_id if storage_location_id is not None else ""
            )
        }
    )

    if not os.path.exists(file_path):
        raise IOError('File "{}" not found.'.format(file_path))
    if os.path.isdir(file_path):
        raise IOError('File "{}" is a directory.'.format(file_path))

    file_size = os.path.getsize(file_path)
    if not dest_file_name:
        dest_file_name = os.path.basename(file_path)

    if content_type is None:
        mime_type, _ = mimetypes.guess_type(file_path, strict=False)
        content_type = mime_type or "application/octet-stream"

    callback_func = Spinner().print_tick if not syn.silent else None
    md5_hex = md5 or md5_for_file(file_path, callback=callback_func).hexdigest()

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number):
        return get_file_chunk(file_path, part_number, part_size)

    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )

multipart_upload_string(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().
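
For example (the content and filename are illustrative; `syn` is an authenticated Synapse object):

from synapseclient.core.upload.multipart_upload import multipart_upload_string

file_handle_id = multipart_upload_string(
    syn,
    text="sample_id\tvalue\nS1\t42\n",
    dest_file_name="results.tsv",
    content_type="text/tab-separated-values",
)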

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_upload_string(
    syn,
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: number of bytes per part. Minimum 5MB.
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number):
        return get_data_chunk(data, part_number, part_size)

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )
    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )

multipart_copy(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

source_file_handle_association

Describes an association of a FileHandle with another object.

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

max_threads

Number of concurrent threads to devote to copy.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().
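
For example, a sketch of a server-side copy (the association values and storage location are illustrative; `syn` is an authenticated Synapse object):

from synapseclient.core.upload.multipart_upload import multipart_copy

# Identifies the source file handle and the object it is associated with
source_file_handle_association = {
    "fileHandleId": "12345",
    "associateObjectId": "syn123456",
    "associateObjectType": "FileEntity",
}

file_handle_id = multipart_copy(
    syn,
    source_file_handle_association,
    storage_location_id="98765",
)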

Source code in synapseclient/core/upload/multipart_upload.py
def multipart_copy(
    syn,
    source_file_handle_association,
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Makes a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html).
    This request performs a copy of an existing file handle without data transfer from the client.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes).
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.
        max_threads: Number of concurrent threads to devote to copy.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    part_size = part_size or DEFAULT_PART_SIZE

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )

_multipart_upload(syn, dest_file_name, upload_request, part_fn, md5_fn, force_restart=False, max_threads=None)

Calls upon an UploadAttempt object to initiate and/or retry a multipart file upload or copy. This function is wrapped by multipart_upload_file, multipart_upload_string, and multipart_copy. At most 7 retries are attempted per call.

PARAMETER DESCRIPTION
syn

A Synapse object

dest_file_name

upload as a different filename

upload_request

A dictionary with the caller-supplied details of the upload/copy request.

part_fn

Function that returns the request body (bytes) for a given part number

md5_fn

Function to calculate the MD5 of the file-like object

max_threads

number of concurrent threads to devote to upload.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

A File Handle ID

Source code in synapseclient/core/upload/multipart_upload.py
def _multipart_upload(
    syn,
    dest_file_name,
    upload_request,
    part_fn,
    md5_fn,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Calls upon an [UploadAttempt][synapseclient.core.upload.multipart_upload.UploadAttempt]
    object to initiate and/or retry a multipart file upload or copy. This function is wrapped by
    [multipart_upload_file][synapseclient.core.upload.multipart_upload.multipart_upload_file],
    [multipart_upload_string][synapseclient.core.upload.multipart_upload.multipart_upload_string], and
    [multipart_copy][synapseclient.core.upload.multipart_upload.multipart_copy].
    Retries cannot exceed 7 retries per call.

    Arguments:
        syn: A Synapse object
        dest_file_name: upload as a different filename
        upload_request: A dictionary object with the user-fed logistical
                        details of the upload/copy request.
        part_fn: Function to calculate the partSize of each part
        md5_fn: Function to calculate the MD5 of the file-like object
        max_threads: number of concurrent threads to devote to upload.

    Returns:
        A File Handle ID

    """
    if max_threads is None:
        max_threads = pool_provider.DEFAULT_NUM_THREADS

    max_threads = max(max_threads, 1)

    retry = 0
    while True:
        try:
            upload_status_response = UploadAttempt(
                syn,
                dest_file_name,
                upload_request,
                part_fn,
                md5_fn,
                max_threads,
                # only force_restart the first time through (if requested).
                # a retry after a caught exception will not restart the upload
                # from scratch.
                force_restart and retry == 0,
            )()

            # success
            return upload_status_response["resultFileHandleId"]

        except SynapseUploadFailedException:
            if retry < MAX_RETRIES:
                retry += 1
            else:
                raise

Upload Async

synapseclient.core.upload.upload_functions_async

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity_id, path, synapse_store=True, md5=None, file_size=None, mimetype=None) async

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_entity_id

The ID of the parent entity that the file will be attached to.

TYPE: str

path

The file path to the file being uploaded

TYPE: str

synapse_store

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the parent_entity_id.

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A new FileHandle, as a dict, representing the uploaded file
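
For example, a minimal usage sketch of the async variant (the entity ID and path are illustrative):

import asyncio

import synapseclient
from synapseclient.core.upload.upload_functions_async import upload_file_handle

async def main():
    syn = synapseclient.Synapse()
    syn.login()
    file_handle = await upload_file_handle(
        syn,
        parent_entity_id="syn123456",
        path="/path/to/data.csv",
    )
    print(file_handle["id"])

asyncio.run(main())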

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_file_handle(
    syn: "Synapse",
    parent_entity_id: str,
    path: str,
    synapse_store: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
) -> Dict[str, Union[str, int]]:
    """
    Uploads the file in the provided path (if necessary) to a storage location based
    on project settings. Returns a new FileHandle as a dict to represent the
    stored file.

    Arguments:
        syn: The synapse client
        parent_entity_id: The ID of the parent entity that the file will be attached to.
        path: The file path to the file being uploaded
        synapse_store: If False, will not upload the file, but instead create an
            ExternalFileHandle that references the file on the local machine. If True,
            will upload the file based on StorageLocation determined by the
            parent_entity_id.
        md5: The MD5 checksum for the file, if known. Otherwise if the file is a
            local file, it will be calculated automatically.
        file_size: The size the file, if known. Otherwise if the file is a local file,
            it will be calculated automatically.
        mimetype: The MIME type the file, if known. Otherwise if the file is a local
            file, it will be calculated automatically.

    Returns:
        A dictionary of a new FileHandle as a dict that represents the uploaded file
    """
    if path is None:
        raise ValueError("path can not be None")

    # if doing a external file handle with no actual upload
    if not synapse_store:
        return await create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )

    # expand the path because past this point an upload is required and some upload functions require an absolute path
    expanded_upload_path = os.path.expandvars(os.path.expanduser(path))

    if md5 is None and os.path.isfile(expanded_upload_path):
        md5 = await utils.md5_for_file_multiprocessing(
            filename=expanded_upload_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )

    entity_parent_id = id_of(parent_entity_id)

    # determine the upload function based on the UploadDestination
    location = await get_upload_destination(
        entity_id=entity_parent_id, synapse_client=syn
    )
    upload_destination_type = location.get("concreteType", None) if location else None
    trace.get_current_span().set_attributes(
        {
            "synapse.parent_id": entity_parent_id,
            "synapse.upload_destination_type": upload_destination_type,
        }
    )

    if (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and await sts_transfer.is_storage_location_sts_enabled_async(
            syn, entity_parent_id, location
        )
        and upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    ):
        return await upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=entity_parent_id,
            upload_destination=location,
            local_path=expanded_upload_path,
            mimetype=mimetype,
            md5=md5,
            storage_str="Uploading file to external S3 storage using boto3",
        )

    elif upload_destination_type in (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    ):
        if upload_destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION:
            storage_str = "Uploading to Synapse storage"
        elif upload_destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION:
            storage_str = "Uploading to your external S3 storage"
        else:
            storage_str = "Uploading to your external Google Bucket storage"

        return await upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=storage_str,
        )
    # external file handle (sftp)
    elif upload_destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if location["uploadType"] == "SFTP":
            storage_str = (
                f"Uploading to: {urllib_parse.urlparse(location['url']).netloc}"
            )
            banner = location.get("banner", None)
            if banner:
                syn.logger.info(banner)
            return await upload_external_file_handle_sftp(
                syn=syn,
                file_path=expanded_upload_path,
                sftp_url=location["url"],
                mimetype=mimetype,
                md5=md5,
                storage_str=storage_str,
            )
        else:
            raise NotImplementedError("Can only handle SFTP upload locations.")
    # client authenticated S3
    elif (
        upload_destination_type
        == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION
    ):
        storage_str = f"Uploading to endpoint: [{location.get('endpointUrl')}] bucket: [{location.get('bucket')}]"
        banner = location.get("banner", None)
        if banner:
            syn.logger.info(banner)
        return await upload_client_auth_s3(
            syn=syn,
            file_path=expanded_upload_path,
            bucket=location["bucket"],
            endpoint_url=location["endpointUrl"],
            key_prefix=location["keyPrefixUUID"],
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=storage_str,
        )
    else:  # unknown storage location
        return await upload_synapse_s3(
            syn=syn,
            file_path=expanded_upload_path,
            storage_location_id=None,
            mimetype=mimetype,
            md5=md5,
            storage_str="Uploading to Synapse storage",
        )

create_external_file_handle(syn, path, mimetype=None, md5=None, file_size=None) async

Create a file handle in Synapse without uploading any files. This is used in cases where one wishes to store a reference to a file that is not in Synapse.
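
For example, from within a coroutine (the path is illustrative; `syn` is an authenticated Synapse object):

# Registers a reference to the local file's file:// URL; nothing is uploaded.
file_handle = await create_external_file_handle(
    syn,
    path="/data/shared/reference.fasta",
)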

Source code in synapseclient/core/upload/upload_functions_async.py
async def create_external_file_handle(
    syn: "Synapse",
    path: str,
    mimetype: str = None,
    md5: str = None,
    file_size: int = None,
) -> Dict[str, Union[str, int]]:
    """Create a file handle in Synapse without uploading any files. This is used in
    cases where one wishes to store a reference to a file that is not in Synapse."""
    is_local_file = False  # defaults to false
    url = as_url(os.path.expandvars(os.path.expanduser(path)))
    if is_url(url):
        parsed_url = urllib_parse.urlparse(url)
        parsed_path = file_url_to_path(url)
        if parsed_url.scheme == "file" and os.path.isfile(parsed_path):
            actual_md5 = await utils.md5_for_file_multiprocessing(
                filename=parsed_path,
                process_pool_executor=syn._get_process_pool_executor(
                    asyncio_event_loop=asyncio.get_running_loop()
                ),
                md5_semaphore=syn._get_md5_semaphore(
                    asyncio_event_loop=asyncio.get_running_loop()
                ),
            )
            if md5 is not None and md5 != actual_md5:
                raise SynapseMd5MismatchError(
                    f"The specified md5 [{md5}] does not match the calculated md5 "
                    f"[{actual_md5}] for local file [{parsed_path}]",
                )
            md5 = actual_md5
            file_size = os.stat(parsed_path).st_size
            is_local_file = True
    else:
        raise ValueError(f"externalUrl [{url}] is not a valid url")

    # just creates the file handle because there is nothing to upload
    file_handle = await post_external_filehandle(
        external_url=url, mimetype=mimetype, md5=md5, file_size=file_size
    )
    if is_local_file:
        syn.cache.add(
            file_handle_id=file_handle["id"], path=file_url_to_path(url), md5=md5
        )
    trace.get_current_span().set_attributes(
        {"synapse.file_handle_id": file_handle["id"]}
    )
    return file_handle

upload_external_file_handle_sftp(syn, file_path, sftp_url, mimetype=None, md5=None, storage_str=None) async

Upload a file to an SFTP server and create a file handle in Synapse.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_external_file_handle_sftp(
    syn: "Synapse",
    file_path: str,
    sftp_url: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Upload a file to an SFTP server and create a file handle in Synapse."""
    username, password = syn._getUserCredentials(url=sftp_url)
    uploaded_url = SFTPWrapper.upload_file(
        file_path,
        urllib_parse.unquote(sftp_url),
        username,
        password,
        storage_str=storage_str,
    )

    file_md5 = md5 or await utils.md5_for_file_multiprocessing(
        filename=file_path,
        process_pool_executor=syn._get_process_pool_executor(
            asyncio_event_loop=asyncio.get_running_loop()
        ),
        md5_semaphore=syn._get_md5_semaphore(
            asyncio_event_loop=asyncio.get_running_loop()
        ),
    )
    file_handle = await post_external_filehandle(
        external_url=uploaded_url,
        mimetype=mimetype,
        md5=file_md5,
        file_size=os.stat(file_path).st_size,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=file_md5)
    return file_handle

upload_synapse_s3(syn, file_path, storage_location_id=None, mimetype=None, force_restart=False, md5=None, storage_str=None) async

Upload a file to Synapse storage and create a file handle in Synapse.

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

file_path

The path to the file to upload.

TYPE: str

storage_location_id

The storage location ID.

TYPE: Optional[int] DEFAULT: None

mimetype

The mimetype of the file.

TYPE: str DEFAULT: None

force_restart

If True, will force the upload to restart.

TYPE: bool DEFAULT: False

md5

The MD5 checksum for the file.

TYPE: str DEFAULT: None

storage_str

The storage string.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A dictionary of the file handle.
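
For example, from within a coroutine (the path is illustrative; `syn` is an authenticated Synapse object):

file_handle = await upload_synapse_s3(
    syn,
    file_path="/path/to/data.csv",
    storage_location_id=None,  # None falls back to default Synapse storage
    storage_str="Uploading to Synapse storage",
)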

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_synapse_s3(
    syn: "Synapse",
    file_path: str,
    storage_location_id: Optional[int] = None,
    mimetype: str = None,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Upload a file to Synapse storage and create a file handle in Synapse.

    Arguments:
        syn: The synapse client
        file_path: The path to the file to upload.
        storage_location_id: The storage location ID.
        mimetype: The mimetype of the file.
        force_restart: If True, will force the upload to restart.
        md5: The MD5 checksum for the file.
        storage_str: The storage string.

    Returns:
        A dictionary of the file handle.
    """
    file_handle_id = await multipart_upload_file_async(
        syn=syn,
        file_path=file_path,
        content_type=mimetype,
        storage_location_id=storage_location_id,
        md5=md5,
        force_restart=force_restart,
        storage_str=storage_str,
    )
    syn.cache.add(file_handle_id=file_handle_id, path=file_path, md5=md5)

    return await get_file_handle(file_handle_id=file_handle_id, synapse_client=syn)

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None, storage_str=None) async

When uploading to Synapse storage, the back end normally generates a random prefix for the uploaded object. Since in this case the client is responsible for the remote key, the client instead generates a random prefix itself. This both ensures we don't collide with an existing S3 object and mitigates potential key locality performance issues, although those issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

TYPE: Dict[str, Union[str, int]]

local_path

The local path to the file to upload.

TYPE: str

mimetype

The mimetype, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int, bool]]

A dictionary representing the created external S3 file handle.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination: Dict[str, Union[str, int]],
    local_path: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int, bool]]:
    """
    When uploading to Synapse storage normally the back end will generate a random prefix
    for our uploaded object. Since in this case the client is responsible for the remote
    key, the client will instead generate a random prefix. This both ensures we don't have a collision
    with an existing S3 object and also mitigates potential performance issues, although
    key locality performance issues are likely resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination
        local_path: The local path to the file to upload.
        mimetype: The mimetype of the file, if known. Defaults to None.
        md5: MD5 checksum for the file, if known.
        storage_str: Optional string to append to the upload message.

    Returns:
        A dictionary representing the created external S3 file handle.
    """
    key_prefix = str(uuid.uuid4())

    bucket_name = upload_destination["bucket"]
    storage_location_id = upload_destination["storageLocationId"]
    remote_file_key = "/".join(
        [
            upload_destination.get("baseKey", ""),
            key_prefix,
            os.path.basename(local_path),
        ]
    )

    def upload_fn(credentials: Dict[str, str]) -> str:
        """Wrapper for the upload function."""
        return S3ClientWrapper.upload_file(
            bucket=bucket_name,
            endpoint_url=None,
            remote_file_key=remote_file_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
            storage_str=storage_str,
        )

    loop = asyncio.get_event_loop()

    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: sts_transfer.with_boto_sts_credentials(
            upload_fn, syn, parent_id, "read_write"
        ),
    )

    return await post_external_s3_file_handle(
        bucket_name=bucket_name,
        s3_file_key=remote_file_key,
        file_path=local_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )
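
The upload_destination mapping is expected to carry at least the keys read above (bucket, storageLocationId, and an optional baseKey). A hedged sketch of a call, with illustrative values rather than real Synapse identifiers:

from synapseclient.core.upload.upload_functions_async import upload_synapse_sts_boto_s3


async def sts_upload_example(syn) -> dict:
    """Sketch only: run inside an asyncio loop with an authenticated client."""
    upload_destination = {
        "bucket": "my-sts-enabled-bucket",  # S3 bucket backing the storage location
        "storageLocationId": 12345,         # Synapse storage location ID
        "baseKey": "project/base/key",      # optional; treated as "" when absent
    }
    return await upload_synapse_sts_boto_s3(
        syn=syn,
        parent_id="syn123",                 # hypothetical parent entity ID
        upload_destination=upload_destination,
        local_path="/tmp/example.csv",      # hypothetical local path
        mimetype="text/csv",
    )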

upload_client_auth_s3(syn, file_path, bucket, endpoint_url, key_prefix, storage_location_id, mimetype=None, md5=None, storage_str=None) async

Use the S3 client to upload a file to an S3 bucket.

Source code in synapseclient/core/upload/upload_functions_async.py
async def upload_client_auth_s3(
    syn: "Synapse",
    file_path: str,
    bucket: str,
    endpoint_url: str,
    key_prefix: str,
    storage_location_id: int,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Use the S3 client to upload a file to an S3 bucket."""
    profile = syn._get_client_authenticated_s3_profile(endpoint_url, bucket)
    file_key = key_prefix + "/" + os.path.basename(file_path)
    loop = asyncio.get_event_loop()

    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: S3ClientWrapper.upload_file(
            bucket,
            endpoint_url,
            file_key,
            file_path,
            profile_name=profile,
            storage_str=storage_str,
        ),
    )

    file_handle = await post_external_object_store_filehandle(
        s3_file_key=file_key,
        file_path=file_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=md5)

    return file_handle
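
A hedged usage sketch; the bucket, endpoint, and key prefix below are placeholders, and a matching client-authenticated profile is assumed to exist in the Synapse configuration file:

from synapseclient.core.upload.upload_functions_async import upload_client_auth_s3


async def client_auth_upload_example(syn) -> dict:
    """Sketch only: run inside an asyncio loop with an authenticated client."""
    return await upload_client_auth_s3(
        syn=syn,
        file_path="/tmp/example.csv",           # hypothetical local path
        bucket="my-external-object-store",      # placeholder bucket name
        endpoint_url="https://s3.example.org",  # placeholder endpoint
        key_prefix="prefix-1234",               # prefix prepended to the file name
        storage_location_id=67890,              # Synapse storage location ID
        mimetype="text/csv",
    )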

Multipart Upload Async

synapseclient.core.upload.multipart_upload_async

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

This mermaid flowchart illustrates the process of uploading a file to Synapse using the multipart upload API.

flowchart  TD
    upload_file_handle --> before-upload
    subgraph before-upload
        subgraph Disk I/O & CPU
            subgraph Multi-Processing
                md5["Calculate MD5"]
            end
            mime["Guess mime type"]
            file_size["Get file size"]
            file_name["Get file name"]
        end

        subgraph HTTP
            upload_destination["Find where to Upload 
 GET /entity/{entity_id}/uploadDestination"]
            start_upload["Start upload with Synapse 
 POST /file/multipart"]
            presigned_urls["Get URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
        end
    end

    before-upload --> during-upload

    subgraph during-upload
        subgraph multi-threaded["multi-threaded for each part"]
            read_part["Read part to upload into Memory"]
            read_part --> put_part["HTTP PUT to storage provider"]

            subgraph thread_locked1["Lock thread"]
                refresh_check{"New URl available?"}
                refresh_check --> |no|refresh
                refresh["Refresh remaining URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
            end


            put_part --> |URL Expired|refresh_check
            refresh_check --> |yes|put_part
            refresh --> put_part
            put_part --> |Finished|md5_part["Calculate MD5 of part"]
        end
        complete_part["PUT /file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5_hex}"]
        multi-threaded -->|Upload finished| complete_part
    end

    during-upload --> post-upload

    subgraph post-upload
        post_upload_complete["PUT /file/multipart/{upload_id}/complete"]
        get_file_handle["GET /fileHandle/{file_handle_id}"]
    end

    post-upload --> entity["Create/Update Synapse entity"]

Classes

HandlePartResult dataclass

Result of a part upload.

ATTRIBUTE DESCRIPTION
part_number

The part number that was uploaded.

TYPE: int

part_size

The size of the part that was uploaded.

TYPE: int

md5_hex

The MD5 hash of the part that was uploaded.

TYPE: str

Source code in synapseclient/core/upload/multipart_upload_async.py
@dataclass
class HandlePartResult:
    """Result of a part upload.

    Attributes:
        part_number: The part number that was uploaded.
        part_size: The size of the part that was uploaded.
        md5_hex: The MD5 hash of the part that was uploaded.
    """

    part_number: int
    part_size: int
    md5_hex: str

UploadAttemptAsync

Used to handle multi-threaded operations for uploading one or more parts of a file.

Source code in synapseclient/core/upload/multipart_upload_async.py
class UploadAttemptAsync:
    """
    Used to handle multi-threaded operations for uploading one or more parts of a file.
    """

    def __init__(
        self,
        syn: "Synapse",
        dest_file_name: str,
        upload_request_payload: Dict[str, Any],
        part_request_body_provider_fn: Union[None, Callable[[int], bytes]],
        md5_fn: Callable[[bytes, httpx.Response], str],
        force_restart: bool,
        storage_str: str = None,
    ) -> None:
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._force_restart = force_restart

        self._lock = asyncio.Lock()
        self._thread_lock = threading.Lock()
        self._aborted = False
        self._storage_str = storage_str

        self._close_progress_bar = getattr(_thread_local, "progress_bar", None) is None
        # populated later
        self._upload_id: Optional[str] = None
        self._pre_signed_part_urls: Optional[Mapping[int, str]] = None
        self._progress_bar = None

    async def __call__(self) -> Dict[str, str]:
        """Orchestrate the upload of a file to Synapse."""
        upload_status_response = await post_file_multipart(
            upload_request_payload=self._upload_request_payload,
            force_restart=self._force_restart,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )
        upload_state = upload_status_response.get("state")

        if upload_state != "COMPLETED":
            self._upload_id = upload_status_response["uploadId"]
            part_count, remaining_part_numbers = self._get_remaining_part_numbers(
                upload_status_response
            )

            # if no remaining part numbers then all the parts have been
            # uploaded but the upload has not been marked complete.
            if remaining_part_numbers:
                await self._upload_parts(part_count, remaining_part_numbers)
            upload_status_response = await self._complete_upload()

        return upload_status_response

    @classmethod
    def _get_remaining_part_numbers(
        cls, upload_status: Dict[str, str]
    ) -> Tuple[int, List[int]]:
        part_numbers = []
        parts_state = upload_status["partsState"]

        # parts are 1-based
        for i, part_status in enumerate(parts_state, 1):
            if part_status == "0":
                part_numbers.append(i)

        return len(parts_state), part_numbers

    def _is_copy(self) -> bool:
        # is this a copy or upload request
        return (
            self._upload_request_payload.get("concreteType")
            == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST
        )

    async def _fetch_pre_signed_part_urls_async(
        self,
        upload_id: str,
        part_numbers: List[int],
    ) -> Mapping[int, str]:
        trace.get_current_span().set_attributes({"synapse.upload_id": upload_id})
        response = await post_file_multipart_presigned_urls(
            upload_id=upload_id,
            part_numbers=part_numbers,
            synapse_client=self._syn,
        )

        part_urls = {}
        for part in response["partPresignedUrls"]:
            part_urls[part["partNumber"]] = (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )

        return part_urls

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ) -> Tuple[str, Dict[str, str]]:
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            The refreshed URL and its signed headers for the given part

        """
        with self._thread_lock:
            current_url, headers = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                refreshed_url = current_url, headers
            else:
                self._pre_signed_part_urls = wrap_async_to_sync(
                    self._fetch_pre_signed_part_urls_async(
                        self._upload_id,
                        list(self._pre_signed_part_urls.keys()),
                    ),
                    syn=self._syn,
                )

                refreshed_url = self._pre_signed_part_urls[part_number]

        return refreshed_url

    async def _handle_part_wrapper(self, part_number: int) -> HandlePartResult:
        loop = asyncio.get_running_loop()
        otel_context = context.get_current()

        mem_info = psutil.virtual_memory()

        if mem_info.available <= self._part_size * 2:
            gc.collect()

        return await loop.run_in_executor(
            self._syn._get_thread_pool_executor(asyncio_event_loop=loop),
            self._handle_part,
            part_number,
            otel_context,
        )

    async def _upload_parts(
        self, part_count: int, remaining_part_numbers: List[int]
    ) -> None:
        """Take a list of part numbers and upload them to the pre-signed URLs.

        Arguments:
            part_count: The total number of parts in the upload.
            remaining_part_numbers: The parts that still need to be uploaded.
        """
        completed_part_count = part_count - len(remaining_part_numbers)
        file_size = self._upload_request_payload.get("fileSizeBytes")

        self._pre_signed_part_urls = await self._fetch_pre_signed_part_urls_async(
            upload_id=self._upload_id,
            part_numbers=remaining_part_numbers,
        )

        async_tasks = []

        for part_number in remaining_part_numbers:
            async_tasks.append(
                asyncio.create_task(self._handle_part_wrapper(part_number=part_number))
            )

        if not self._syn.silent and not self._progress_bar:
            if self._is_copy():
                # we won't have bytes to measure during a copy so the byte oriented
                # progress bar is not useful
                self._progress_bar = getattr(
                    _thread_local, "progress_bar", None
                ) or tqdm(
                    total=part_count,
                    desc=self._storage_str or "Copying",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(completed_part_count)
            else:
                previously_transferred = min(
                    completed_part_count * self._part_size,
                    file_size,
                )

                self._progress_bar = getattr(
                    _thread_local, "progress_bar", None
                ) or tqdm(
                    total=file_size,
                    desc=self._storage_str or "Uploading",
                    unit="B",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(previously_transferred)

        raised_exception = await self._orchestrate_upload_part_tasks(async_tasks)

        if raised_exception is not None:
            if isinstance(raised_exception, KeyboardInterrupt):
                raise SynapseUploadAbortedException(
                    "User interrupted upload"
                ) from raised_exception
            raise SynapseUploadFailedException(
                "Part upload failed"
            ) from raised_exception

    def _update_progress_bar(self, part_size: int) -> None:
        """Update the progress bar with the given part size."""
        if self._syn.silent or not self._progress_bar:
            return
        self._progress_bar.update(1 if self._is_copy() else part_size)

    async def _orchestrate_upload_part_tasks(
        self, async_tasks: List[asyncio.Task]
    ) -> Union[Exception, KeyboardInterrupt, None]:
        """
        Orchestrate the result of the upload part tasks. If successful, send a
        request to the server to add the part to the upload.

        Arguments:
            async_tasks: A set of tasks to orchestrate.

        Returns:
            An exception if one was raised, otherwise None.
        """
        raised_exception = None

        while async_tasks:
            done_tasks, pending_tasks = await asyncio.wait(
                async_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            async_tasks = pending_tasks
            for completed_task in done_tasks:
                try:
                    task_result = completed_task.result()

                    if isinstance(task_result, HandlePartResult):
                        part_number = task_result.part_number
                        part_size = task_result.part_size
                        part_md5_hex = task_result.md5_hex
                    elif (
                        isinstance(task_result, AddPartResponse)
                        and task_result.add_part_state != "ADD_SUCCESS"
                    ):
                        # Restart the file upload process resuming where this left off.
                        # Rest docs state:
                        # "If add part fails for any reason, the client must re-upload
                        # the part and then re-attempt to add the part to the upload."
                        raise SynapseUploadFailedException(
                            (
                                "Adding individual part failed with unexpected state: "
                                f"{task_result.add_part_state}, for upload "
                                f"{task_result.upload_id} and part "
                                f"{task_result.part_number} with message: "
                                f"{task_result.error_message}"
                            )
                        )
                    else:
                        continue

                    async_tasks.add(
                        asyncio.create_task(
                            put_file_multipart_add(
                                upload_id=self._upload_id,
                                part_number=part_number,
                                md5_hex=part_md5_hex,
                                synapse_client=self._syn,
                            )
                        )
                    )

                    self._update_progress_bar(part_size=part_size)

                except (Exception, KeyboardInterrupt) as cause:
                    with self._thread_lock:
                        if self._aborted:
                            # we've already aborted, no need to raise
                            # another exception
                            continue
                        self._aborted = True
                    raised_exception = cause
                    continue
        return raised_exception

    async def _complete_upload(self) -> Dict[str, str]:
        """Close the upload and mark it as complete.

        Returns:
            The response from the server for the completed upload.
        """
        if not self._syn.silent and self._progress_bar and self._close_progress_bar:
            self._progress_bar.close()
        upload_status_response = await put_file_multipart_complete(
            upload_id=self._upload_id,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )

        upload_state = upload_status_response.get("state")
        if upload_state != "COMPLETED":
            # at this point we think successfully uploaded all the parts
            # but the upload status isn't complete, we'll throw an error
            # and let a subsequent attempt try to reconcile
            raise SynapseUploadFailedException(
                f"Upload status has an unexpected state {upload_state}"
            )

        return upload_status_response

    def _handle_part(
        self, part_number: int, otel_context: Union[Context, None]
    ) -> HandlePartResult:
        """Take an individual part number and upload it to the pre-signed URL.

        Arguments:
            part_number: The part number to upload.
            otel_context: The OpenTelemetry context to use for tracing.

        Returns:
            The result of the part upload.

        Raises:
            SynapseUploadAbortedException: If the upload has been aborted.
            ValueError: If the part body is None.
        """
        if otel_context:
            context.attach(otel_context)
        with self._thread_lock:
            if self._aborted:
                # this upload attempt has already been aborted
                # so we short circuit the attempt to upload this part
                raise SynapseUploadAbortedException(
                    f"Upload aborted, skipping part {part_number}"
                )

            part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

        session: httpx.Client = self._syn._requests_session_storage

        # obtain the body (i.e. the upload bytes) for the given part number.
        body = (
            self._part_request_body_provider_fn(part_number)
            if self._part_request_body_provider_fn
            else None
        )
        part_size = len(body) if body else 0
        self._syn.logger.debug(f"Uploading part {part_number} of size {part_size}")
        if not self._is_copy() and body is None:
            raise ValueError(f"No body for part {part_number}")

        response = self._put_part_with_retry(
            session=session,
            body=body,
            part_url=part_url,
            signed_headers=signed_headers,
            part_number=part_number,
        )

        md5_hex = self._md5_fn(body, response)
        del response
        del body

        # remove so future batch pre_signed url fetches will exclude this part
        with self._thread_lock:
            del self._pre_signed_part_urls[part_number]

        return HandlePartResult(part_number, part_size, md5_hex)

    def _put_part_with_retry(
        self,
        session: httpx.Client,
        body: bytes,
        part_url: str,
        signed_headers: Dict[str, str],
        part_number: int,
    ) -> Union[httpx.Response, None]:
        """Put a part to the storage provider with retries.

        Arguments:
            session: The requests session to use for the put.
            body: The body of the part to put.
            part_url: The URL to put the part to.
            signed_headers: The signed headers to use for the put.
            part_number: The part number being put.

        Returns:
            The response from the put.

        Raises:
            SynapseHTTPError: If the put fails.
        """
        response = None
        for retry in range(2):
            try:
                # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls

                response = with_retry_time_based(
                    lambda part_url=part_url, signed_headers=signed_headers: session.put(
                        url=part_url,
                        content=body,  # noqa: F821
                        headers=signed_headers,
                    ),
                    retry_exceptions=[requests.exceptions.ConnectionError],
                )

                _raise_for_status_httpx(response=response, logger=self._syn.logger)

                # completed upload part to s3 successfully
                break

            except SynapseHTTPError as ex:
                if ex.response.status_code == 403 and retry < 1:
                    # we interpret this to mean our pre_signed url expired.
                    self._syn.logger.debug(
                        f"The pre-signed upload URL for part {part_number} has expired. "
                        "Refreshing urls and retrying.\n"
                    )

                    # we refresh all the urls and obtain this part's
                    # specific url for the retry
                    (
                        part_url,
                        signed_headers,
                    ) = self._refresh_pre_signed_part_urls(
                        part_number,
                        part_url,
                    )

                else:
                    raise
        return response

Functions

shared_progress_bar(progress_bar)

An outside process that will eventually trigger an upload through this module can configure a shared Progress Bar by running its code within this context manager.

Source code in synapseclient/core/upload/multipart_upload_async.py
@contextmanager
def shared_progress_bar(progress_bar):
    """An outside process that will eventually trigger an upload through this module
    can configure a shared Progress Bar by running its code within this context manager.
    """
    _thread_local.progress_bar = progress_bar
    try:
        yield
    finally:
        _thread_local.progress_bar.close()
        _thread_local.progress_bar.refresh()
        del _thread_local.progress_bar
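
A minimal sketch of sharing one tqdm bar across several uploads (paths and bar settings are illustrative; the bar is only picked up if the uploads run on the same thread that entered the context manager):

from tqdm import tqdm

from synapseclient.core.upload.multipart_upload_async import (
    multipart_upload_file_async,
    shared_progress_bar,
)


async def upload_many(syn, paths) -> None:
    """Sketch: reuse a single progress bar across several multipart uploads."""
    bar = tqdm(desc="Uploading batch", unit="B", unit_scale=True)
    with shared_progress_bar(bar):  # the context manager closes the bar on exit
        for path in paths:
            await multipart_upload_file_async(syn, file_path=path)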

multipart_upload_file_async(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, md5=None, storage_str=None) async

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

storage_str

Optional string to append to the upload message

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
async def multipart_upload_file_async(
    syn: "Synapse",
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        md5: The MD5 of the file. If not provided, it will be calculated.
        storage_str: Optional string to append to the upload message

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    trace.get_current_span().set_attributes(
        {
            "synapse.storage_location_id": (
                storage_location_id if storage_location_id is not None else ""
            )
        }
    )

    if not os.path.exists(file_path):
        raise IOError(f'File "{file_path}" not found.')
    if os.path.isdir(file_path):
        raise IOError(f'File "{file_path}" is a directory.')

    file_size = os.path.getsize(file_path)
    if not dest_file_name:
        dest_file_name = os.path.basename(file_path)

    if content_type is None:
        mime_type, _ = mimetypes.guess_type(file_path, strict=False)
        content_type = mime_type or "application/octet-stream"

    md5_hex = md5 or (
        await md5_for_file_multiprocessing(
            filename=file_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )
    )

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number: int) -> bytes:
        """Return the nth chunk of a file."""
        return get_file_chunk(file_path, part_number, part_size)

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
        storage_str=storage_str,
    )
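
A minimal end-to-end sketch (the file path and part size are placeholders; assumes locally configured Synapse credentials):

import asyncio

import synapseclient
from synapseclient.core.upload.multipart_upload_async import multipart_upload_file_async


async def main() -> None:
    syn = synapseclient.Synapse()
    syn.login()  # assumes credentials are already configured locally

    file_handle_id = await multipart_upload_file_async(
        syn,
        file_path="/data/big_file.vcf.gz",  # hypothetical path
        part_size=8 * 1024 * 1024,          # 8 MiB parts; must be at least 5 MiB
    )
    print(file_handle_id)


asyncio.run(main())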

multipart_upload_string_async(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False) async

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
async def multipart_upload_string_async(
    syn: "Synapse",
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Upload a file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename
        content_type: Refers to the Content-Type of the API request.
        part_size: number of bytes per part. Minimum 5MB.
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn_util(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE, file_size, MIN_PART_SIZE, MAX_NUMBER_OF_PARTS
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number: int) -> bytes:
        """Get the nth chunk of a buffer."""
        return get_data_chunk(data, part_number, part_size)

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
    )
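
A short sketch of storing a small text payload as a file handle (the payload and file name are illustrative):

from synapseclient.core.upload.multipart_upload_async import multipart_upload_string_async


async def upload_note(syn) -> str:
    """Sketch: upload an in-memory string and return the new file handle ID."""
    return await multipart_upload_string_async(
        syn,
        text="run-id: 42\nstatus: complete\n",  # arbitrary example payload
        dest_file_name="run_summary.txt",       # hypothetical file name
    )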

multipart_copy_async(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False) async

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

TYPE: Synapse

source_file_handle_association

Describes an association of a FileHandle with another object.

TYPE: Dict[str, str]

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
async def multipart_copy_async(
    syn: "Synapse",
    source_file_handle_association: Dict[str, str],
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Makes a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html).
    This request performs a copy of an existing file handle without data transfer from the client.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes).
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    part_size = part_size or DEFAULT_PART_SIZE

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
    )
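
A hedged sketch of a copy request; the association fields follow the Synapse FileHandleAssociation model (fileHandleId, associateObjectId, associateObjectType), and the IDs below are placeholders:

from synapseclient.core.upload.multipart_upload_async import multipart_copy_async


async def copy_file_handle(syn) -> str:
    """Sketch: server-side copy of an existing file handle, with no data transfer."""
    source = {
        "fileHandleId": "1234567",      # placeholder file handle ID
        "associateObjectId": "syn123",  # placeholder entity the handle is attached to
        "associateObjectType": "FileEntity",
    }
    return await multipart_copy_async(
        syn,
        source_file_handle_association=source,
        dest_file_name="copied_file.csv",  # hypothetical destination name
    )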

Multithreaded Downloading

synapseclient.core.multithread_download

Classes

DownloadRequest

Bases: NamedTuple

A request to download a file from Synapse

ATTRIBUTE DESCRIPTION
file_handle_id

The file handle ID to download.

object_id

The Synapse object this file is associated with.

object_type

The type of the associated Synapse object.

path

The local path to download the file to. This path can be either an absolute path or a relative path from where the code is executed to the download location.

debug

A boolean to specify if debug mode is on.

TYPE: bool

Source code in synapseclient/core/multithread_download/download_threads.py
class DownloadRequest(NamedTuple):
    """
    A request to download a file from Synapse

    Attributes:
        file_handle_id : The file handle ID to download.
        object_id : The Synapse object this file is associated with.
        object_type : The type of the associated Synapse object.
        path : The local path to download the file to.
            This path can be either an absolute path or
            a relative path from where the code is executed to the download location.
        debug: A boolean to specify if debug mode is on.
    """

    file_handle_id: int
    object_id: str
    object_type: str
    path: str
    debug: bool = False

Functions

download_file(client, download_request, *, max_concurrent_parts=None)

Main driver for the multi-threaded download. Uses an ExecutorService, either set externally onto a thread local by an outside process, or created as needed otherwise.

PARAMETER DESCRIPTION
client

A synapseclient

download_request

A DownloadRequest object specifying which Synapse file to download

TYPE: DownloadRequest

max_concurrent_parts

The maximum number of parts to download concurrently when downloading this file

TYPE: int DEFAULT: None

Source code in synapseclient/core/multithread_download/download_threads.py
def download_file(
    client,
    download_request: DownloadRequest,
    *,
    max_concurrent_parts: int = None,
):
    """
    Main driver for the multi-threaded download. Uses an ExecutorService,
    either set externally onto a thread local by an outside process,
    or created as needed otherwise.

    Arguments:
        client: A synapseclient
        download_request: A DownloadRequest object specifying which
                            Synapse file to download
        max_concurrent_parts: The maximum number of parts to download
                                concurrently when downloading this file
    """

    # we obtain an executor from a thread local if we are in the context of a Synapse sync
    # and want to re-use the same threadpool as was created for that
    executor = getattr(_thread_local, "executor", None)
    shutdown_after = False
    if not executor:
        shutdown_after = True
        executor = get_executor(client.max_threads)

    max_concurrent_parts = max_concurrent_parts or client.max_threads
    try:
        downloader = _MultithreadedDownloader(client, executor, max_concurrent_parts)
        downloader.download_file(download_request)
    finally:
        # if we created the Executor for the purposes of processing this download we also
        # shut it down. if it was passed in from the outside then it's managed by the caller
        if shutdown_after:
            executor.shutdown()
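
A minimal usage sketch (the file handle ID, entity ID, and destination path are placeholders; the import path follows the source location shown above):

import synapseclient
from synapseclient.core.multithread_download.download_threads import (
    DownloadRequest,
    download_file,
)

syn = synapseclient.Synapse()
syn.login()  # assumes credentials are already configured locally

request = DownloadRequest(
    file_handle_id=1234567,           # placeholder file handle ID
    object_id="syn123",               # placeholder entity the handle belongs to
    object_type="FileEntity",
    path="/tmp/downloaded_file.csv",  # hypothetical destination path
)
download_file(syn, request, max_concurrent_parts=4)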

shared_executor(executor)

An outside process that will eventually trigger a download through this module can configure a shared Executor by running its code within this context manager.

Source code in synapseclient/core/multithread_download/download_threads.py
@contextmanager
def shared_executor(executor):
    """An outside process that will eventually trigger a download through the this module
    can configure a shared Executor by running its code within this context manager."""
    _thread_local.executor = executor
    try:
        yield
    finally:
        del _thread_local.executor
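
A short sketch of sharing one thread pool across several downloads so that download_file does not create and shut down an executor per file (syn and requests are assumed to be an authenticated client and an iterable of DownloadRequest objects):

from concurrent.futures import ThreadPoolExecutor

from synapseclient.core.multithread_download.download_threads import (
    download_file,
    shared_executor,
)


def download_batch(syn, requests) -> None:
    """Sketch: reuse one executor for a batch of downloads."""
    with ThreadPoolExecutor(max_workers=8) as executor:
        with shared_executor(executor):
            for request in requests:
                download_file(syn, request)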

Cache

synapseclient.core.cache


File Caching


Implements a cache on local disk for Synapse file entities and other objects with a FileHandle. This is part of the internal implementation of the client and should not be accessed directly by users of the client.

Classes

Cache

Represent a cache in which files are accessed by file handle ID.
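
A minimal usage sketch (the file handle ID and path are placeholders; the cache root defaults to the standard Synapse cache directory on local disk):

from synapseclient.core.cache import Cache

cache = Cache()

file_handle_id = "1234567"       # placeholder file handle ID
local_path = "/tmp/example.csv"  # hypothetical path of an already-downloaded file

cache.add(file_handle_id, local_path)
print(cache.contains(file_handle_id, local_path))  # True while the file is unmodified
print(cache.get(file_handle_id))                   # path of an unmodified cached copy, or None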

Source code in synapseclient/core/cache.py
class Cache:
    """
    Represent a cache in which files are accessed by file handle ID.
    """

    def __setattr__(self, key, value):
        # expand out home shortcut ('~') and environment variables when setting cache_root_dir
        if key == "cache_root_dir":
            value = os.path.expandvars(os.path.expanduser(value))
            # create the cache_root_dir if it does not already exist
            if not os.path.exists(value):
                os.makedirs(value, exist_ok=True)
        self.__dict__[key] = value

    def __init__(self, cache_root_dir=CACHE_ROOT_DIR, fanout=1000):
        # set root dir of cache in which meta data will be stored and files
        # will be stored here by default, but other locations can be specified
        self.cache_root_dir = cache_root_dir
        self.fanout = fanout
        self.cache_map_file_name = ".cacheMap"

    def get_cache_dir(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str]
    ) -> str:
        if isinstance(file_handle_id, collections.abc.Mapping):
            if "dataFileHandleId" in file_handle_id:
                file_handle_id = file_handle_id["dataFileHandleId"]
            elif (
                "concreteType" in file_handle_id
                and "id" in file_handle_id
                and file_handle_id["concreteType"].startswith(
                    "org.sagebionetworks.repo.model.file"
                )
            ):
                file_handle_id = file_handle_id["id"]
        return os.path.join(
            self.cache_root_dir,
            str(int(file_handle_id) % self.fanout),
            str(file_handle_id),
        )

    def _read_cache_map(self, cache_dir: str) -> dict:
        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        if not os.path.exists(cache_map_file):
            return {}

        with open(cache_map_file, "r") as f:
            try:
                cache_map = json.load(f)
            except json.decoder.JSONDecodeError:
                # a corrupt cache map file that is not parseable as JSON is treated
                # as if it does not exist at all (will be overwritten).
                return {}

        return cache_map

    def _write_cache_map(self, cache_dir: str, cache_map: dict) -> None:
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)

        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        with open(cache_map_file, "w") as f:
            json.dump(cache_map, f)
            f.write("\n")  # For compatibility with R's JSON parser

    def _get_cache_modified_time(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `modified_time` from the `cache_map_entry`. This needs to be
        backwards-compatible with any cache entries that do not have the new JSON structure
        for data. That means that if the cache_map_entry has a `modified_time` key,
        then it is a new entry and we can return the value. If it does not, then it
        is an old entry and we should return the `cache_map_entry` itself.

        The caveat is that `cache_map_entry` needs to be a string to return the value
        otherwise it will return None.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The modified time if it exists, otherwise the cache_map_entry
        """
        if cache_map_entry is not None and "modified_time" in cache_map_entry:
            return cache_map_entry.get("modified_time", None)
        elif cache_map_entry is not None and isinstance(cache_map_entry, str):
            return cache_map_entry
        return None

    def _get_cache_content_md5(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `content_md5` from the cache_map_entry.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The content md5 if it exists, otherwise None
        """
        if cache_map_entry is not None and "content_md5" in cache_map_entry:
            return cache_map_entry.get("content_md5", None)
        else:
            return None

    def _cache_item_unmodified(
        self, cache_map_entry: typing.Union[str, dict], path: str
    ) -> bool:
        """
        Determine if the cache_map_entry is unmodified by comparing the modified_time
        and content_md5 to the file at the given path.

        Arguments:
            cache_map_entry: The entry from the cache map
            path:            The path to the file to compare to

        Returns:
            True if the cache_map_entry is unmodified, otherwise False
        """
        cached_time = self._get_cache_modified_time(cache_map_entry)
        cached_md5 = self._get_cache_content_md5(cache_map_entry)

        # compare_timestamps has an implicit check for whether the path exists
        return compare_timestamps(_get_modified_time(path), cached_time) and (
            cached_md5 is None or cached_md5 == utils.md5_for_file(path).hexdigest()
        )

    def contains(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
    ) -> bool:
        """
        Given a file and file_handle_id, return True if an unmodified cached
        copy of the file exists at the exact path given or False otherwise.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           The file path at which to look for a cached copy
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        if not os.path.exists(cache_dir):
            return False

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            cached_time = self._get_cache_modified_time(cache_map.get(path, None))

            if cached_time:
                return compare_timestamps(_get_modified_time(path), cached_time)
        return False

    @tracer.start_as_current_span("cache::get")
    def get(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
    ) -> typing.Union[str, None]:
        """
        Retrieve a file with the given file handle from the cache.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           If the given path is None, look for a cached copy of the
                            file in the cache directory. If the path is a directory,
                            look there for a cached copy. If a full file-path is
                            given, only check whether that exact file exists and is
                            unmodified since it was cached.

        Returns:
            Either a file path, if an unmodified cached copy of the file
            exists in the specified location or None if it does not
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        trace.get_current_span().set_attributes(
            {
                "synapse.cache.dir": cache_dir,
                "synapse.cache.file_handle_id": file_handle_id,
            }
        )
        if not os.path.exists(cache_dir):
            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            # If the caller specifies a path and that path exists in the cache
            # but has been modified, we need to indicate no match by returning
            # None. The logic for updating a synapse entity depends on this to
            # determine the need to upload a new file.

            if path is not None:
                # If we're given a path to a directory, look for a cached file in that directory
                if os.path.isdir(path):
                    matching_unmodified_directory = None
                    removed_entry_from_cache = (
                        False  # determines if cache_map needs to be rewritten to disk
                    )

                    # iterate a copy of cache_map to allow modifying original cache_map
                    for cached_file_path, cache_map_entry in dict(cache_map).items():
                        if path == os.path.dirname(cached_file_path):
                            if self._cache_item_unmodified(
                                cache_map_entry, cached_file_path
                            ):
                                # "break" instead of "return" to write removed invalid entries to disk if necessary
                                matching_unmodified_directory = cached_file_path
                                break
                            else:
                                # remove invalid cache entries pointing to files that no longer exist
                                # or have been modified
                                del cache_map[cached_file_path]
                                removed_entry_from_cache = True

                    if removed_entry_from_cache:
                        # write cache_map with non-existent entries removed
                        self._write_cache_map(cache_dir, cache_map)

                    if matching_unmodified_directory is not None:
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": True}
                        )
                        return matching_unmodified_directory

                # if we're given a full file path, look up a matching file in the cache
                else:
                    cache_map_entry = cache_map.get(path, None)
                    if cache_map_entry:
                        matching_file_path = (
                            path
                            if self._cache_item_unmodified(cache_map_entry, path)
                            else None
                        )
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": matching_file_path is not None}
                        )
                        return matching_file_path

            # return most recently cached and unmodified file OR
            # None if there are no unmodified files
            for cached_file_path, cache_map_entry in sorted(
                cache_map.items(),
                key=lambda item: (
                    item[1]["modified_time"] if isinstance(item[1], dict) else item[1]
                ),
                reverse=True,
            ):
                if self._cache_item_unmodified(cache_map_entry, cached_file_path):
                    trace.get_current_span().set_attributes({"synapse.cache.hit": True})
                    return cached_file_path

            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

    def add(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str,
        md5: str = None,
    ) -> dict:
        """
        Add a file to the cache
        """
        if not path or not os.path.exists(path):
            raise ValueError('Can\'t find file "%s"' % path)

        cache_dir = self.get_cache_dir(file_handle_id)
        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)
            # write .000 milliseconds for backward compatibility
            cache_map[path] = {
                "modified_time": epoch_time_to_iso(
                    math.floor(_get_modified_time(path))
                ),
                "content_md5": md5 or utils.md5_for_file(path).hexdigest(),
            }
            self._write_cache_map(cache_dir, cache_map)

        return cache_map

    def remove(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
        delete: bool = None,
    ) -> typing.List[str]:
        """
        Remove a file from the cache.

        Arguments:
            file_handle_id: Will also extract file handle id from either a File or file handle
            path:           If the given path is None, remove (and potentially delete)
                            all cached copies. If the path is that of a file in the
                            .cacheMap file, remove it.
            delete:         If True, delete the file from disk as well as removing it from the cache

        Returns:
            A list of files removed
        """
        removed = []
        cache_dir = self.get_cache_dir(file_handle_id)

        # if we've passed an entity and not a path, get path from entity
        if (
            path is None
            and isinstance(file_handle_id, collections.abc.Mapping)
            and "path" in file_handle_id
        ):
            path = file_handle_id["path"]

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            if path is None:
                for path in cache_map:
                    if delete is True and os.path.exists(path):
                        os.remove(path)
                    removed.append(path)
                cache_map = {}
            else:
                path = utils.normalize_path(path)
                if path in cache_map:
                    if delete is True and os.path.exists(path):
                        os.remove(path)
                    del cache_map[path]
                    removed.append(path)

            self._write_cache_map(cache_dir, cache_map)

        return removed

    def _cache_dirs(self):
        """
        Generate a list of all cache dirs, directories of the form:
        [cache.cache_root_dir]/949/59949
        """
        for item1 in os.listdir(self.cache_root_dir):
            path1 = os.path.join(self.cache_root_dir, item1)
            if os.path.isdir(path1) and re.match("\\d+", item1):
                for item2 in os.listdir(path1):
                    path2 = os.path.join(path1, item2)
                    if os.path.isdir(path2) and re.match("\\d+", item2):
                        yield path2

    def purge(
        self,
        before_date: typing.Union[datetime.datetime, int] = None,
        after_date: typing.Union[datetime.datetime, int] = None,
        dry_run: bool = False,
    ) -> int:
        """
        Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

        Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
        the cache.

        Arguments:
            before_date: If specified, all files before this date will be removed
            after_date:  If specified, all files after this date will be removed
            dry_run:     If dry_run is True, then the selected files are printed rather than removed

        Returns:
            The number of files selected for removal

        Example: Using this function
            Either the before_date or after_date must be specified. If both are passed, files between the two dates are
            selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
            of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
            following can be used::

            using offset naive datetime objects

                cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

            using seconds since the unix epoch

                cache.purge(after_date=1609459200, before_date=1612137600)
        """
        if before_date is None and after_date is None:
            raise ValueError("Either before date or after date should be provided")

        if isinstance(before_date, datetime.datetime):
            before_date = utils.to_unix_epoch_time_secs(before_date)
        if isinstance(after_date, datetime.datetime):
            after_date = utils.to_unix_epoch_time_secs(after_date)

        if before_date and after_date and before_date < after_date:
            raise ValueError("Before date should be larger than after date")

        count = 0
        for cache_dir in self._cache_dirs():
            # _get_modified_time returns None if the cache map file doesn't
            # exist (and n > None evaluated to True in Python 2.7). It should be
            # OK to purge directories in the cache that have no .cacheMap file

            last_modified_time = _get_modified_time(
                os.path.join(cache_dir, self.cache_map_file_name)
            )
            if last_modified_time is None or (
                (not before_date or before_date > last_modified_time)
                and (not after_date or after_date < last_modified_time)
            ):
                if dry_run:
                    print(cache_dir)
                else:
                    shutil.rmtree(cache_dir)
                count += 1
        return count
Functions
contains(file_handle_id, path)

Given a file and file_handle_id, return True if an unmodified cached copy of the file exists at the exact path given or False otherwise.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

The file path at which to look for a cached copy

TYPE: str

Source code in synapseclient/core/cache.py
def contains(
    self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
) -> bool:
    """
    Given a file and file_handle_id, return True if an unmodified cached
    copy of the file exists at the exact path given or False otherwise.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           The file path at which to look for a cached copy
    """
    cache_dir = self.get_cache_dir(file_handle_id)
    if not os.path.exists(cache_dir):
        return False

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)

        cached_time = self._get_cache_modified_time(cache_map.get(path, None))

        if cached_time:
            return compare_timestamps(_get_modified_time(path), cached_time)
    return False
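
For illustration, a minimal sketch of checking the cache before re-downloading a file; the default `Cache()` construction, the file handle id, and the local path are placeholder assumptions rather than values from this reference.

```python
from synapseclient.core import cache

my_cache = cache.Cache()          # assumed: default cache root
file_handle_id = "12345"          # hypothetical file handle id
local_path = "/data/my_file.csv"  # hypothetical local path

if my_cache.contains(file_handle_id, local_path):
    print("An unmodified cached copy exists at", local_path)
else:
    print("No cached copy, or the file was modified after it was cached")
```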
get(file_handle_id, path=None)

Retrieve a file with the given file handle from the cache.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

If the given path is None, look for a cached copy of the file in the cache directory. If the path is a directory, look there for a cached copy. If a full file-path is given, only check whether that exact file exists and is unmodified since it was cached.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Union[str, None]

Either a file path, if an unmodified cached copy of the file

Union[str, None]

exists in the specified location or None if it does not

Source code in synapseclient/core/cache.py
@tracer.start_as_current_span("cache::get")
def get(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
) -> typing.Union[str, None]:
    """
    Retrieve a file with the given file handle from the cache.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           If the given path is None, look for a cached copy of the
                        file in the cache directory. If the path is a directory,
                        look there for a cached copy. If a full file-path is
                        given, only check whether that exact file exists and is
                        unmodified since it was cached.

    Returns:
        Either a file path, if an unmodified cached copy of the file
        exists in the specified location or None if it does not
    """
    cache_dir = self.get_cache_dir(file_handle_id)
    trace.get_current_span().set_attributes(
        {
            "synapse.cache.dir": cache_dir,
            "synapse.cache.file_handle_id": file_handle_id,
        }
    )
    if not os.path.exists(cache_dir):
        trace.get_current_span().set_attributes({"synapse.cache.hit": False})
        return None

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)

        # If the caller specifies a path and that path exists in the cache
        # but has been modified, we need to indicate no match by returning
        # None. The logic for updating a synapse entity depends on this to
        # determine the need to upload a new file.

        if path is not None:
            # If we're given a path to a directory, look for a cached file in that directory
            if os.path.isdir(path):
                matching_unmodified_directory = None
                removed_entry_from_cache = (
                    False  # determines if cache_map needs to be rewritten to disk
                )

                # iterate a copy of cache_map to allow modifying original cache_map
                for cached_file_path, cache_map_entry in dict(cache_map).items():
                    if path == os.path.dirname(cached_file_path):
                        if self._cache_item_unmodified(
                            cache_map_entry, cached_file_path
                        ):
                            # "break" instead of "return" to write removed invalid entries to disk if necessary
                            matching_unmodified_directory = cached_file_path
                            break
                        else:
                            # remove invalid cache entries pointing to files that no longer exist
                            # or have been modified
                            del cache_map[cached_file_path]
                            removed_entry_from_cache = True

                if removed_entry_from_cache:
                    # write cache_map with non-existent entries removed
                    self._write_cache_map(cache_dir, cache_map)

                if matching_unmodified_directory is not None:
                    trace.get_current_span().set_attributes(
                        {"synapse.cache.hit": True}
                    )
                    return matching_unmodified_directory

            # if we're given a full file path, look up a matching file in the cache
            else:
                cache_map_entry = cache_map.get(path, None)
                if cache_map_entry:
                    matching_file_path = (
                        path
                        if self._cache_item_unmodified(cache_map_entry, path)
                        else None
                    )
                    trace.get_current_span().set_attributes(
                        {"synapse.cache.hit": matching_file_path is not None}
                    )
                    return matching_file_path

        # return most recently cached and unmodified file OR
        # None if there are no unmodified files
        for cached_file_path, cache_map_entry in sorted(
            cache_map.items(),
            key=lambda item: (
                item[1]["modified_time"] if isinstance(item[1], dict) else item[1]
            ),
            reverse=True,
        ):
            if self._cache_item_unmodified(cache_map_entry, cached_file_path):
                trace.get_current_span().set_attributes({"synapse.cache.hit": True})
                return cached_file_path

        trace.get_current_span().set_attributes({"synapse.cache.hit": False})
        return None
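
The three lookup modes described above (no path, directory path, exact file path) might be exercised as in the following sketch; the cache root, file handle id, and paths are placeholder assumptions.

```python
from synapseclient.core import cache

my_cache = cache.Cache()   # assumed default cache root
file_handle_id = "12345"   # hypothetical file handle id

# No path: return the most recently cached, unmodified copy (or None)
newest_copy = my_cache.get(file_handle_id)

# Directory path: look for an unmodified cached copy inside that directory
copy_in_dir = my_cache.get(file_handle_id, "/data/downloads")

# Exact file path: hit only if that exact file exists and is unmodified
exact_copy = my_cache.get(file_handle_id, "/data/downloads/my_file.csv")

if exact_copy is None:
    print("No unmodified cached copy; the file would need to be (re)downloaded")
```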
add(file_handle_id, path, md5=None)

Add a file to the cache

Source code in synapseclient/core/cache.py
def add(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str,
    md5: str = None,
) -> dict:
    """
    Add a file to the cache
    """
    if not path or not os.path.exists(path):
        raise ValueError('Can\'t find file "%s"' % path)

    cache_dir = self.get_cache_dir(file_handle_id)
    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        path = utils.normalize_path(path)
        # write .000 milliseconds for backward compatibility
        cache_map[path] = {
            "modified_time": epoch_time_to_iso(
                math.floor(_get_modified_time(path))
            ),
            "content_md5": md5 or utils.md5_for_file(path).hexdigest(),
        }
        self._write_cache_map(cache_dir, cache_map)

    return cache_map
remove(file_handle_id, path=None, delete=None)

Remove a file from the cache.

PARAMETER DESCRIPTION
file_handle_id

Will also extract file handle id from either a File or file handle

TYPE: Union[Mapping, str]

path

If the given path is None, remove (and potentially delete) all cached copies. If the path is that of a file in the .cacheMap file, remove it.

TYPE: str DEFAULT: None

delete

If True, delete the file from disk as well as removing it from the cache

TYPE: bool DEFAULT: None

RETURNS DESCRIPTION
List[str]

A list of files removed

Source code in synapseclient/core/cache.py
def remove(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
    delete: bool = None,
) -> typing.List[str]:
    """
    Remove a file from the cache.

    Arguments:
        file_handle_id: Will also extract file handle id from either a File or file handle
        path:           If the given path is None, remove (and potentially delete)
                        all cached copies. If the path is that of a file in the
                        .cacheMap file, remove it.
        delete:         If True, delete the file from disk as well as removing it from the cache

    Returns:
        A list of files removed
    """
    removed = []
    cache_dir = self.get_cache_dir(file_handle_id)

    # if we've passed an entity and not a path, get path from entity
    if (
        path is None
        and isinstance(file_handle_id, collections.abc.Mapping)
        and "path" in file_handle_id
    ):
        path = file_handle_id["path"]

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        if path is None:
            for path in cache_map:
                if delete is True and os.path.exists(path):
                    os.remove(path)
                removed.append(path)
            cache_map = {}
        else:
            path = utils.normalize_path(path)
            if path in cache_map:
                if delete is True and os.path.exists(path):
                    os.remove(path)
                del cache_map[path]
                removed.append(path)

        self._write_cache_map(cache_dir, cache_map)

    return removed
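
A sketch combining `add` and `remove`; the cache root, file handle id, and file path are placeholder assumptions.

```python
from synapseclient.core import cache

my_cache = cache.Cache()          # assumed default cache root
file_handle_id = "12345"          # hypothetical file handle id
local_path = "/data/my_file.csv"  # hypothetical existing local file

# Record the file in the .cacheMap for its file handle
my_cache.add(file_handle_id, local_path)

# Later: drop the entry; pass delete=True to also remove the file from disk
removed = my_cache.remove(file_handle_id, path=local_path, delete=False)
print("Removed cache entries:", removed)
```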
purge(before_date=None, after_date=None, dry_run=False)

Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside the cache.

PARAMETER DESCRIPTION
before_date

If specified, all files before this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

after_date

If specified, all files after this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

dry_run

If dry_run is True, then the selected files are printed rather than removed

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

The number of files selected for removal

Using this function

Either the before_date or after_date must be specified. If both are passed, files between the two dates are selected for removal. Dates must either be a timezone-naive Python datetime.datetime instance or the number of seconds since the unix epoch. For example, to delete all the files modified in January 2021, either of the following can be used:

Using offset-naive datetime objects:

cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

Using seconds since the unix epoch:

cache.purge(after_date=1609459200, before_date=1612137600)
Source code in synapseclient/core/cache.py
def purge(
    self,
    before_date: typing.Union[datetime.datetime, int] = None,
    after_date: typing.Union[datetime.datetime, int] = None,
    dry_run: bool = False,
) -> int:
    """
    Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

    Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
    the cache.

    Arguments:
        before_date: If specified, all files before this date will be removed
        after_date:  If specified, all files after this date will be removed
        dry_run:     If dry_run is True, then the selected files are printed rather than removed

    Returns:
        The number of files selected for removal

    Example: Using this function
        Either the before_date or after_date must be specified. If both are passed, files between the two dates are
        selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
        of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
        following can be used::

        using offset naive datetime objects

            cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

        using seconds since the unix epoch

            cache.purge(after_date=1609459200, before_date=1612137600)
    """
    if before_date is None and after_date is None:
        raise ValueError("Either before date or after date should be provided")

    if isinstance(before_date, datetime.datetime):
        before_date = utils.to_unix_epoch_time_secs(before_date)
    if isinstance(after_date, datetime.datetime):
        after_date = utils.to_unix_epoch_time_secs(after_date)

    if before_date and after_date and before_date < after_date:
        raise ValueError("Before date should be larger than after date")

    count = 0
    for cache_dir in self._cache_dirs():
        # _get_modified_time returns None if the cache map file doesn't
        # exist (and n > None evaluated to True in Python 2.7). It should be
        # OK to purge directories in the cache that have no .cacheMap file

        last_modified_time = _get_modified_time(
            os.path.join(cache_dir, self.cache_map_file_name)
        )
        if last_modified_time is None or (
            (not before_date or before_date > last_modified_time)
            and (not after_date or after_date < last_modified_time)
        ):
            if dry_run:
                print(cache_dir)
            else:
                shutil.rmtree(cache_dir)
            count += 1
    return count
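
Since `purge` deletes cache directories irreversibly, a dry run is a natural first step; a sketch using the January 2021 window from the example above, with the default cache root assumed.

```python
import datetime

from synapseclient.core import cache

my_cache = cache.Cache()  # assumed default cache root

# Dry run: print the cache directories that would be removed, without deleting
selected = my_cache.purge(
    after_date=datetime.datetime(2021, 1, 1),
    before_date=datetime.datetime(2021, 2, 1),
    dry_run=True,
)
print(f"{selected} cache directories selected for removal")
```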

Functions

epoch_time_to_iso(epoch_time)

Convert seconds since unix epoch to a string in ISO format

Source code in synapseclient/core/cache.py
def epoch_time_to_iso(epoch_time):
    """
    Convert seconds since unix epoch to a string in ISO format
    """
    return (
        None
        if epoch_time is None
        else utils.datetime_to_iso(utils.from_unix_epoch_time_secs(epoch_time))
    )

iso_time_to_epoch(iso_time)

Convert an ISO formatted time into seconds since unix epoch

Source code in synapseclient/core/cache.py
def iso_time_to_epoch(iso_time):
    """
    Convert an ISO formatted time into seconds since unix epoch
    """
    return (
        None
        if iso_time is None
        else utils.to_unix_epoch_time_secs(utils.iso_to_datetime(iso_time))
    )

compare_timestamps(modified_time, cached_time)

Compare two ISO formatted timestamps, with a special case when cached_time ends in .000Z.

For backward compatibility, we always write .000 for milliseconds into the cache. We then match a cached time ending in .000Z (meaning zero milliseconds) against a modified time with any number of milliseconds.

PARAMETER DESCRIPTION
modified_time

The float representing seconds since unix epoch

cached_time

The string holding an ISO formatted time

Source code in synapseclient/core/cache.py
def compare_timestamps(modified_time, cached_time):
    """
    Compare two ISO formatted timestamps, with a special case when cached_time ends in .000Z.

    For backward compatibility, we always write .000 for milliseconds into the cache.
    We then match a cached time ending in .000Z, meaning zero milliseconds with a modified time with any number of
    milliseconds.

    Arguments:
        modified_time: The float representing seconds since unix epoch
        cached_time:   The string holding an ISO formatted time
    """
    if cached_time is None or modified_time is None:
        return False
    if cached_time.endswith(".000Z"):
        return cached_time == epoch_time_to_iso(math.floor(modified_time))
    else:
        return cached_time == epoch_time_to_iso(modified_time)
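
A short sketch of how these helpers fit together: the cache writes whole-second ISO strings (.000 milliseconds), and compare_timestamps treats such a cached time as matching a modified time with any sub-second component. The example values are assumptions.

```python
import math
import time

from synapseclient.core.cache import (
    compare_timestamps,
    epoch_time_to_iso,
    iso_time_to_epoch,
)

modified_time = time.time()  # e.g. 1700000000.123 seconds since the epoch

# The cache stores .000 milliseconds, i.e. the floor of the modified time
cached_iso = epoch_time_to_iso(math.floor(modified_time))

# A cached time ending in .000Z matches regardless of sub-second precision
print(compare_timestamps(modified_time, cached_iso))  # True

# Convert the ISO string back to seconds since the epoch (sub-seconds are lost)
print(iso_time_to_epoch(cached_iso))
```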

Credentials

synapseclient.core.credentials.cred_data.SynapseCredentials

Bases: AuthBase, ABC

Source code in synapseclient/core/credentials/cred_data.py
class SynapseCredentials(requests.auth.AuthBase, abc.ABC):
    @property
    @abc.abstractmethod
    def username(self) -> None:
        """The username associated with these credentials."""

    @property
    @abc.abstractmethod
    def secret(self) -> None:
        """The secret associated with these credentials."""

Attributes

username: None abstractmethod property

The username associated with these credentials.

secret: None abstractmethod property

The secret associated with these credentials.

synapseclient.core.credentials.cred_data.SynapseAuthTokenCredentials

Bases: SynapseCredentials

Source code in synapseclient/core/credentials/cred_data.py
class SynapseAuthTokenCredentials(SynapseCredentials):
    @classmethod
    def get_keyring_service_name(
        cls,
    ) -> typing.Literal["SYNAPSE.ORG_CLIENT_AUTH_TOKEN"]:
        return "SYNAPSE.ORG_CLIENT_AUTH_TOKEN"

    @classmethod
    def _validate_token(cls, token):
        # decode the token to ensure it minimally has view scope.
        # if it doesn't raise an error, the client will not be useful without it.

        # if for any reason we are not able to decode the token and check its scopes
        # we do NOT raise an error. this is to accommodate the possibility of a changed
        # token format some day that this version of the client may still be able to
        # pass as a bearer token.
        try:
            token_body = json.loads(
                str(
                    base64.urlsafe_b64decode(
                        # we add padding to ensure that lack of padding won't prevent a decode error.
                        # the python base64 implementation will truncate extra padding so we can overpad
                        # rather than compute exactly how much padding we might need.
                        # https://stackoverflow.com/a/49459036
                        token.split(".")[1]
                        + "==="
                    ),
                    "utf-8",
                )
            )
            scopes = token_body.get("access", {}).get("scope")
            if scopes is not None and "view" not in scopes:
                raise SynapseAuthenticationError("A view scoped token is required")

        except (IndexError, ValueError):
            # possible errors if token is not encoded as expected:
            # IndexError if the token is not a '.' delimited base64 string with a header and body
            # ValueError if the split string is not base64 encoded or if the decoded base64 is not json
            pass

    def __init__(self, token, username=None):
        self._validate_token(token)

        self._token = token
        self.username = username

    @property
    def username(self) -> str:
        """The username associated with this token."""
        return self._username

    @username.setter
    def username(self, username: str) -> None:
        self._username = username

    @property
    def secret(self) -> str:
        """The bearer token."""
        return self._token

    def __call__(self, r):
        r.headers.update({"Authorization": f"Bearer {self.secret}"})
        return r

    def __repr__(self):
        return f"SynapseAuthTokenCredentials(username='{self.username}', token='{self.secret}')"

Attributes

username: str property writable

The username associated with this token.

secret: str property

The bearer token.
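
Because the class derives from requests.auth.AuthBase, an instance can be passed directly as the `auth` argument of a `requests` call; in this sketch the token value and the endpoint URL are placeholder assumptions.

```python
import requests

from synapseclient.core.credentials.cred_data import SynapseAuthTokenCredentials

# Hypothetical personal access token with at least "view" scope
credentials = SynapseAuthTokenCredentials("<personal access token>", username="my-user")

# The credentials object adds the "Authorization: Bearer <token>" header
response = requests.get(
    "https://repo-prod.prod.sagebase.org/repo/v1/userProfile",  # assumed endpoint
    auth=credentials,
)
print(response.status_code)
```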

synapseclient.core.credentials.credential_provider

This module contains classes that are responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file).

Classes

SynapseCredentialsProvider

A credential provider is responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file), and use them to return a SynapseCredentials instance.

Source code in synapseclient/core/credentials/credential_provider.py
class SynapseCredentialsProvider(metaclass=abc.ABCMeta):
    """
    A credential provider is responsible for retrieving synapse authentication
    information (e.g. authToken) from a source (e.g. login args, config file),
    and use them to return a
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    instance.
    """

    @abc.abstractmethod
    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[None, None]:
        """
        Subclasses must implement this to decide how to obtain an authentication token.
        For any of these values, return None if it is not possible to get that value.

        Not all implementations will need to make use of the user_login_args parameter
        or syn. These parameters provide context about the Synapse client's configuration
        and login() arguments.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            Tuple of (username, bearer authentication token e.g. a personal access token),
            any of these values could be None if it is not available.
        """
        return None, None

    def get_synapse_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Returns
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        if this provider is able to get valid credentials, returns None otherwise.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                if valid credentials can be found by this provider, None otherwise.
        """
        return self._create_synapse_credential(
            syn, *self._get_auth_info(syn=syn, user_login_args=user_login_args)
        )

    def _create_synapse_credential(
        self, syn: "Synapse", username: str, auth_token: str
    ) -> Union[SynapseCredentials, None]:
        if auth_token is not None:
            credentials = SynapseAuthTokenCredentials(auth_token)
            profile = syn.restGET("/userProfile", auth=credentials)
            profile_username = profile.get("userName")
            profile_emails = profile.get("emails", [])

            if username and (
                username != profile_username and username not in profile_emails
            ):
                # a username/email is not required when logging in with an auth token
                # however if both are provided raise an error if they do not correspond
                # to avoid any ambiguity about what profile was logged in
                raise SynapseAuthenticationError(
                    "username/email and auth_token both provided but username does not "
                    "match token profile"
                )

            credentials.username = profile_username
            return credentials

        return None
Functions
get_synapse_credentials(syn, user_login_args)

Returns SynapseCredentials if this provider is able to get valid credentials, returns None otherwise.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials if valid credentials can be found by this provider, None otherwise.

Source code in synapseclient/core/credentials/credential_provider.py
def get_synapse_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Returns
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    if this provider is able to get valid credentials, returns None otherwise.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            if valid credentials can be found by this provider, None otherwise.
    """
    return self._create_synapse_credential(
        syn, *self._get_auth_info(syn=syn, user_login_args=user_login_args)
    )

UserArgsCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from user_login_args during a CLI session.

Source code in synapseclient/core/credentials/credential_provider.py
class UserArgsCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves auth info from user_login_args during a CLI session.
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[str, str]:
        return (
            user_login_args.username,
            user_login_args.auth_token,
        )

ConfigFileCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from the ~/.synapseConfig file

Source code in synapseclient/core/credentials/credential_provider.py
class ConfigFileCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves auth info from the `~/.synapseConfig` file
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        config_dict = syn._get_config_authentication()
        # check to make sure we didn't accidentally provide the wrong user

        username = config_dict.get("username")
        token = config_dict.get("authtoken")

        if user_login_args.username and username != user_login_args.username:
            # if the username is provided and there is a config file username but they
            # don't match then we don't use any of the values from the config
            # to prevent ambiguity
            username = None
            token = None
            syn.logger.warning(
                f"{user_login_args.username} was defined in the user login "
                "arguments, however, it is also defined in the `~/.synapseConfig` "
                "file. Becuase they do not match we will not use the `authtoken` "
                "in the `~/.synapseConfig` file.",
            )

        return username, token

AWSParameterStoreCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves the user's authentication token from the AWS SSM Parameter Store

Source code in synapseclient/core/credentials/credential_provider.py
class AWSParameterStoreCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves user's authentication token from AWS SSM Parameter store
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_TOKEN_AWS_SSM_PARAMETER_NAME"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        ssm_param_name = os.environ.get(self.ENVIRONMENT_VAR_NAME)
        token = None
        if ssm_param_name:
            try:
                import boto3
                import botocore

                ssm_client = boto3.client("ssm")
                result = ssm_client.get_parameter(
                    Name=ssm_param_name,
                    WithDecryption=True,
                )
                token = result["Parameter"]["Value"]
            except ImportError:
                syn.logger.warning(
                    f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                    'but "boto3" could not be imported. The Synapse client uses "boto3" '
                    "in order to access Systems Manager Parameter Storage. Please ensure "
                    'that you have installed "boto3" to enable this feature.'
                )
            # this except block must be defined after the ImportError except block
            # otherwise, there's no guarantee "botocore" is already imported and defined
            except botocore.exceptions.ClientError:
                syn.logger.warning(
                    f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                    "but the matching parameter name could not be found in AWS Parameter "
                    "Store. Caused by AWS error:\n",
                    exc_info=True,
                )

        # if username is included in user's arguments, return it so that
        # it may be validated against the username authenticated by the token
        return user_login_args.username, token

EnvironmentVariableCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves the user's authentication token from an environment variable

Source code in synapseclient/core/credentials/credential_provider.py
class EnvironmentVariableCredentialsProvider(SynapseCredentialsProvider):
    """
    Retrieves the user's authentication token from an environment variable
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_AUTH_TOKEN"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        return (
            user_login_args.username,
            os.environ.get(self.ENVIRONMENT_VAR_NAME),
        )
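
In practice this provider means that exporting SYNAPSE_AUTH_TOKEN is enough for a parameterless login; a sketch with a placeholder token value.

```python
import os

import synapseclient

# Normally exported in the shell, e.g. `export SYNAPSE_AUTH_TOKEN=<token>`;
# set in-process here with a placeholder value purely for illustration.
os.environ["SYNAPSE_AUTH_TOKEN"] = "<personal access token>"

# login() without arguments lets the credential provider chain find the token
syn = synapseclient.Synapse()
syn.login()
```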

SynapseCredentialsProviderChain

Bases: object

Class that has a list of SynapseCredentialsProvider from which this class attempts to retrieve SynapseCredentials.

By default this class uses the following providers in this order:

  1. ConfigFileCredentialsProvider
  2. UserArgsCredentialsProvider
  3. EnvironmentVariableCredentialsProvider
  4. AWSParameterStoreCredentialsProvider
ATTRIBUTE DESCRIPTION
cred_providers

list of (SynapseCredentialsProvider) credential providers

Source code in synapseclient/core/credentials/credential_provider.py
class SynapseCredentialsProviderChain(object):
    """
    Class that has a list of
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    from which this class attempts to retrieve
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].


    By default this class uses the following providers in this order:

    1. [ConfigFileCredentialsProvider][synapseclient.core.credentials.credential_provider.ConfigFileCredentialsProvider]
    2. [UserArgsCredentialsProvider][synapseclient.core.credentials.credential_provider.UserArgsCredentialsProvider]
    3. [EnvironmentVariableCredentialsProvider][synapseclient.core.credentials.credential_provider.EnvironmentVariableCredentialsProvider]
    4. [AWSParameterStoreCredentialsProvider][synapseclient.core.credentials.credential_provider.AWSParameterStoreCredentialsProvider]

    Attributes:
        cred_providers: list of
            ([SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider])
            credential providers
    """

    def __init__(self, cred_providers) -> None:
        self.cred_providers = list(cred_providers)

    def get_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Iterates its list of
        [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
        and returns the first non-None
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        returned by a provider. If no provider is able to provide a
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials],
        returns None.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                returned by the first non-None provider in its list, None otherwise
        """
        for provider in self.cred_providers:
            creds = provider.get_synapse_credentials(syn, user_login_args)
            if creds is not None:
                return creds
        return None
Functions
get_credentials(syn, user_login_args)

Iterates its list of SynapseCredentialsProvider and returns the first non-None SynapseCredentials returned by a provider. If no provider is able to provide a SynapseCredentials, returns None.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials returned by the first non-None provider in its list, None otherwise

Source code in synapseclient/core/credentials/credential_provider.py
def get_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Iterates its list of
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    and returns the first non-None
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    returned by a provider. If no provider is able to provide a
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials],
    returns None.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            returned by the first non-None provider in its list, None otherwise
    """
    for provider in self.cred_providers:
        creds = provider.get_synapse_credentials(syn, user_login_args)
        if creds is not None:
            return creds
    return None
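
A custom chain can be assembled from any of the providers above; the ordering below (environment variable before config file) is an illustrative assumption, not the library default.

```python
from synapseclient.core.credentials.credential_provider import (
    ConfigFileCredentialsProvider,
    EnvironmentVariableCredentialsProvider,
    SynapseCredentialsProviderChain,
)

# Check the environment variable first, then fall back to ~/.synapseConfig
custom_chain = SynapseCredentialsProviderChain(
    cred_providers=[
        EnvironmentVariableCredentialsProvider(),
        ConfigFileCredentialsProvider(),
    ]
)
```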

Functions

get_default_credential_chain()

Creates the default credential chain used to retrieve SynapseCredentials. The credential providers are attempted in the order in which they appear in the chain.

RETURNS DESCRIPTION
SynapseCredentialsProviderChain

credential chain

Source code in synapseclient/core/credentials/credential_provider.py
def get_default_credential_chain() -> SynapseCredentialsProviderChain:
    """
    Creates and uses a default credential chain to retrieve
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].
    The order this is returned is the order in which the credential providers
    are attempted.

    Returns:
        credential chain
    """
    return DEFAULT_CREDENTIAL_PROVIDER_CHAIN

Remote File Storage Wrappers

synapseclient.core.remote_file_storage_wrappers

Wrappers for remote file storage clients like S3 and SFTP.

Classes

S3ClientWrapper

Wrapper class for S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
class S3ClientWrapper:
    """
    Wrapper class for S3 client.
    """

    # These methods are static because in our use case, we always have the bucket and
    # endpoint and usually only call the download/upload once so there is no need to instantiate multiple objects

    @staticmethod
    def _attempt_import_boto3():
        """
        Check if boto3 installed and give instructions if not.

        Returns:
            The boto3 module or instructions to install it if unavailable
        """
        return attempt_import(
            "boto3",
            "\n\nLibraries required for client authenticated S3 access are not installed!\n"
            "The Synapse client uses boto3 in order to access S3-like storage "
            "locations.\n",
        )

    @staticmethod
    def _create_progress_callback_func(
        progress_bar: tqdm,
    ) -> callable:
        """
        Creates a progress callback function for tracking the progress of a file transfer.

        Arguments:
            progress_bar: The tqdm progress bar to update as bytes are transferred.

        Returns:
            progress_callback: The progress callback function.
        """

        def progress_callback(transferred_bytes: int) -> None:
            """
            Update the progress of a transfer.

            Arguments:
                transferred_bytes: The number of bytes transferred.
            """
            progress_bar.update(transferred_bytes)

        return progress_callback

    @staticmethod
    def download_file(
        bucket: str,
        endpoint_url: str,
        remote_file_key: str,
        download_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        show_progress: bool = True,
        transfer_config_kwargs: dict = None,
    ) -> str:
        """
        Download a file from s3 using boto3.

        Arguments:
            bucket: name of bucket to upload to
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key to upload the file to
            download_file_path: local path to save the file to
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            show_progress: whether to print progress indicator to console
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

        Returns:
            download_file_path: S3 path of the file

        Raises:
            ValueError: If the key does not exist in the bucket.
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        S3ClientWrapper._attempt_import_boto3()

        import boto3.s3.transfer
        import botocore

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        try:
            s3_obj = s3.Object(bucket, remote_file_key)

            progress_callback = None
            progress_bar = None
            if show_progress:
                s3_obj.load()
                file_size = s3_obj.content_length
                filename = os.path.basename(download_file_path)
                progress_bar = tqdm(
                    total=file_size,
                    desc="Downloading",
                    unit="B",
                    unit_scale=True,
                    postfix=filename,
                    smoothing=0,
                )
                progress_callback = S3ClientWrapper._create_progress_callback_func(
                    progress_bar
                )

            s3_obj.download_file(
                download_file_path,
                Callback=progress_callback,
                Config=transfer_config,
            )
            if progress_bar:
                progress_bar.close()

            return download_file_path

        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                raise ValueError(
                    "The key:%s does not exist in bucket:%s.", remote_file_key, bucket
                )
            else:
                raise

    @staticmethod
    def upload_file(
        bucket: str,
        endpoint_url: typing.Optional[str],
        remote_file_key: str,
        upload_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        show_progress: bool = True,
        transfer_config_kwargs: dict = None,
        storage_str: str = None,
    ) -> str:
        """
        Upload a file to s3 using boto3.

        Arguments:
            bucket: name of bucket to upload to
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key to upload the file to
            upload_file_path: local path of the file to upload
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            show_progress: whether to print progress indicator to console
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

        Returns:
            upload_file_path: S3 path of the file

        Raises:
            ValueError: If the path does not exist or is not a file
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        if not os.path.isfile(upload_file_path):
            raise ValueError(
                "The path: [%s] does not exist or is not a file", upload_file_path
            )

        S3ClientWrapper._attempt_import_boto3()
        import boto3.s3.transfer

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        progress_callback = None
        progress_bar = None
        if show_progress:
            file_size = os.stat(upload_file_path).st_size
            filename = os.path.basename(upload_file_path)
            progress_bar = tqdm(
                total=file_size,
                desc=storage_str,
                unit="B",
                unit_scale=True,
                postfix=filename,
                smoothing=0,
            )
            progress_callback = S3ClientWrapper._create_progress_callback_func(
                progress_bar
            )

        # automatically determines whether to perform multi-part upload
        s3.Bucket(bucket).upload_file(
            upload_file_path,
            remote_file_key,
            Callback=progress_callback,
            Config=transfer_config,
            ExtraArgs={"ACL": "bucket-owner-full-control"},
        )
        if progress_bar:
            progress_bar.close()
        return upload_file_path
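
A sketch of a download followed by an upload through the wrapper; the bucket name, object keys, local paths, and AWS profile name are placeholder assumptions, and boto3 must be installed.

```python
from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper

local_path = S3ClientWrapper.download_file(
    bucket="my-bucket",                       # hypothetical bucket
    endpoint_url=None,                        # default AWS endpoint
    remote_file_key="studies/2021/data.csv",  # hypothetical object key
    download_file_path="/tmp/data.csv",
    profile_name="my-aws-profile",            # hypothetical AWS profile
)

S3ClientWrapper.upload_file(
    bucket="my-bucket",
    endpoint_url=None,
    remote_file_key="studies/2021/data-copy.csv",
    upload_file_path=local_path,
    profile_name="my-aws-profile",
)
```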
Functions
download_file(bucket, endpoint_url, remote_file_key, download_file_path, *, profile_name=None, credentials=None, show_progress=True, transfer_config_kwargs=None) staticmethod

Download a file from s3 using boto3.

PARAMETER DESCRIPTION
bucket

name of bucket to upload to

TYPE: str

endpoint_url

a boto3 compatible endpoint url

TYPE: str

remote_file_key

object key to upload the file to

TYPE: str

download_file_path

local path to save the file to

TYPE: str

profile_name

AWS profile name from local aws config, mutually exclusive with credentials

TYPE: str DEFAULT: None

credentials

a dictionary of AWS credentials to use, mutually exclusive with profile_name

Expected items:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token

TYPE: Dict[str, str] DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

transfer_config_kwargs

boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
download_file_path

S3 path of the file

TYPE: str

RAISES DESCRIPTION
ValueError

If the key does not exist in the bucket.

ClientError

If there is an error with the S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
@staticmethod
def download_file(
    bucket: str,
    endpoint_url: str,
    remote_file_key: str,
    download_file_path: str,
    *,
    profile_name: str = None,
    credentials: typing.Dict[str, str] = None,
    show_progress: bool = True,
    transfer_config_kwargs: dict = None,
) -> str:
    """
    Download a file from s3 using boto3.

    Arguments:
        bucket: name of bucket to upload to
        endpoint_url: a boto3 compatible endpoint url
        remote_file_key: object key to upload the file to
        download_file_path: local path to save the file to
        profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
        credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

            Expected items:

            - `aws_access_key_id`
            - `aws_secret_access_key`
            - `aws_session_token`
        show_progress: whether to print progress indicator to console
        transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

    Returns:
        download_file_path: S3 path of the file

    Raises:
        ValueError: If the key does not exist in the bucket.
        botocore.exceptions.ClientError: If there is an error with the S3 client.
    """

    S3ClientWrapper._attempt_import_boto3()

    import boto3.s3.transfer
    import botocore

    transfer_config = boto3.s3.transfer.TransferConfig(
        **(transfer_config_kwargs or {})
    )

    session_args = credentials if credentials else {"profile_name": profile_name}
    boto_session = boto3.session.Session(**session_args)
    s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

    try:
        s3_obj = s3.Object(bucket, remote_file_key)

        progress_callback = None
        progress_bar = None
        if show_progress:
            s3_obj.load()
            file_size = s3_obj.content_length
            filename = os.path.basename(download_file_path)
            progress_bar = tqdm(
                total=file_size,
                desc="Downloading",
                unit="B",
                unit_scale=True,
                postfix=filename,
                smoothing=0,
            )
            progress_callback = S3ClientWrapper._create_progress_callback_func(
                progress_bar
            )

        s3_obj.download_file(
            download_file_path,
            Callback=progress_callback,
            Config=transfer_config,
        )
        if progress_bar:
            progress_bar.close()

        return download_file_path

    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            raise ValueError(
                "The key:%s does not exist in bucket:%s.", remote_file_key, bucket
            )
        else:
            raise
upload_file(bucket, endpoint_url, remote_file_key, upload_file_path, *, profile_name=None, credentials=None, show_progress=True, transfer_config_kwargs=None, storage_str=None) staticmethod

Upload a file to s3 using boto3.

PARAMETER DESCRIPTION
bucket

name of bucket to upload to

TYPE: str

endpoint_url

a boto3 compatible endpoint url

TYPE: Optional[str]

remote_file_key

object key to upload the file to

TYPE: str

upload_file_path

local path of the file to upload

TYPE: str

profile_name

AWS profile name from local aws config, mutually exclusive with credentials

TYPE: str DEFAULT: None

credentials

a dictionary of AWS credentials to use, mutually exclusive with profile_name

Expected items:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token

TYPE: Dict[str, str] DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

transfer_config_kwargs

boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
upload_file_path

the local path of the file that was uploaded

TYPE: str

RAISES DESCRIPTION
ValueError

If the path does not exist or is not a file

ClientError

If there is an error with the S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
@staticmethod
def upload_file(
    bucket: str,
    endpoint_url: typing.Optional[str],
    remote_file_key: str,
    upload_file_path: str,
    *,
    profile_name: str = None,
    credentials: typing.Dict[str, str] = None,
    show_progress: bool = True,
    transfer_config_kwargs: dict = None,
    storage_str: str = None,
) -> str:
    """
    Upload a file to s3 using boto3.

    Arguments:
        bucket: name of bucket to upload to
        endpoint_url: a boto3 compatible endpoint url
        remote_file_key: object key to upload the file to
        upload_file_path: local path of the file to upload
        profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
        credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

            Expected items:

            - `aws_access_key_id`
            - `aws_secret_access_key`
            - `aws_session_token`
        show_progress: whether to print progress indicator to console
        transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

    Returns:
        upload_file_path: the local path of the file that was uploaded

    Raises:
        ValueError: If the path does not exist or is not a file
        botocore.exceptions.ClientError: If there is an error with the S3 client.
    """

    if not os.path.isfile(upload_file_path):
        raise ValueError(
            "The path: [%s] does not exist or is not a file", upload_file_path
        )

    S3ClientWrapper._attempt_import_boto3()
    import boto3.s3.transfer

    transfer_config = boto3.s3.transfer.TransferConfig(
        **(transfer_config_kwargs or {})
    )

    session_args = credentials if credentials else {"profile_name": profile_name}
    boto_session = boto3.session.Session(**session_args)
    s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

    progress_callback = None
    progress_bar = None
    if show_progress:
        file_size = os.stat(upload_file_path).st_size
        filename = os.path.basename(upload_file_path)
        progress_bar = tqdm(
            total=file_size,
            desc=storage_str,
            unit="B",
            unit_scale=True,
            postfix=filename,
            smoothing=0,
        )
        progress_callback = S3ClientWrapper._create_progress_callback_func(
            progress_bar
        )

    # automatically determines whether to perform multi-part upload
    s3.Bucket(bucket).upload_file(
        upload_file_path,
        remote_file_key,
        Callback=progress_callback,
        Config=transfer_config,
        ExtraArgs={"ACL": "bucket-owner-full-control"},
    )
    if progress_bar:
        progress_bar.close()
    return upload_file_path
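
Using upload_file

A minimal sketch (not part of the library source) of calling S3ClientWrapper.upload_file with explicit temporary credentials. All values shown are hypothetical placeholders; boto3 must be installed, and credentials is mutually exclusive with profile_name.

from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper

S3ClientWrapper.upload_file(
    bucket="my-example-bucket",
    endpoint_url=None,
    remote_file_key="data/example.csv",
    upload_file_path="/tmp/example.csv",
    credentials={
        "aws_access_key_id": "AKIA...",      # placeholder
        "aws_secret_access_key": "...",      # placeholder
        "aws_session_token": "...",          # placeholder
    },
    transfer_config_kwargs={"max_concurrency": 5},
    show_progress=True,
    storage_str="Uploading",
)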

SFTPWrapper

Source code in synapseclient/core/remote_file_storage_wrappers.py
class SFTPWrapper:
    @staticmethod
    def _attempt_import_sftp():
        """
        Check if pysftp is installed and give instructions if not.

        Returns:
            The pysftp module if available
        """
        return attempt_import(
            "pysftp",
            "\n\nLibraries required for SFTP are not installed!\n"
            "The Synapse client uses pysftp in order to access SFTP storage locations. "
            "This library in turn depends on pycrypto.\n"
            "For Windows systems without a C/C++ compiler, install the appropriate binary "
            "distribution of pycrypto from:\n"
            "http://www.voidspace.org.uk/python/modules.shtml#pycrypto\n\n"
            "For more information, see: http://python-docs.synapse.org/build/html/sftp.html",
        )

    @staticmethod
    def _parse_for_sftp(url):
        parsedURL = urllib_parse.urlparse(url)
        if parsedURL.scheme != "sftp":
            raise (
                NotImplementedError(
                    "This method only supports sftp URLs of the form sftp://..."
                )
            )
        return parsedURL

    @staticmethod
    def upload_file(
        filepath: str,
        url: str,
        username: str = None,
        password: str = None,
        storage_str: str = None,
    ) -> str:
        """
        Performs upload of a local file to an sftp server.

        Arguments:
            filepath: The path to the file to be uploaded.
            url: URL where file will be deposited. Should include path and protocol. e.g.
                        sftp://sftp.example.com/path/to/file/store
            username: The username for authentication. Defaults to None.
            password: The password for authentication. Defaults to None.

        Returns:
            The URL of the uploaded file.
        """
        progress_bar = tqdm(
            desc=storage_str,
            unit="B",
            unit_scale=True,
            smoothing=0,
            postfix=filepath,
        )

        def progress_callback(*args, **kwargs) -> None:
            if not progress_bar.total:
                progress_bar.total = args[1]
            progress_bar.update(args[0] - progress_bar.n)

        parsedURL = SFTPWrapper._parse_for_sftp(url)
        with _retry_pysftp_connection(
            parsedURL.hostname, username=username, password=password
        ) as sftp:
            sftp.makedirs(parsedURL.path)
            with sftp.cd(parsedURL.path):
                sftp.put(filepath, preserve_mtime=True, callback=progress_callback)
        progress_bar.close()
        path = urllib_parse.quote(parsedURL.path + "/" + os.path.split(filepath)[-1])
        parsedURL = parsedURL._replace(path=path)
        return urllib_parse.urlunparse(parsedURL)

    @staticmethod
    def download_file(
        url: str,
        localFilepath: str = None,
        username: str = None,
        password: str = None,
        show_progress: bool = True,
    ) -> str:
        """
        Performs download of a file from an sftp server.

        Arguments:
            url: URL where file will be deposited.  Path will be chopped out.
            localFilepath: location where to store file
            username: username on server
            password: password for authentication on  server
            show_progress: whether to print progress indicator to console

        Returns:
            The local filepath where the file was saved.
        """

        parsedURL = SFTPWrapper._parse_for_sftp(url)

        # Create the local file path if it doesn't exist
        path = urllib_parse.unquote(parsedURL.path)
        if localFilepath is None:
            localFilepath = os.getcwd()
        if os.path.isdir(localFilepath):
            localFilepath = os.path.join(localFilepath, path.split("/")[-1])
        # Check and create the directory
        dir = os.path.dirname(localFilepath)
        if not os.path.exists(dir):
            os.makedirs(dir)

        # Download file
        with _retry_pysftp_connection(
            parsedURL.hostname, username=username, password=password
        ) as sftp:
            sftp.get(
                path,
                localFilepath,
                preserve_mtime=True,
                callback=(printTransferProgress if show_progress else None),
            )
        return localFilepath
Functions
upload_file(filepath, url, username=None, password=None, storage_str=None) staticmethod

Performs upload of a local file to an sftp server.

PARAMETER DESCRIPTION
filepath

The path to the file to be uploaded.

TYPE: str

url

URL where file will be deposited. Should include path and protocol. e.g. sftp://sftp.example.com/path/to/file/store

TYPE: str

username

The username for authentication. Defaults to None.

TYPE: str DEFAULT: None

password

The password for authentication. Defaults to None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

The URL of the uploaded file.

Source code in synapseclient/core/remote_file_storage_wrappers.py
@staticmethod
def upload_file(
    filepath: str,
    url: str,
    username: str = None,
    password: str = None,
    storage_str: str = None,
) -> str:
    """
    Performs upload of a local file to an sftp server.

    Arguments:
        filepath: The path to the file to be uploaded.
        url: URL where file will be deposited. Should include path and protocol. e.g.
                    sftp://sftp.example.com/path/to/file/store
        username: The username for authentication. Defaults to None.
        password: The password for authentication. Defaults to None.

    Returns:
        The URL of the uploaded file.
    """
    progress_bar = tqdm(
        desc=storage_str,
        unit="B",
        unit_scale=True,
        smoothing=0,
        postfix=filepath,
    )

    def progress_callback(*args, **kwargs) -> None:
        if not progress_bar.total:
            progress_bar.total = args[1]
        progress_bar.update(args[0] - progress_bar.n)

    parsedURL = SFTPWrapper._parse_for_sftp(url)
    with _retry_pysftp_connection(
        parsedURL.hostname, username=username, password=password
    ) as sftp:
        sftp.makedirs(parsedURL.path)
        with sftp.cd(parsedURL.path):
            sftp.put(filepath, preserve_mtime=True, callback=progress_callback)
    progress_bar.close()
    path = urllib_parse.quote(parsedURL.path + "/" + os.path.split(filepath)[-1])
    parsedURL = parsedURL._replace(path=path)
    return urllib_parse.urlunparse(parsedURL)
download_file(url, localFilepath=None, username=None, password=None, show_progress=True) staticmethod

Performs download of a file from an sftp server.

PARAMETER DESCRIPTION
url

URL where file will be deposited. Path will be chopped out.

TYPE: str

localFilepath

location where to store file

TYPE: str DEFAULT: None

username

username on server

TYPE: str DEFAULT: None

password

password for authentication on server

TYPE: str DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
str

The local filepath where the file was saved.

Source code in synapseclient/core/remote_file_storage_wrappers.py
@staticmethod
def download_file(
    url: str,
    localFilepath: str = None,
    username: str = None,
    password: str = None,
    show_progress: bool = True,
) -> str:
    """
    Performs download of a file from an sftp server.

    Arguments:
        url: URL where file will be deposited.  Path will be chopped out.
        localFilepath: location where to store file
        username: username on server
        password: password for authentication on  server
        show_progress: whether to print progress indicator to console

    Returns:
        The local filepath where the file was saved.
    """

    parsedURL = SFTPWrapper._parse_for_sftp(url)

    # Create the local file path if it doesn't exist
    path = urllib_parse.unquote(parsedURL.path)
    if localFilepath is None:
        localFilepath = os.getcwd()
    if os.path.isdir(localFilepath):
        localFilepath = os.path.join(localFilepath, path.split("/")[-1])
    # Check and create the directory
    dir = os.path.dirname(localFilepath)
    if not os.path.exists(dir):
        os.makedirs(dir)

    # Download file
    with _retry_pysftp_connection(
        parsedURL.hostname, username=username, password=password
    ) as sftp:
        sftp.get(
            path,
            localFilepath,
            preserve_mtime=True,
            callback=(printTransferProgress if show_progress else None),
        )
    return localFilepath
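
Using SFTPWrapper

A minimal sketch (not part of the library source) of uploading and then re-downloading a file with SFTPWrapper. The host, paths, and credentials are hypothetical placeholders, and pysftp must be installed.

from synapseclient.core.remote_file_storage_wrappers import SFTPWrapper

# Upload a local file; the returned value is the full sftp:// URL of the stored file.
url = SFTPWrapper.upload_file(
    filepath="/tmp/example.csv",
    url="sftp://sftp.example.com/path/to/file/store",
    username="user",
    password="secret",
)

# Download it again; with localFilepath omitted it is saved into the current working directory.
local_path = SFTPWrapper.download_file(
    url=url,
    username="user",
    password="secret",
)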

Functions

Retry

synapseclient.core.retry

A helper tool that allows the Python client to make more than one attempt at connecting to the server if initially met with an error. These retry attempts can be made under certain conditions, i.e. for certain status codes, connection errors, and/or connection exceptions.

Functions

with_retry(function, verbose=False, retry_status_codes=[429, 500, 502, 503, 504], expected_status_codes=[], retry_errors=[], retry_exceptions=[], retries=DEFAULT_RETRIES, wait=DEFAULT_WAIT, back_off=DEFAULT_BACK_OFF, max_wait=DEFAULT_MAX_WAIT)

Retries the given function under certain conditions.

PARAMETER DESCRIPTION
function

A function with no arguments. If arguments are needed, use a lambda (see example).

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

DEFAULT: [429, 500, 502, 503, 504]

expected_status_codes

If specified, responses with any other status codes result in a retry.

DEFAULT: []

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

DEFAULT: []

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

DEFAULT: []

retries

How many times to retry maximum.

DEFAULT: DEFAULT_RETRIES

wait

How many seconds to wait between retries.

DEFAULT: DEFAULT_WAIT

back_off

Exponential constant to increase wait for between progressive failures.

DEFAULT: DEFAULT_BACK_OFF

max_wait

back_off between requests will not exceed this value

DEFAULT: DEFAULT_MAX_WAIT

RETURNS DESCRIPTION

function()

Using with_retry

Using with_retry to consolidate inputs into a list.

from synapseclient.core.retry import with_retry

def foo(a, b, c): return [a, b, c]
result = with_retry(lambda: foo("1", "2", "3"), **STANDARD_RETRY_PARAMS)
Source code in synapseclient/core/retry.py
def with_retry(
    function,
    verbose=False,
    retry_status_codes=[429, 500, 502, 503, 504],
    expected_status_codes=[],
    retry_errors=[],
    retry_exceptions=[],
    retries=DEFAULT_RETRIES,
    wait=DEFAULT_WAIT,
    back_off=DEFAULT_BACK_OFF,
    max_wait=DEFAULT_MAX_WAIT,
):
    """
    Retries the given function under certain conditions.

    Arguments:
        function: A function with no arguments.  If arguments are needed, use a lambda (see example).
        retry_status_codes: What status codes to retry upon in the case of a SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result in a retry.
        retry_errors: What reasons to retry upon, if function().response.json()['reason'] exists.
        retry_exceptions: What types of exceptions, specified as strings or Exception classes, to retry upon.
        retries: How many times to retry maximum.
        wait: How many seconds to wait between retries.
        back_off: Exponential constant to increase wait for between progressive failures.
        max_wait: back_off between requests will not exceed this value

    Returns:
        function()

    Example: Using with_retry
        Using ``with_retry`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry

            def foo(a, b, c): return [a, b, c]
            result = with_retry(lambda: foo("1", "2", "3"), **STANDARD_RETRY_PARAMS)
    """

    if verbose:
        logger = logging.getLogger(DEBUG_LOGGER_NAME)
    else:
        logger = logging.getLogger(DEFAULT_LOGGER_NAME)

    # Retry until we succeed or run out of tries
    total_wait = 0
    while True:
        # Start with a clean slate
        exc = None
        exc_info = None
        retry = False
        response = None

        # Try making the call
        try:
            response = function()
        except Exception as ex:
            exc = ex
            exc_info = sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            if hasattr(ex, "response"):
                response = ex.response

        # Check if we got a retry-able error
        if response is not None and hasattr(response, "status_code"):
            if (
                expected_status_codes
                and response.status_code not in expected_status_codes
            ) or (retry_status_codes and response.status_code in retry_status_codes):
                response_message = _get_message(response)
                retry = True
                logger.debug("retrying on status code: %s" % str(response.status_code))
                # TODO: this was originally printed regardless of 'verbose' was that behavior correct?
                logger.debug(str(response_message))
                if (response.status_code == 429) and (wait > 10):
                    logger.warning("%s...\n" % response_message)
                    logger.warning("Retrying in %i seconds" % wait)

            elif response.status_code not in range(200, 299):
                # For all other non 200 messages look for retryable errors in the body or reason field
                response_message = _get_message(response)
                if any(
                    [msg.lower() in response_message.lower() for msg in retry_errors]
                ):
                    retry = True
                    logger.debug("retrying %s" % response_message)
                # special case for message throttling
                elif (
                    "Please slow down.  You may send a maximum of 10 message"
                    in response_message
                ):
                    retry = True
                    wait = 16
                    logger.debug("retrying " + response_message)

        # Check if we got a retry-able exception
        if exc is not None:
            if (
                exc.__class__.__name__ in retry_exceptions
                or exc.__class__ in retry_exceptions
                or any(
                    [msg.lower() in str(exc_info[1]).lower() for msg in retry_errors]
                )
            ):
                retry = True
                logger.debug("retrying exception: " + str(exc))

        # Wait then retry
        retries -= 1
        if retries >= 0 and retry:
            randomized_wait = wait * random.uniform(0.5, 1.5)
            logger.debug(
                "total wait time {total_wait:5.0f} seconds\n "
                "... Retrying in {wait:5.1f} seconds...".format(
                    total_wait=total_wait, wait=randomized_wait
                )
            )
            total_wait += randomized_wait
            doze(randomized_wait)
            wait = min(max_wait, wait * back_off)
            continue

        # Out of retries, re-raise the exception or return the response
        if exc_info is not None and exc_info[0] is not None:
            logger.debug(
                "retries have run out. re-raising the exception", exc_info=True
            )
            raise exc
        return response
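
Using retry_exceptions

A minimal sketch (not part of the library source) showing with_retry retrying on a specific exception type. The flaky_call function is a hypothetical stand-in for a network call that fails twice before succeeding.

from synapseclient.core.retry import with_retry

attempts = {"count": 0}

def flaky_call():
    # Toy stand-in for a call that fails twice before succeeding.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

# Retry on ConnectionError, starting at a 1 second wait and doubling it after
# each failure, capped at 30 seconds.
result = with_retry(
    lambda: flaky_call(),
    retry_exceptions=[ConnectionError],
    retries=5,
    wait=1,
    back_off=2,
    max_wait=30,
)
print(result)  # "ok" after two retried failures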

calculate_exponential_backoff(retries, base_wait, wait_random_lower, wait_random_upper, back_off_factor, max_back_off)

Handle calculating the exponential backoff.

PARAMETER DESCRIPTION
retries

The number of retries that have been attempted

TYPE: int

base_wait

The base wait time

TYPE: float

wait_random_lower

The lower bound of the random wait time

TYPE: float

wait_random_upper

The upper bound of the random wait time

TYPE: float

back_off_factor

The factor to increase the wait time by for each retry

TYPE: float

max_back_off

The maximum wait time

TYPE: float

RETURNS DESCRIPTION
float

The total wait time

Source code in synapseclient/core/retry.py
def calculate_exponential_backoff(
    retries: int,
    base_wait: float,
    wait_random_lower: float,
    wait_random_upper: float,
    back_off_factor: float,
    max_back_off: float,
) -> float:
    """
    Handle calculating the exponential backoff.

    Arguments:
        retries: The number of retries that have been attempted
        base_wait: The base wait time
        wait_random_lower: The lower bound of the random wait time
        wait_random_upper: The upper bound of the random wait time
        back_off_factor: The factor to increase the wait time by for each retry
        max_back_off: The maximum wait time

    Returns:
        The total wait time
    """
    random_jitter = random.uniform(wait_random_lower, wait_random_upper)
    time_to_wait = min(
        (base_wait * (back_off_factor**retries)) + random_jitter,
        max_back_off,
    )
    return time_to_wait
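
Using calculate_exponential_backoff

A minimal sketch (not part of the library source) showing how the wait time grows: base_wait * back_off_factor**retries plus a random jitter, capped at max_back_off. The parameter values below are arbitrary illustrations, not the library defaults.

from synapseclient.core.retry import calculate_exponential_backoff

for attempt in range(5):
    wait = calculate_exponential_backoff(
        retries=attempt,
        base_wait=1.0,
        wait_random_lower=0.0,
        wait_random_upper=0.5,
        back_off_factor=2.0,
        max_back_off=10.0,
    )
    print(f"attempt {attempt}: wait ~{wait:.2f}s")  # roughly 1, 2, 4, 8, then capped at 10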

with_retry_time_based_async(function, verbose=False, retry_status_codes=None, expected_status_codes=None, retry_errors=None, retry_exceptions=None, retry_base_wait=DEFAULT_BASE_WAIT_ASYNC, retry_wait_random_lower=DEFAULT_WAIT_RANDOM_LOWER_ASYNC, retry_wait_random_upper=DEFAULT_WAIT_RANDOM_UPPER_ASYNC, retry_back_off_factor=DEFAULT_BACK_OFF_FACTOR_ASYNC, retry_max_back_off=DEFAULT_MAX_BACK_OFF_ASYNC, retry_max_wait_before_failure=DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC) async

Retries the given function under certain conditions. This is designed to retry an unbounded number of times until the maximum total wait time is reached. The backoff is calculated using an exponential backoff algorithm with random jitter, and the backoff between retries is capped at retry_max_back_off.

PARAMETER DESCRIPTION
verbose

Whether to log debug messages

TYPE: bool DEFAULT: False

function

A function with no arguments. If arguments are needed, use a lambda (see example).

TYPE: Coroutine[Any, Any, Any]

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

TYPE: List[int] DEFAULT: None

expected_status_codes

If specified, responses with any other status codes result in a retry.

TYPE: List[int] DEFAULT: None

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

TYPE: List[str] DEFAULT: None

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

TYPE: List[Union[Exception, str]] DEFAULT: None

retry_base_wait

The base wait time between retries.

TYPE: float DEFAULT: DEFAULT_BASE_WAIT_ASYNC

retry_wait_random_lower

The lower bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_LOWER_ASYNC

retry_wait_random_upper

The upper bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_UPPER_ASYNC

retry_back_off_factor

The factor to increase the wait time by for each retry.

TYPE: float DEFAULT: DEFAULT_BACK_OFF_FACTOR_ASYNC

retry_max_back_off

The maximum wait time.

TYPE: float DEFAULT: DEFAULT_MAX_BACK_OFF_ASYNC

retry_max_wait_before_failure

The maximum wait time before failure.

TYPE: float DEFAULT: DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC

Using with_retry_time_based_async

Using with_retry_time_based_async to consolidate inputs into a list.

from synapseclient.core.retry import with_retry_time_based_async

async def foo(a, b, c): return [a, b, c]
result = await with_retry_time_based_async(lambda: foo("1", "2", "3"))
Source code in synapseclient/core/retry.py
async def with_retry_time_based_async(
    function: Coroutine[Any, Any, Any],
    verbose: bool = False,
    retry_status_codes: List[int] = None,
    expected_status_codes: List[int] = None,
    retry_errors: List[str] = None,
    retry_exceptions: List[Union[Exception, str]] = None,
    retry_base_wait: float = DEFAULT_BASE_WAIT_ASYNC,
    retry_wait_random_lower: float = DEFAULT_WAIT_RANDOM_LOWER_ASYNC,
    retry_wait_random_upper: float = DEFAULT_WAIT_RANDOM_UPPER_ASYNC,
    retry_back_off_factor: float = DEFAULT_BACK_OFF_FACTOR_ASYNC,
    retry_max_back_off: float = DEFAULT_MAX_BACK_OFF_ASYNC,
    retry_max_wait_before_failure: float = DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC,
) -> Union[Exception, httpx.Response, Any, None]:
    """
    Retries the given function under certain conditions. This is created such that it
    will retry an unbounded number of times until the maximum wait time is reached. The
    backoff is calculated using an exponential backoff algorithm with a random jitter.
    The maximum backoff inbetween retries is capped at `retry_max_back_off`.

    Arguments:
        verbose: Whether to log debug messages
        function: A function with no arguments. If arguments are needed, use a lambda
            (see example).
        retry_status_codes: What status codes to retry upon in the case of a
            SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result
            in a retry.
        retry_errors: What reasons to retry upon, if
            `function().response.json()['reason']` exists.
        retry_exceptions: What types of exceptions, specified as strings or Exception
            classes, to retry upon.
        retry_base_wait: The base wait time inbetween retries.
        retry_wait_random_lower: The lower bound of the random wait time.
        retry_wait_random_upper: The upper bound of the random wait time.
        retry_back_off_factor: The factor to increase the wait time by for each retry.
        retry_max_back_off: The maximum wait time.
        retry_max_wait_before_failure: The maximum wait time before failure.

    Example: Using with_retry
        Using ``with_retry_time_based_async`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry_time_based_async

            async def foo(a, b, c): return [a, b, c]
            result = await with_retry_time_based_async(lambda: foo("1", "2", "3"))
    """
    (
        retry_status_codes,
        expected_status_codes,
        retry_errors,
        retry_exceptions,
        logger,
    ) = _assign_default_values(
        retry_status_codes=retry_status_codes,
        expected_status_codes=expected_status_codes,
        retry_errors=retry_errors,
        retry_exceptions=retry_exceptions,
        verbose=verbose,
    )

    # Retry until we succeed or run past the maximum wait time
    total_wait = 0
    retries = -1
    while True:
        caught_exception = None
        caught_exception_info = None
        response = None
        current_span = trace.get_current_span()
        current_span.set_attribute("synapse.retries", str(retries + 1))

        try:
            response = await function()
        except Exception as ex:
            caught_exception = ex
            caught_exception_info = sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            if hasattr(ex, "response"):
                response = ex.response

        retry = _is_retryable(
            response=response,
            caught_exception=caught_exception,
            caught_exception_info=caught_exception_info,
            expected_status_codes=expected_status_codes,
            retry_status_codes=retry_status_codes,
            retry_exceptions=retry_exceptions,
            retry_errors=retry_errors,
        )

        # Wait then retry
        retries += 1
        if total_wait < retry_max_wait_before_failure and retry:
            _log_for_retry(
                logger=logger, response=response, caught_exception=caught_exception
            )

            backoff_wait = calculate_exponential_backoff(
                retries=retries,
                base_wait=retry_base_wait,
                wait_random_lower=retry_wait_random_lower,
                wait_random_upper=retry_wait_random_upper,
                back_off_factor=retry_back_off_factor,
                max_back_off=retry_max_back_off,
            )
            total_wait += backoff_wait
            await asyncio.sleep(backoff_wait)
            continue

        # Out of retries, re-raise the exception or return the response
        if caught_exception_info is not None and caught_exception_info[0] is not None:
            logger.debug(
                (
                    "Retries have run out. re-raising the exception: %s"
                    if retry
                    else "Raising the exception: %s"
                ),
                str(caught_exception_info[0]),
            )
            raise caught_exception
        return response

with_retry_time_based(function, verbose=False, retry_status_codes=None, expected_status_codes=None, retry_errors=None, retry_exceptions=None, retry_base_wait=DEFAULT_BASE_WAIT_ASYNC, retry_wait_random_lower=DEFAULT_WAIT_RANDOM_LOWER_ASYNC, retry_wait_random_upper=DEFAULT_WAIT_RANDOM_UPPER_ASYNC, retry_back_off_factor=DEFAULT_BACK_OFF_FACTOR_ASYNC, retry_max_back_off=DEFAULT_MAX_BACK_OFF_ASYNC, retry_max_wait_before_failure=DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC)

Retries the given function under certain conditions. This is designed to retry an unbounded number of times until the maximum total wait time is reached. The backoff is calculated using an exponential backoff algorithm with random jitter, and the backoff between retries is capped at retry_max_back_off.

PARAMETER DESCRIPTION
verbose

Whether to log debug messages

TYPE: bool DEFAULT: False

function

A function with no arguments. If arguments are needed, use a lambda (see example).

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

TYPE: List[int] DEFAULT: None

expected_status_codes

If specified, responses with any other status codes result in a retry.

TYPE: List[int] DEFAULT: None

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

TYPE: List[str] DEFAULT: None

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

TYPE: List[Union[Exception, str]] DEFAULT: None

retry_base_wait

The base wait time between retries.

TYPE: float DEFAULT: DEFAULT_BASE_WAIT_ASYNC

retry_wait_random_lower

The lower bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_LOWER_ASYNC

retry_wait_random_upper

The upper bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_UPPER_ASYNC

retry_back_off_factor

The factor to increase the wait time by for each retry.

TYPE: float DEFAULT: DEFAULT_BACK_OFF_FACTOR_ASYNC

retry_max_back_off

The maximum wait time.

TYPE: float DEFAULT: DEFAULT_MAX_BACK_OFF_ASYNC

retry_max_wait_before_failure

The maximum wait time before failure.

TYPE: float DEFAULT: DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC

Using with_retry_time_based

Using with_retry_time_based to consolidate inputs into a list.

from synapseclient.core.retry import with_retry_time_based

def foo(a, b, c): return [a, b, c]
result = with_retry_time_based(lambda: foo("1", "2", "3"))
Source code in synapseclient/core/retry.py
def with_retry_time_based(
    function,
    verbose: bool = False,
    retry_status_codes: List[int] = None,
    expected_status_codes: List[int] = None,
    retry_errors: List[str] = None,
    retry_exceptions: List[Union[Exception, str]] = None,
    retry_base_wait: float = DEFAULT_BASE_WAIT_ASYNC,
    retry_wait_random_lower: float = DEFAULT_WAIT_RANDOM_LOWER_ASYNC,
    retry_wait_random_upper: float = DEFAULT_WAIT_RANDOM_UPPER_ASYNC,
    retry_back_off_factor: float = DEFAULT_BACK_OFF_FACTOR_ASYNC,
    retry_max_back_off: float = DEFAULT_MAX_BACK_OFF_ASYNC,
    retry_max_wait_before_failure: float = DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC,
) -> Union[Exception, httpx.Response, Any, None]:
    """
    Retries the given function under certain conditions. This is created such that it
    will retry an unbounded number of times until the maximum wait time is reached. The
    backoff is calculated using an exponential backoff algorithm with a random jitter.
    The maximum backoff inbetween retries is capped at `retry_max_back_off`.

    Arguments:
        verbose: Whether to log debug messages
        function: A function with no arguments. If arguments are needed, use a lambda
            (see example).
        retry_status_codes: What status codes to retry upon in the case of a
            SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result
            in a retry.
        retry_errors: What reasons to retry upon, if
            `function().response.json()['reason']` exists.
        retry_exceptions: What types of exceptions, specified as strings or Exception
            classes, to retry upon.
        retry_base_wait: The base wait time inbetween retries.
        retry_wait_random_lower: The lower bound of the random wait time.
        retry_wait_random_upper: The upper bound of the random wait time.
        retry_back_off_factor: The factor to increase the wait time by for each retry.
        retry_max_back_off: The maximum wait time.
        retry_max_wait_before_failure: The maximum wait time before failure.

    Example: Using with_retry
        Using ``with_retry_time_based`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry_time_based

            def foo(a, b, c): return [a, b, c]
            result = with_retry_time_based(lambda: foo("1", "2", "3"))
    """
    (
        retry_status_codes,
        expected_status_codes,
        retry_errors,
        retry_exceptions,
        logger,
    ) = _assign_default_values(
        retry_status_codes=retry_status_codes,
        expected_status_codes=expected_status_codes,
        retry_errors=retry_errors,
        retry_exceptions=retry_exceptions,
        verbose=verbose,
    )

    # Retry until we succeed or run past the maximum wait time
    total_wait = 0
    retries = -1
    while True:
        caught_exception = None
        caught_exception_info = None
        response = None
        current_span = trace.get_current_span()
        current_span.set_attribute("synapse.retries", str(retries + 1))

        try:
            response = function()
        except Exception as ex:
            caught_exception = ex
            caught_exception_info = sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            if hasattr(ex, "response"):
                response = ex.response

        retry = _is_retryable(
            response=response,
            caught_exception=caught_exception,
            caught_exception_info=caught_exception_info,
            expected_status_codes=expected_status_codes,
            retry_status_codes=retry_status_codes,
            retry_exceptions=retry_exceptions,
            retry_errors=retry_errors,
        )

        # Wait then retry
        retries += 1
        if total_wait < retry_max_wait_before_failure and retry:
            _log_for_retry(
                logger=logger, response=response, caught_exception=caught_exception
            )

            backoff_wait = calculate_exponential_backoff(
                retries=retries,
                base_wait=retry_base_wait,
                wait_random_lower=retry_wait_random_lower,
                wait_random_upper=retry_wait_random_upper,
                back_off_factor=retry_back_off_factor,
                max_back_off=retry_max_back_off,
            )
            total_wait += backoff_wait
            time.sleep(backoff_wait)
            continue

        # Out of retries, re-raise the exception or return the response
        if caught_exception_info is not None and caught_exception_info[0] is not None:
            logger.debug(
                (
                    "Retries have run out. re-raising the exception: %s"
                    if retry
                    else "Raising the exception: %s"
                ),
                str(caught_exception_info[0]),
            )
            raise caught_exception
        return response

Utils

synapseclient.core.utils

Utility functions useful in the implementation and testing of the Synapse client.

Classes

threadsafe_iter

Takes an iterator/generator and makes it thread-safe by serializing calls to the next method of the given iterator/generator. See: http://anandology.com/blog/using-iterators-and-generators/

Source code in synapseclient/core/utils.py
class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by serializing call to the
    `next` method of given iterator/generator.
    See: <http://anandology.com/blog/using-iterators-and-generators/>
    """

    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)
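
Using threadsafe_iter

A minimal sketch (not part of the library source) of sharing one generator between several worker threads; the wrapper's internal lock serializes each call to next so no value is handed out twice.

import threading

from synapseclient.core.utils import threadsafe_iter

shared = threadsafe_iter(iter(range(100)))
seen = []
seen_lock = threading.Lock()

def consume():
    # Iterating the wrapper calls its locked __next__ under the hood.
    for value in shared:
        with seen_lock:
            seen.append(value)

threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(seen))  # 100, each value consumed exactly once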

deprecated_keyword_param

A decorator used to warn when a keyword parameter of a function has been deprecated and is intended for future removal. A warning is emitted if such a keyword is passed.

Source code in synapseclient/core/utils.py
class deprecated_keyword_param:
    """A decorator to use to warn when a keyword parameter from a function has been deprecated
    and is intended for future removal. Will emit a warning such a keyword is passed."""

    def __init__(self, keywords, version, reason):
        self.keywords = set(keywords)
        self.version = version
        self.reason = reason

    def __call__(self, fn):
        def wrapper(*args, **kwargs):
            found = self.keywords.intersection(kwargs)
            if found:
                warnings.warn(
                    "Parameter(s) {} deprecated since version {}; {}".format(
                        sorted(list(found)), self.version, self.reason
                    ),
                    category=DeprecationWarning,
                    stacklevel=2,
                )

            return fn(*args, **kwargs)

        return wrapper
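
Using deprecated_keyword_param

A minimal sketch (not part of the library source) of decorating a function so that passing a deprecated keyword emits a DeprecationWarning. The function, keyword, and version string are hypothetical.

from synapseclient.core.utils import deprecated_keyword_param

@deprecated_keyword_param(["verbose"], version="2.4.0", reason="use 'silent' instead")
def fetch(data, verbose=False, silent=True):
    return data

fetch("x", verbose=True)  # warns: Parameter(s) ['verbose'] deprecated since version 2.4.0; use 'silent' instead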

Functions

md5_for_file(filename, block_size=2 * MB, callback=None)

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB

TYPE: int DEFAULT: 2 * MB

callback

The callback function that helps us show a loading spinner on the terminal. Defaults to None

TYPE: Callable DEFAULT: None

RETURNS DESCRIPTION

The MD5 hash object (call hexdigest() to get the checksum string)

Source code in synapseclient/core/utils.py
def md5_for_file(
    filename: str, block_size: int = 2 * MB, callback: typing.Callable = None
):
    """
    Calculates the MD5 of the given file.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    Arguments:
        filename: The file to read in
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB
        callback: The callback function that help us show loading spinner on terminal.
                    Defaults to None

    Returns:
        The MD5 Checksum
    """
    loop_iteration = 0
    md5 = hashlib.new("md5", usedforsecurity=False)
    with open(filename, "rb") as f:
        while True:
            loop_iteration += 1
            if callback:
                callback()
            data = f.read(block_size)
            if not data:
                break
            md5.update(data)
            del data
            # Garbage collect every 100 iterations
            if loop_iteration % 100 == 0:
                gc.collect()
    return md5
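
Using md5_for_file

A minimal sketch (not part of the library source); the file path is a hypothetical placeholder. Note that the function returns a hashlib md5 object, so call hexdigest() to get the usual hex string (or use md5_for_file_hex directly).

from synapseclient.core.utils import md5_for_file

digest = md5_for_file("/tmp/example.csv", block_size=2 * 1024 * 1024)
print(digest.hexdigest())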

md5_for_file_hex(filename, block_size=2 * MB, callback=None)

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB

TYPE: int DEFAULT: 2 * MB

callback

The callback function that helps us show a loading spinner on the terminal. Defaults to None

TYPE: Callable DEFAULT: None

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
def md5_for_file_hex(
    filename: str, block_size: int = 2 * MB, callback: typing.Callable = None
) -> str:
    """
    Calculates the MD5 of the given file.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    Arguments:
        filename: The file to read in
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB
        callback: The callback function that help us show loading spinner on terminal.
                    Defaults to None

    Returns:
        The MD5 Checksum
    """

    return md5_for_file(filename, block_size, callback).hexdigest()

md5_for_file_multiprocessing(filename, process_pool_executor, md5_semaphore, block_size=2 * MB) async

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

process_pool_executor

The process pool executor to use for the calculation.

md5_semaphore

The semaphore to use for waiting to calculate.

TYPE: Semaphore

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB.

TYPE: int DEFAULT: 2 * MB

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
async def md5_for_file_multiprocessing(
    filename: str,
    process_pool_executor,
    md5_semaphore: asyncio.Semaphore,
    block_size: int = 2 * MB,
) -> str:
    """
    Calculates the MD5 of the given file.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    Arguments:
        filename: The file to read in
        process_pool_executor: The process pool executor to use for the calculation.
        md5_semaphore: The semaphore to use for waiting to calculate.
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB.

    Returns:
        The MD5 Checksum
    """
    async with md5_semaphore:
        with tracer.start_as_current_span("Utils::md5_for_file_multiprocessing"):
            future = process_pool_executor.submit(
                md5_for_file_hex, filename, block_size
            )
            while not future.done():
                await asyncio.sleep(0)
            result = future.result()
            return result
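
Using md5_for_file_multiprocessing

A minimal sketch (not part of the library source) of computing an MD5 in a worker process while limiting concurrency with an asyncio semaphore. The file path is a hypothetical placeholder.

import asyncio
from concurrent.futures import ProcessPoolExecutor

from synapseclient.core.utils import md5_for_file_multiprocessing

async def main() -> str:
    semaphore = asyncio.Semaphore(2)  # at most two MD5 computations at a time
    with ProcessPoolExecutor() as executor:
        return await md5_for_file_multiprocessing(
            filename="/tmp/example.csv",
            process_pool_executor=executor,
            md5_semaphore=semaphore,
        )

if __name__ == "__main__":
    print(asyncio.run(main()))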

md5_fn(part, _)

Calculate the MD5 of a file-like object.

PARAMETER DESCRIPTION
part

A file-like object to read from.

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
@tracer.start_as_current_span("Utils::md5_fn")
def md5_fn(part, _) -> str:
    """Calculate the MD5 of a file-like object.

    Arguments:
        part: A file-like object to read from.

    Returns:
        The MD5 Checksum
    """
    md5 = hashlib.new("md5", usedforsecurity=False)
    md5.update(part)
    return md5.hexdigest()

download_file(url, localFilepath=None)

Downloads a remote file.

PARAMETER DESCRIPTION
localFilepath

May be None, in which case a temporary file is created

RETURNS DESCRIPTION
localFilepath

The path to the downloaded file

TYPE: str

Source code in synapseclient/core/utils.py
def download_file(url: str, localFilepath: str = None) -> str:
    """
    Downloads a remote file.

    Arguments:
        localFilepath: May be None, in which case a temporary file is created

    Returns:
        localFilepath: The path to the downloaded file
    """

    f = None
    try:
        if localFilepath:
            dir = os.path.dirname(localFilepath)
            if not os.path.exists(dir):
                os.makedirs(dir)
            f = open(localFilepath, "wb")
        else:
            f = tempfile.NamedTemporaryFile(delete=False)
            localFilepath = f.name

        r = requests.get(url, stream=True)
        toBeTransferred = float(r.headers["content-length"])
        for nChunks, chunk in enumerate(r.iter_content(chunk_size=1024 * 10)):
            if chunk:
                f.write(chunk)
                printTransferProgress(nChunks * 1024 * 10, toBeTransferred)
    finally:
        if f:
            f.close()
            printTransferProgress(toBeTransferred, toBeTransferred)

    return localFilepath

extract_filename(content_disposition_header, default_filename=None)

Extract a filename from an HTTP content-disposition header field.

See this memo and this package for cryptic details.

Source code in synapseclient/core/utils.py
def extract_filename(content_disposition_header, default_filename=None):
    """
    Extract a filename from an HTTP content-disposition header field.

    See [this memo](http://tools.ietf.org/html/rfc6266) and
    [this package](http://pypi.python.org/pypi/rfc6266)
    for cryptic details.
    """

    if not content_disposition_header:
        return default_filename
    value, params = cgi.parse_header(content_disposition_header)
    return params.get("filename", default_filename)

extract_user_name(profile)

Extract a displayable user name from a user's profile

Source code in synapseclient/core/utils.py
def extract_user_name(profile):
    """
    Extract a displayable user name from a user's profile
    """
    if "userName" in profile and profile["userName"]:
        return profile["userName"]
    elif "displayName" in profile and profile["displayName"]:
        return profile["displayName"]
    else:
        if (
            "firstName" in profile
            and profile["firstName"]
            and "lastName" in profile
            and profile["lastName"]
        ):
            return profile["firstName"] + " " + profile["lastName"]
        elif "lastName" in profile and profile["lastName"]:
            return profile["lastName"]
        elif "firstName" in profile and profile["firstName"]:
            return profile["firstName"]
        else:
            return str(profile.get("id", "Unknown-user"))

id_of(obj)

Try to figure out the Synapse ID of the given object.

PARAMETER DESCRIPTION
obj

May be a string, Entity object, or dictionary

TYPE: Union[str, Mapping, Number]

RETURNS DESCRIPTION
str

The ID

RAISES DESCRIPTION
ValueError

if the object doesn't have an ID

Source code in synapseclient/core/utils.py
def id_of(obj: typing.Union[str, collections.abc.Mapping, numbers.Number]) -> str:
    """
    Try to figure out the Synapse ID of the given object.

    Arguments:
        obj: May be a string, Entity object, or dictionary

    Returns:
        The ID

    Raises:
        ValueError: if the object doesn't have an ID
    """
    if isinstance(obj, str):
        return str(obj)
    if isinstance(obj, numbers.Number):
        return str(obj)

    id_attr_names = [
        "id",
        "ownerId",
        "tableId",
    ]  # possible attribute names for a synapse Id
    for attribute_name in id_attr_names:
        syn_id = _get_from_members_items_or_properties(obj, attribute_name)
        if syn_id is not None:
            return str(syn_id)

    raise ValueError("Invalid parameters: couldn't find id of " + str(obj))
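
Using id_of

A minimal sketch (not part of the library source) showing the kinds of input id_of accepts; the IDs are hypothetical.

from synapseclient.core.utils import id_of

print(id_of("syn123"))          # 'syn123'
print(id_of(123))               # '123'
print(id_of({"id": "syn456"}))  # 'syn456' (dicts are searched for id, ownerId, or tableId)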

concrete_type_of(obj)

Return the concrete type of an object representing a Synapse entity. This is meant to operate either against an actual Entity object, or the lighter weight dictionary returned by Synapse#getChildren, both of which are Mappings.

Source code in synapseclient/core/utils.py
def concrete_type_of(obj: collections.abc.Mapping):
    """
    Return the concrete type of an object representing a Synapse entity.
    This is meant to operate either against an actual Entity object, or the lighter
    weight dictionary returned by Synapse#getChildren, both of which are Mappings.
    """
    concrete_type = None
    if isinstance(obj, collections.abc.Mapping):
        for key in ("concreteType", "type"):
            concrete_type = obj.get(key)
            if concrete_type:
                break

    if not isinstance(concrete_type, str) or not concrete_type.startswith(
        "org.sagebionetworks.repo.model"
    ):
        raise ValueError("Unable to determine concreteType")

    return concrete_type

is_in_path(id, path)

Determines whether id is in the path as returned from /entity/{id}/path

PARAMETER DESCRIPTION
id

synapse id string

TYPE: str

path

object as returned from '/entity/{id}/path'

TYPE: Mapping

RETURNS DESCRIPTION
bool

True or False

Source code in synapseclient/core/utils.py
def is_in_path(id: str, path: collections.abc.Mapping) -> bool:
    """Determines whether id is in the path as returned from /entity/{id}/path

    Arguments:
        id: synapse id string
        path: object as returned from '/entity/{id}/path'

    Returns:
        True or False
    """
    return id in [item["id"] for item in path["path"]]

get_properties(entity)

Returns the dictionary of properties of the given Entity.

Source code in synapseclient/core/utils.py
def get_properties(entity):
    """Returns the dictionary of properties of the given Entity."""

    return entity.properties if hasattr(entity, "properties") else entity

is_url(s)

Return True if the string appears to be a valid URL.

Source code in synapseclient/core/utils.py
def is_url(s):
    """Return True if the string appears to be a valid URL."""
    if isinstance(s, str):
        try:
            url_parts = urllib_parse.urlsplit(s)
            # looks like a Windows drive letter?
            if len(url_parts.scheme) == 1 and url_parts.scheme.isalpha():
                return False
            if url_parts.scheme == "file" and bool(url_parts.path):
                return True
            return bool(url_parts.scheme) and bool(url_parts.netloc)
        except Exception:
            return False
    return False
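
Using is_url

A minimal sketch (not part of the library source) showing how common inputs are classified.

from synapseclient.core.utils import is_url

print(is_url("http://example.com/data.csv"))  # True
print(is_url("file:///tmp/example.csv"))      # True
print(is_url("C:/Users/me/example.csv"))      # False (looks like a Windows drive letter)
print(is_url("not a url"))                    # False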

as_url(