Skip to content

Core

This section is for super users / developers only. These functions are subject to change as they are internal development functions. Use at your own risk.

Upload

synapseclient.core.upload.upload_functions

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity, path, synapseStore=True, md5=None, file_size=None, mimetype=None, max_threads=None)

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
parent_entity

Entity object or id of the parent entity.

TYPE: Union[str, Mapping, Number]

path

The file path to the file being uploaded

TYPE: str

synapseStore

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the entity_parent_id

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise, if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise, if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

A new FileHandle, as a dict, that represents the uploaded file

Source code in synapseclient/core/upload/upload_functions.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def upload_file_handle(
    syn: "Synapse",
    parent_entity: Union[str, collections.abc.Mapping, numbers.Number],
    path: str,
    synapseStore: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
    max_threads: int = None,
):
    """
    Store the file found at `path` in the storage location configured for the parent entity and
    return the file handle produced by whichever upload routine was selected.

    The upload routine is chosen from the `concreteType` of the parent's default upload destination.

    Arguments:
        syn:           The Synapse client used to look up the upload destination and perform the upload.
        parent_entity: Entity object or id of the parent entity.
        path:          The file path to the file being uploaded.
        synapseStore:  If False, nothing is uploaded; an external file handle referencing `path` is
                       created instead.
                       If True, the file is uploaded according to the upload destination of the parent.
        md5:           The MD5 checksum for the file, if known. When omitted and an upload is required,
                       it is computed if the expanded path is an existing file.
        file_size:     The size of the file, if known. Only forwarded when `synapseStore` is False.
        mimetype:      The MIME type of the file, if known. Forwarded to the selected upload routine.
        max_threads:   Thread limit forwarded to `upload_synapse_s3`; the other upload routines do not
                       receive it.

    Returns:
        The new file handle returned by the selected upload routine.

    Raises:
        ValueError: If `path` is None.
        NotImplementedError: If the destination is an external upload destination whose `uploadType`
            is not "SFTP".
    """
    if path is None:
        raise ValueError("path can not be None")

    if not synapseStore:
        # No bytes are transferred: just register a handle that points at the given path.
        return create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )

    # From here on a real upload happens, and some upload functions require an expanded path.
    local_path = os.path.expandvars(os.path.expanduser(path))

    if md5 is None and os.path.isfile(local_path):
        md5 = utils.md5_for_file(local_path).hexdigest()

    parent_id = id_of(parent_entity)

    # The upload destination of the parent decides which upload routine is used.
    destination = syn._getDefaultUploadDestination(parent_id)
    destination_type = destination["concreteType"]
    span_attributes = {
        "synapse.parent_id": parent_id,
        "synapse.upload_destination_type": destination_type,
    }
    trace.get_current_span().set_attributes(span_attributes)

    hash_rule = "#" * 50

    use_boto_sts = (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and sts_transfer.is_storage_location_sts_enabled(syn, parent_id, destination)
        and destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    )
    if use_boto_sts:
        log_upload_message(
            syn,
            "\n"
            + hash_rule
            + "\n Uploading file to external S3 storage using boto3 \n"
            + hash_rule
            + "\n",
        )
        return upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=parent_id,
            upload_destination=destination,
            local_path=local_path,
            mimetype=mimetype,
            md5=md5,
        )

    bucket_backed_types = (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    )
    if destination_type in bucket_backed_types:
        # Of the three accepted types, anything that is not S3 is the Google bucket.
        storage_label = {
            concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION: "Synapse",
            concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION: "your external S3",
        }.get(destination_type, "your external Google Bucket")

        log_upload_message(
            syn,
            "\n"
            + hash_rule
            + "\n Uploading file to "
            + storage_label
            + " storage \n"
            + hash_rule
            + "\n",
        )
        return upload_synapse_s3(
            syn=syn,
            file_path=local_path,
            storageLocationId=destination["storageLocationId"],
            mimetype=mimetype,
            max_threads=max_threads,
            md5=md5,
        )

    # external file handle (sftp)
    if destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if destination["uploadType"] != "SFTP":
            raise NotImplementedError("Can only handle SFTP upload locations.")

        banner = destination.get("banner", "")
        sftp_host = urllib_parse.urlparse(destination["url"]).netloc
        log_upload_message(
            syn,
            "\n%s\n%s\nUploading to: %s\n%s\n"
            % (hash_rule, banner, sftp_host, hash_rule),
        )
        return upload_external_file_handle_sftp(
            syn=syn,
            file_path=local_path,
            sftp_url=destination["url"],
            mimetype=mimetype,
            md5=md5,
        )

    # client authenticated S3
    if destination_type == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION:
        banner = destination.get("banner", "")
        log_upload_message(
            syn,
            "\n%s\n%s\nUploading to endpoint: [%s] bucket: [%s]\n%s\n"
            % (
                hash_rule,
                banner,
                destination.get("endpointUrl"),
                destination.get("bucket"),
                hash_rule,
            ),
        )
        return upload_client_auth_s3(
            syn=syn,
            file_path=local_path,
            bucket=destination["bucket"],
            endpoint_url=destination["endpointUrl"],
            key_prefix=destination["keyPrefixUUID"],
            storage_location_id=destination["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
        )

    # unknown storage location: warn and fall back to the default Synapse upload
    bang_rule = "!" * 50
    log_upload_message(
        syn,
        "\n%s\n%s\nUNKNOWN STORAGE LOCATION. Defaulting upload to Synapse.\n%s\n"
        % (bang_rule, destination.get("banner", ""), bang_rule),
    )
    return upload_synapse_s3(
        syn=syn,
        file_path=local_path,
        storageLocationId=None,
        mimetype=mimetype,
        max_threads=max_threads,
        md5=md5,
    )

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None)

When uploading to Synapse storage normally the back end will generate a random prefix for our uploaded object. Since in this case the client is responsible for the remote key, the client will instead generate a random prefix. this both ensures we don't have a collision with an existing S3 object and also mitigates potential performance issues, although key locality performance issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

local_path

The local path to the file to upload.

TYPE: str

mimetype

The MIME type of the file, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION

The file handle returned by syn.create_external_s3_file_handle for the uploaded S3 object.

Source code in synapseclient/core/upload/upload_functions.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination,
    local_path: str,
    mimetype: str = None,
    md5: str = None,
):
    """
    Upload a local file to an external S3 bucket using STS credentials, then register it with
    Synapse as an external S3 file handle.

    When uploading to Synapse storage normally the back end generates a random prefix for the
    uploaded object. Here the client is responsible for the remote key, so the client generates
    the random prefix itself. This both ensures we don't collide with an existing S3 object and
    mitigates potential performance issues, although key locality performance issues are likely
    resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination; its "bucket", "storageLocationId" and
            "baseKey" entries are read.
        local_path: The local path to the file to upload.
        mimetype: The MIME type of the file, if known. Defaults to None.
        md5: MD5 checksum for the file, if known.

    Returns:
        The result of `syn.create_external_s3_file_handle` for the uploaded object.
    """
    random_prefix = str(uuid.uuid4())

    target_bucket = upload_destination["bucket"]
    location_id = upload_destination["storageLocationId"]
    # Remote key layout: <baseKey>/<random uuid>/<local file name>
    key_parts = (
        upload_destination["baseKey"],
        random_prefix,
        os.path.basename(local_path),
    )
    object_key = "/".join(key_parts)

    def _upload_with(credentials):
        # Invoked by the STS helper with the credentials to use for this transfer.
        return S3ClientWrapper.upload_file(
            bucket=target_bucket,
            endpoint_url=None,
            remote_file_key=object_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
        )

    sts_transfer.with_boto_sts_credentials(_upload_with, syn, parent_id, "read_write")

    file_handle = syn.create_external_s3_file_handle(
        bucket_name=target_bucket,
        s3_file_key=object_key,
        file_path=local_path,
        storage_location_id=location_id,
        mimetype=mimetype,
        md5=md5,
    )
    return file_handle

Multipart Upload

synapseclient.core.upload.multipart_upload

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

Classes

UploadAttempt

Used to handle multi-threaded operations for uploading one or more parts of a file.

Source code in synapseclient/core/upload/multipart_upload.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
class UploadAttempt:
    """
    Used to handle multi-threaded operations for uploading one or more parts of a file.

    An instance represents one attempt at a multipart upload (or copy). Calling the
    instance creates or resumes the upload on Synapse, uploads the parts that are still
    outstanding using a pool of worker threads, and then asks Synapse to complete the upload.
    """

    def __init__(
        self,
        syn,
        dest_file_name,
        upload_request_payload,
        part_request_body_provider_fn,
        md5_fn,
        max_threads: int,
        force_restart: bool,
    ):
        """
        Arguments:
            syn: The Synapse client used for all REST calls, logging and progress output.
            dest_file_name: Name shown as the postfix of the transfer progress output.
            upload_request_payload: The multipart upload request body; "partSizeBytes" is
                required, "concreteType" and "fileSizeBytes" are read when present.
            part_request_body_provider_fn: Callable taking a part number and returning the
                body to PUT for that part. May be falsy, in which case no body is sent.
            md5_fn: Callable taking (body, response) and returning the MD5 hex digest that is
                reported to Synapse for the part.
            max_threads: Passed to the executor that runs the part uploads.
            force_restart: Sent as the `forceRestart` query parameter when creating the upload.
        """
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._max_threads = max_threads
        self._force_restart = force_restart

        # guards _aborted and _pre_signed_part_urls, which worker threads share
        self._lock = threading.Lock()
        self._aborted = False

        # populated later
        self._upload_id: str = None
        # part number -> (pre-signed url, signed headers)
        self._pre_signed_part_urls: Mapping[int, tuple] = None

    @classmethod
    def _get_remaining_part_numbers(cls, upload_status):
        """
        Work out which parts still have to be uploaded.

        Arguments:
            upload_status: Upload status response; its "partsState" entry is iterated
                element by element, one element per part.

        Returns:
            A tuple of (total number of parts, list of 1-based part numbers whose state
            is "0").
        """
        parts_state = upload_status["partsState"]

        # parts are 1-based
        part_numbers = [
            part_number
            for part_number, part_status in enumerate(parts_state, 1)
            if part_status == "0"
        ]

        return len(parts_state), part_numbers

    @classmethod
    def _get_thread_session(cls):
        """Return the requests.Session of the calling thread, creating it on first use."""
        # get a lazily initialized requests.Session from the thread.
        # we want to share a requests.Session over the course of a thread
        # to take advantage of persistent http connection. we put it on a
        # thread local rather that in the task closure since a connection can
        # be reused across separate part uploads so no reason to restrict it
        # per worker task.
        session = getattr(_thread_local, "session", None)
        if not session:
            session = _thread_local.session = requests.Session()
        return session

    def _is_copy(self):
        """Return True when the request payload is a multipart copy request rather than an upload."""
        # is this a copy or upload request
        return (
            self._upload_request_payload.get("concreteType")
            == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST
        )

    def _create_synapse_upload(self):
        """POST the upload request payload to Synapse and return the upload status response."""
        return self._syn.restPOST(
            "/file/multipart?forceRestart={}".format(str(self._force_restart).lower()),
            json.dumps(self._upload_request_payload),
            endpoint=self._syn.fileHandleEndpoint,
        )

    def _fetch_pre_signed_part_urls(
        self,
        upload_id: str,
        part_numbers: List[int],
        requests_session: requests.Session = None,
    ) -> Mapping[int, tuple]:
        """
        Request a batch of pre-signed URLs for the given parts.

        Arguments:
            upload_id: The id of the multipart upload.
            part_numbers: The part numbers to fetch pre-signed URLs for.
            requests_session: Optional session forwarded to the REST call.

        Returns:
            A dict mapping each returned part number to a tuple of
            (pre-signed url, signed headers); the headers default to an empty dict
            when the response does not include them.
        """
        uri = "/file/multipart/{upload_id}/presigned/url/batch".format(
            upload_id=upload_id
        )
        body = {
            "uploadId": upload_id,
            "partNumbers": part_numbers,
        }

        response = self._syn.restPOST(
            uri,
            json.dumps(body),
            requests_session=requests_session,
            endpoint=self._syn.fileHandleEndpoint,
        )

        part_urls = {}
        for part in response["partPresignedUrls"]:
            part_urls[part["partNumber"]] = (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )

        return part_urls

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ):
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            A tuple of (refreshed URL, signed headers) for the given part

        """
        with self._lock:
            # Each stored entry is a (url, signed_headers) tuple, so only the url
            # component may be compared with the expired url string. Comparing the
            # whole tuple would never be equal and the refresh would never run.
            current_url, current_headers = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                return current_url, current_headers

            self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
                self._upload_id,
                list(self._pre_signed_part_urls.keys()),
            )

            return self._pre_signed_part_urls[part_number]

    def _handle_part(self, part_number, otel_context: typing.Union[Context, None]):
        """
        Upload a single part and register it with Synapse. Run on a worker thread.

        Arguments:
            part_number: The number of the part to upload.
            otel_context: Tracing context captured by the submitting thread; attached to
                this thread when provided.

        Returns:
            A tuple of (part_number, number of bytes in the part body; 0 when there is no body).

        Raises:
            SynapseUploadAbortedException: If the attempt was already flagged as aborted.
            SynapseHTTPError: If the PUT fails with anything other than a first 403.
        """
        if otel_context:
            context.attach(otel_context)
        with tracer.start_as_current_span("UploadAttempt::_handle_part"):
            trace.get_current_span().set_attributes(
                {"thread.id": threading.get_ident()}
            )
            with self._lock:
                if self._aborted:
                    # this upload attempt has already been aborted
                    # so we short circuit the attempt to upload this part
                    raise SynapseUploadAbortedException(
                        "Upload aborted, skipping part {}".format(part_number)
                    )

                part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

            session = self._get_thread_session()

            # obtain the body (i.e. the upload bytes) for the given part number.
            body = (
                self._part_request_body_provider_fn(part_number)
                if self._part_request_body_provider_fn
                else None
            )
            part_size = len(body) if body else 0
            # at most two tries: the original url, then one retry with a refreshed url
            for retry in range(2):

                def put_fn():
                    with tracer.start_as_current_span("UploadAttempt::put_part"):
                        return session.put(part_url, body, headers=signed_headers)

                try:
                    # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls
                    response = with_retry(
                        put_fn, retry_exceptions=[requests.exceptions.ConnectionError]
                    )
                    _raise_for_status(response)

                    # completed upload part to s3 successfully
                    break

                except SynapseHTTPError as ex:
                    if ex.response.status_code == 403 and retry < 1:
                        # we interpret this to mean our pre_signed url expired.
                        self._syn.logger.debug(
                            "The pre-signed upload URL for part {} has expired."
                            "Refreshing urls and retrying.\n".format(part_number)
                        )

                        # we refresh all the urls and obtain this part's
                        # specific url for the retry
                        with tracer.start_as_current_span(
                            "UploadAttempt::refresh_pre_signed_part_urls"
                        ):
                            (
                                part_url,
                                signed_headers,
                            ) = self._refresh_pre_signed_part_urls(
                                part_number,
                                part_url,
                            )

                    else:
                        raise

            md5_hex = self._md5_fn(body, response)

            # now tell synapse that we uploaded that part successfully
            self._syn.restPUT(
                "/file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5}".format(
                    upload_id=self._upload_id,
                    part_number=part_number,
                    md5=md5_hex,
                ),
                requests_session=session,
                endpoint=self._syn.fileHandleEndpoint,
            )

            # remove so future batch pre_signed url fetches will exclude this part
            with self._lock:
                del self._pre_signed_part_urls[part_number]

            return part_number, part_size

    def _upload_parts(self, part_count, remaining_part_numbers):
        """
        Upload every remaining part on the executor and report progress as parts finish.

        Arguments:
            part_count: Total number of parts in the upload.
            remaining_part_numbers: Part numbers that still have to be uploaded.

        Raises:
            SynapseUploadAbortedException: If a KeyboardInterrupt surfaced from a part.
            SynapseUploadFailedException: If any part raised another exception.
        """
        trace.get_current_span().set_attributes({"thread.id": threading.get_ident()})
        time_upload_started = time.time()
        completed_part_count = part_count - len(remaining_part_numbers)
        file_size = self._upload_request_payload.get("fileSizeBytes")

        if not self._is_copy():
            # we won't have bytes to measure during a copy so the byte oriented progress bar is not useful
            progress = previously_transferred = min(
                completed_part_count * self._part_size,
                file_size,
            )

            self._syn._print_transfer_progress(
                progress,
                file_size,
                prefix="Uploading",
                postfix=self._dest_file_name,
                previouslyTransferred=previously_transferred,
            )

        self._pre_signed_part_urls = self._fetch_pre_signed_part_urls(
            self._upload_id,
            remaining_part_numbers,
        )

        futures = []
        with _executor(self._max_threads, False) as executor:
            # we don't wait on the shutdown since we do so ourselves below

            for part_number in remaining_part_numbers:
                futures.append(
                    executor.submit(
                        self._handle_part,
                        part_number,
                        context.get_current(),
                    )
                )

        for result in concurrent.futures.as_completed(futures):
            try:
                _, part_size = result.result()

                # progress is only defined (and only meaningful) for non-copy uploads
                if part_size and not self._is_copy():
                    progress += part_size
                    self._syn._print_transfer_progress(
                        min(progress, file_size),
                        file_size,
                        prefix="Uploading",
                        postfix=self._dest_file_name,
                        dt=time.time() - time_upload_started,
                        previouslyTransferred=previously_transferred,
                    )
            except (Exception, KeyboardInterrupt) as cause:
                # flag the attempt so parts that have not started yet bail out early
                with self._lock:
                    self._aborted = True

                # wait for all threads to complete before
                # raising the exception, we don't want to return
                # control while there are still threads from this
                # upload attempt running
                concurrent.futures.wait(futures)

                if isinstance(cause, KeyboardInterrupt):
                    raise SynapseUploadAbortedException("User interrupted upload")
                raise SynapseUploadFailedException("Part upload failed") from cause

    def _complete_upload(self):
        """
        Ask Synapse to complete the upload.

        Returns:
            The upload status response, whose "state" is "COMPLETED".

        Raises:
            SynapseUploadFailedException: If the returned state is not "COMPLETED".
        """
        upload_status_response = self._syn.restPUT(
            "/file/multipart/{upload_id}/complete".format(
                upload_id=self._upload_id,
            ),
            requests_session=self._get_thread_session(),
            endpoint=self._syn.fileHandleEndpoint,
        )

        upload_state = upload_status_response.get("state")
        if upload_state != "COMPLETED":
            # at this point we think successfully uploaded all the parts
            # but the upload status isn't complete, we'll throw an error
            # and let a subsequent attempt try to reconcile
            raise SynapseUploadFailedException(
                "Upload status has an unexpected state {}".format(upload_state)
            )

        return upload_status_response

    def __call__(self):
        """
        Run the upload attempt.

        Returns:
            The upload status response: either the one returned when creating the upload,
            if it was already "COMPLETED", or the one returned by the completion call.
        """
        upload_status_response = self._create_synapse_upload()
        upload_state = upload_status_response.get("state")

        if upload_state != "COMPLETED":
            self._upload_id = upload_status_response["uploadId"]
            part_count, remaining_part_numbers = self._get_remaining_part_numbers(
                upload_status_response
            )

            # if no remaining part numbers then all the parts have been
            # uploaded but the upload has not been marked complete.
            if remaining_part_numbers:
                self._upload_parts(part_count, remaining_part_numbers)
            upload_status_response = self._complete_upload()

        return upload_status_response

Functions

multipart_upload_file(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None, md5=None)

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
def multipart_upload_file(
    syn,
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
    md5: str = None,
) -> str:
    """Upload a local file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename; defaults to the base name of
                        `file_path`
        content_type: Refers to the Content-Type of the API request. When None it is
                      guessed from `file_path`, falling back to "application/octet-stream".
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload
        md5: The MD5 of the file. If not provided, it will be calculated.

    Returns:
        a File Handle ID

    Raises:
        IOError: If `file_path` does not exist or is a directory.

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    # span attributes are recorded as strings here, so a missing id becomes ""
    traced_location_id = "" if storage_location_id is None else storage_location_id
    trace.get_current_span().set_attributes(
        {"synapse.storage_location_id": traced_location_id}
    )

    if not os.path.exists(file_path):
        raise IOError('File "{}" not found.'.format(file_path))
    if os.path.isdir(file_path):
        raise IOError('File "{}" is a directory.'.format(file_path))

    file_size = os.path.getsize(file_path)
    if not dest_file_name:
        dest_file_name = os.path.basename(file_path)

    if content_type is None:
        guessed_type = mimetypes.guess_type(file_path, strict=False)[0]
        content_type = guessed_type or "application/octet-stream"

    # show a spinner while hashing unless the client is silent
    tick_callback = None
    if not syn.silent:
        tick_callback = Spinner().print_tick

    if md5:
        md5_hex = md5
    else:
        md5_hex = md5_for_file(file_path, callback=tick_callback).hexdigest()

    requested_part_size = part_size or DEFAULT_PART_SIZE
    part_size = get_part_size(
        requested_part_size,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    request_payload = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def read_part(part_number):
        # supplies the bytes of one part from the file
        return get_file_chunk(file_path, part_number, part_size)

    return _multipart_upload(
        syn,
        dest_file_name,
        request_payload,
        read_part,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )

multipart_upload_string(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Upload a string as a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

max_threads

number of concurrent threads to devote to upload

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
def multipart_upload_string(
    syn,
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Upload a string as a file to a Synapse upload destination in chunks.

    The text is UTF-8 encoded in memory; the resulting bytes are what is
    checksummed, sized and uploaded.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename. When not given,
                        "message.txt" is used.
        content_type: Refers to the Content-Type of the API request. When not
                      given, "text/plain; charset=utf-8" is used.
        part_size: number of bytes per part. Minimum 5MB.
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        max_threads: number of concurrent threads to devote
                     to upload

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    # Resolve the effective part size exactly once, *before* it is captured by
    # both the upload request and the part reader below, so the size advertised
    # to Synapse and the size used to slice the data can never diverge.
    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number):
        # Slice the in-memory bytes for the requested part.
        return get_data_chunk(data, part_number, part_size)

    return _multipart_upload(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )

multipart_copy(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, max_threads=None)

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

source_file_handle_association

Describes an association of a FileHandle with another object.

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

max_threads

Number of concurrent threads to devote to copy.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
def multipart_copy(
    syn,
    source_file_handle_association,
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Issue a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html)
    so that an existing file handle is copied without the client transferring any data.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes). Falls back to
                   DEFAULT_PART_SIZE when not given.
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.
        max_threads: Number of concurrent threads to devote to copy.

    Returns:
        a File Handle ID

    Keyword arguments are passed down to
    [_multipart_upload()][synapseclient.core.upload.multipart_upload._multipart_upload].

    """
    if not part_size:
        part_size = DEFAULT_PART_SIZE

    copy_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    # The copy-specific part/md5 callables replace the data readers used by
    # the regular upload entry points.
    copied_file_handle_id = _multipart_upload(
        syn,
        dest_file_name,
        copy_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
        max_threads=max_threads,
    )
    return copied_file_handle_id

_multipart_upload(syn, dest_file_name, upload_request, part_fn, md5_fn, force_restart=False, max_threads=None)

Calls upon an UploadAttempt object to initiate and/or retry a multipart file upload or copy. This function is wrapped by multipart_upload_file, multipart_upload_string, and multipart_copy. Retries cannot exceed 7 retries per call.

PARAMETER DESCRIPTION
syn

A Synapse object

dest_file_name

upload as a different filename

upload_request

A dictionary object with the user-fed logistical details of the upload/copy request.

part_fn

Function to calculate the partSize of each part

md5_fn

Function to calculate the MD5 of the file-like object

max_threads

number of concurrent threads to devote to upload.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION

A File Handle ID

Source code in synapseclient/core/upload/multipart_upload.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
def _multipart_upload(
    syn,
    dest_file_name,
    upload_request,
    part_fn,
    md5_fn,
    force_restart: bool = False,
    max_threads: int = None,
):
    """Drive an [UploadAttempt][synapseclient.core.upload.multipart_upload.UploadAttempt]
    until a multipart upload or copy succeeds, retrying on upload failures. This is the
    shared implementation behind
    [multipart_upload_file][synapseclient.core.upload.multipart_upload.multipart_upload_file],
    [multipart_upload_string][synapseclient.core.upload.multipart_upload.multipart_upload_string], and
    [multipart_copy][synapseclient.core.upload.multipart_upload.multipart_copy].
    At most MAX_RETRIES retries are made per call; after that the last
    SynapseUploadFailedException propagates.

    Arguments:
        syn: A Synapse object
        dest_file_name: upload as a different filename
        upload_request: A dictionary object with the user-fed logistical
                        details of the upload/copy request.
        part_fn: Callable handed to UploadAttempt for producing each part
        md5_fn: Callable handed to UploadAttempt for computing MD5 checksums
        force_restart: True to restart a previously initiated upload from
                       scratch. Only honoured on the first attempt; retries
                       never restart from scratch.
        max_threads: number of concurrent threads to devote to upload.
                     Defaults to pool_provider.DEFAULT_NUM_THREADS and is
                     clamped to at least 1.

    Returns:
        A File Handle ID

    """
    thread_count = (
        pool_provider.DEFAULT_NUM_THREADS if max_threads is None else max_threads
    )
    thread_count = max(thread_count, 1)

    failed_attempts = 0
    while True:
        try:
            attempt = UploadAttempt(
                syn,
                dest_file_name,
                upload_request,
                part_fn,
                md5_fn,
                thread_count,
                # A restart from scratch is only requested on the very first
                # attempt; a retry after a failure resumes instead.
                force_restart and failed_attempts == 0,
            )
            status = attempt()

            return status["resultFileHandleId"]

        except SynapseUploadFailedException:
            if failed_attempts >= MAX_RETRIES:
                raise
            failed_attempts += 1

Upload Async

synapseclient.core.upload.upload_functions_async

This module handles the various ways that a user can upload a file to Synapse.

Classes

Functions

upload_file_handle(syn, parent_entity_id, path, synapse_store=True, md5=None, file_size=None, mimetype=None) async

Uploads the file in the provided path (if necessary) to a storage location based on project settings. Returns a new FileHandle as a dict to represent the stored file.

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_entity_id

The ID of the parent entity that the file will be attached to.

TYPE: str

path

The file path to the file being uploaded

TYPE: str

synapse_store

If False, will not upload the file, but instead create an ExternalFileHandle that references the file on the local machine. If True, will upload the file based on StorageLocation determined by the parent_entity_id.

TYPE: bool DEFAULT: True

md5

The MD5 checksum for the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

file_size

The size of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: int DEFAULT: None

mimetype

The MIME type of the file, if known. Otherwise if the file is a local file, it will be calculated automatically.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A dictionary of a new FileHandle as a dict that represents the uploaded file

Source code in synapseclient/core/upload/upload_functions_async.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
async def upload_file_handle(
    syn: "Synapse",
    parent_entity_id: str,
    path: str,
    synapse_store: bool = True,
    md5: str = None,
    file_size: int = None,
    mimetype: str = None,
) -> Dict[str, Union[str, int]]:
    """
    Store the file at `path` in the storage location configured for the parent
    entity (uploading it when required) and return the resulting FileHandle as
    a dict.

    The upload routine is chosen from the `concreteType` of the parent's upload
    destination; an unrecognised or missing destination falls back to Synapse
    storage.

    Arguments:
        syn: The synapse client
        parent_entity_id: The ID of the parent entity that the file will be attached to.
        path: The file path to the file being uploaded
        synapse_store: If False, will not upload the file, but instead create an
            ExternalFileHandle that references the file on the local machine. If True,
            will upload the file based on StorageLocation determined by the
            parent_entity_id.
        md5: The MD5 checksum for the file, if known. Otherwise if the file is a
            local file, it will be calculated automatically.
        file_size: The size of the file, if known. Only forwarded when creating an
            external file handle (synapse_store=False).
        mimetype: The MIME type of the file, if known. Forwarded to the selected
            upload routine.

    Returns:
        A dictionary of a new FileHandle as a dict that represents the uploaded file

    Raises:
        ValueError: If path is None.
        NotImplementedError: If the destination is an external upload destination
            whose uploadType is not SFTP.
    """
    if path is None:
        raise ValueError("path can not be None")

    # Reference-only mode: register an external file handle, nothing is uploaded.
    if not synapse_store:
        external_handle = await create_external_file_handle(
            syn, path, mimetype=mimetype, md5=md5, file_size=file_size
        )
        return external_handle

    # From here on a real upload happens; expand ~ and environment variables
    # because some upload functions need a fully expanded path.
    local_path = os.path.expandvars(os.path.expanduser(path))

    md5_missing_for_local_file = md5 is None and os.path.isfile(local_path)
    if md5_missing_for_local_file:
        md5 = await utils.md5_for_file_multiprocessing(
            filename=local_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )

    parent_id = id_of(parent_entity_id)

    # Ask Synapse where uploads for this parent should go.
    location = await get_upload_destination(entity_id=parent_id, synapse_client=syn)
    destination_type = None
    if location:
        destination_type = location.get("concreteType", None)

    span_attributes = {
        "synapse.parent_id": parent_id,
        "synapse.upload_destination_type": destination_type,
    }
    trace.get_current_span().set_attributes(span_attributes)

    # STS-enabled external S3: upload directly with boto3 and STS credentials.
    use_sts_boto = (
        sts_transfer.is_boto_sts_transfer_enabled(syn)
        and await sts_transfer.is_storage_location_sts_enabled_async(
            syn, parent_id, location
        )
        and destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
    )
    if use_sts_boto:
        return await upload_synapse_sts_boto_s3(
            syn=syn,
            parent_id=parent_id,
            upload_destination=location,
            local_path=local_path,
            mimetype=mimetype,
            md5=md5,
            storage_str="Uploading file to external S3 storage using boto3",
        )

    # Destinations served by the Synapse multipart upload API.
    multipart_destination_types = (
        concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION,
        concrete_types.EXTERNAL_GCP_UPLOAD_DESTINATION,
    )
    if destination_type in multipart_destination_types:
        if destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION:
            progress_message = "Uploading to Synapse storage"
        else:
            progress_message = (
                "Uploading to your external S3 storage"
                if destination_type == concrete_types.EXTERNAL_S3_UPLOAD_DESTINATION
                else "Uploading to your external Google Bucket storage"
            )

        return await upload_synapse_s3(
            syn=syn,
            file_path=local_path,
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=progress_message,
        )

    # External file handle (SFTP is the only supported upload type).
    if destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
        if location["uploadType"] != "SFTP":
            raise NotImplementedError("Can only handle SFTP upload locations.")

        progress_message = (
            f"Uploading to: {urllib_parse.urlparse(location['url']).netloc}"
        )
        sftp_banner = location.get("banner", None)
        if sftp_banner:
            syn.logger.info(sftp_banner)
        return await upload_external_file_handle_sftp(
            syn=syn,
            file_path=local_path,
            sftp_url=location["url"],
            mimetype=mimetype,
            md5=md5,
            storage_str=progress_message,
        )

    # Client-authenticated S3-compatible object store.
    if destination_type == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION:
        progress_message = f"Uploading to endpoint: [{location.get('endpointUrl')}] bucket: [{location.get('bucket')}]"
        store_banner = location.get("banner", None)
        if store_banner:
            syn.logger.info(store_banner)
        return await upload_client_auth_s3(
            syn=syn,
            file_path=local_path,
            bucket=location["bucket"],
            endpoint_url=location["endpointUrl"],
            key_prefix=location["keyPrefixUUID"],
            storage_location_id=location["storageLocationId"],
            mimetype=mimetype,
            md5=md5,
            storage_str=progress_message,
        )

    # Unknown storage location: fall back to default Synapse storage.
    return await upload_synapse_s3(
        syn=syn,
        file_path=local_path,
        storage_location_id=None,
        mimetype=mimetype,
        md5=md5,
        storage_str="Uploading to Synapse storage",
    )

create_external_file_handle(syn, path, mimetype=None, md5=None, file_size=None) async

Create a file handle in Synapse without uploading any files. This is used in cases where one wishes to store a reference to a file that is not in Synapse.

Source code in synapseclient/core/upload/upload_functions_async.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
async def create_external_file_handle(
    syn: "Synapse",
    path: str,
    mimetype: str = None,
    md5: str = None,
    file_size: int = None,
) -> Dict[str, Union[str, int]]:
    """Register a file handle in Synapse that points at `path` without uploading
    anything. Used to store a reference to a file that does not live in Synapse.

    When the URL refers to an existing local file (`file` scheme), its MD5 and
    size are computed locally, a caller-supplied `md5` is verified against the
    computed one, and the file is added to the client's cache.

    Arguments:
        syn: The synapse client
        path: Path or URL of the file to reference. `~` and environment
            variables are expanded before it is converted to a URL.
        mimetype: The MIME type of the file, if known.
        md5: The MD5 checksum of the file, if known.
        file_size: The size of the file, if known.

    Returns:
        The file handle returned by post_external_filehandle.

    Raises:
        ValueError: If the resolved value is not a valid URL.
        SynapseMd5MismatchError: If `md5` was given and differs from the MD5
            calculated for a local file.
    """
    url = as_url(os.path.expandvars(os.path.expanduser(path)))
    if not is_url(url):
        raise ValueError(f"externalUrl [{url}] is not a valid url")

    parsed_url = urllib_parse.urlparse(url)
    parsed_path = file_url_to_path(url)
    is_local_file = parsed_url.scheme == "file" and os.path.isfile(parsed_path)

    if is_local_file:
        actual_md5 = await utils.md5_for_file_multiprocessing(
            filename=parsed_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )
        caller_md5_disagrees = md5 is not None and md5 != actual_md5
        if caller_md5_disagrees:
            raise SynapseMd5MismatchError(
                f"The specified md5 [{md5}] does not match the calculated md5 "
                f"[{actual_md5}] for local file [{parsed_path}]",
            )
        # Trust what is on disk over whatever the caller passed in.
        md5 = actual_md5
        file_size = os.stat(parsed_path).st_size

    # Nothing to upload: only the handle itself is created.
    file_handle = await post_external_filehandle(
        external_url=url, mimetype=mimetype, md5=md5, file_size=file_size
    )
    handle_id = file_handle["id"]
    if is_local_file:
        syn.cache.add(file_handle_id=handle_id, path=file_url_to_path(url), md5=md5)

    span_attributes = {"synapse.file_handle_id": file_handle["id"]}
    trace.get_current_span().set_attributes(span_attributes)
    return file_handle

upload_external_file_handle_sftp(syn, file_path, sftp_url, mimetype=None, md5=None, storage_str=None) async

Upload a file to an SFTP server and create a file handle in Synapse.

Source code in synapseclient/core/upload/upload_functions_async.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def upload_external_file_handle_sftp(
    syn: "Synapse",
    file_path: str,
    sftp_url: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Send a file to an SFTP server, then register an external file handle for
    it in Synapse and record the local file in the client's cache.

    Arguments:
        syn: The synapse client
        file_path: The path to the local file to upload.
        sftp_url: The SFTP URL to upload to. It is URL-unquoted before being
            handed to the SFTP wrapper.
        mimetype: The MIME type of the file, if known.
        md5: The MD5 checksum of the file, if known. Calculated from the local
            file when not given.
        storage_str: Passed through to SFTPWrapper.upload_file.

    Returns:
        The file handle returned by post_external_filehandle.
    """
    username, password = syn._getUserCredentials(url=sftp_url)
    unquoted_sftp_url = urllib_parse.unquote(sftp_url)
    uploaded_url = SFTPWrapper.upload_file(
        file_path,
        unquoted_sftp_url,
        username,
        password,
        storage_str=storage_str,
    )

    # Only hash the file when the caller did not already supply a checksum.
    if md5:
        file_md5 = md5
    else:
        file_md5 = await utils.md5_for_file_multiprocessing(
            filename=file_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )

    size_on_disk = os.stat(file_path).st_size
    file_handle = await post_external_filehandle(
        external_url=uploaded_url,
        mimetype=mimetype,
        md5=file_md5,
        file_size=size_on_disk,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=file_md5)
    return file_handle

upload_synapse_s3(syn, file_path, storage_location_id=None, mimetype=None, force_restart=False, md5=None, storage_str=None) async

Upload a file to Synapse storage and create a file handle in Synapse.

Arguments

syn: The synapse client file_path: The path to the file to upload. storage_location_id: The storage location ID. mimetype: The mimetype of the file. force_restart: If True, will force the upload to restart. md5: The MD5 checksum for the file. storage_str: The storage string.

RETURNS DESCRIPTION
Dict[str, Union[str, int]]

A dictionary of the file handle.

Source code in synapseclient/core/upload/upload_functions_async.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
async def upload_synapse_s3(
    syn: "Synapse",
    file_path: str,
    storage_location_id: Optional[int] = None,
    mimetype: str = None,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Multipart-upload a file to Synapse-managed storage, cache it locally and
    return its file handle.

    Arguments:
        syn: The synapse client
        file_path: The path to the file to upload.
        storage_location_id: The storage location ID.
        mimetype: The mimetype of the file. Sent as the upload's content type.
        force_restart: If True, will force the upload to restart.
        md5: The MD5 checksum for the file.
        storage_str: Passed through to multipart_upload_file_async; callers
            supply a short description such as "Uploading to Synapse storage".

    Returns:
        A dictionary of the file handle.
    """
    uploaded_handle_id = await multipart_upload_file_async(
        syn=syn,
        file_path=file_path,
        content_type=mimetype,
        storage_location_id=storage_location_id,
        md5=md5,
        force_restart=force_restart,
        storage_str=storage_str,
    )

    # Remember that this local file corresponds to the new file handle.
    syn.cache.add(file_handle_id=uploaded_handle_id, path=file_path, md5=md5)

    file_handle = await get_file_handle(
        file_handle_id=uploaded_handle_id, synapse_client=syn
    )
    return file_handle

upload_synapse_sts_boto_s3(syn, parent_id, upload_destination, local_path, mimetype=None, md5=None, storage_str=None) async

When uploading to Synapse storage normally the back end will generate a random prefix for our uploaded object. Since in this case the client is responsible for the remote key, the client will instead generate a random prefix. this both ensures we don't have a collision with an existing S3 object and also mitigates potential performance issues, although key locality performance issues are likely resolved as of: https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/

PARAMETER DESCRIPTION
syn

The synapse client

TYPE: Synapse

parent_id

The synapse ID of the parent.

TYPE: str

upload_destination

The upload destination

TYPE: Dict[str, Union[str, int]]

local_path

The local path to the file to upload.

TYPE: str

mimetype

The mimetype of the file, if known. Defaults to None.

TYPE: str DEFAULT: None

md5

MD5 checksum for the file, if known.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, int, bool]]

The file handle created for the uploaded S3 object, as returned by post_external_s3_file_handle.

Source code in synapseclient/core/upload/upload_functions_async.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
async def upload_synapse_sts_boto_s3(
    syn: "Synapse",
    parent_id: str,
    upload_destination: Dict[str, Union[str, int]],
    local_path: str,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int, bool]]:
    """
    Upload a file straight to an external S3 bucket using STS credentials, then
    register an external S3 file handle for it in Synapse.

    When uploading to Synapse storage normally the back end will generate a random prefix
    for our uploaded object. Since in this case the client is responsible for the remote
    key, the client will instead generate a random prefix. this both ensures we don't have a collision
    with an existing S3 object and also mitigates potential performance issues, although
    key locality performance issues are likely resolved as of:
    <https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/>

    Arguments:
        syn: The synapse client
        parent_id: The synapse ID of the parent.
        upload_destination: The upload destination. Must contain "bucket" and
            "storageLocationId"; "baseKey" is optional.
        local_path: The local path to the file to upload.
        mimetype: The mimetype of the file, if known. Defaults to None.
        md5: MD5 checksum for the file, if known.
        storage_str: Passed through to S3ClientWrapper.upload_file.

    Returns:
        The file handle returned by post_external_s3_file_handle for the
        uploaded object.
    """
    key_prefix = str(uuid.uuid4())

    bucket_name = upload_destination["bucket"]
    storage_location_id = upload_destination["storageLocationId"]
    # Remote key layout: <baseKey>/<random uuid>/<file name>.
    # NOTE(review): with no baseKey the key starts with "/" - confirm that is intended.
    remote_file_key = "/".join(
        [
            upload_destination.get("baseKey", ""),
            key_prefix,
            os.path.basename(local_path),
        ]
    )

    def upload_fn(credentials: Dict[str, str]) -> str:
        """Wrapper for the upload function."""
        return S3ClientWrapper.upload_file(
            bucket=bucket_name,
            endpoint_url=None,
            remote_file_key=remote_file_key,
            upload_file_path=local_path,
            credentials=credentials,
            transfer_config_kwargs={"max_concurrency": syn.max_threads},
            storage_str=storage_str,
        )

    # We are inside a coroutine, so a loop is guaranteed to be running;
    # get_running_loop() is the preferred accessor here and matches the other
    # upload helpers in this module.
    loop = asyncio.get_running_loop()

    # The boto upload is blocking, so it runs on the client's thread pool
    # instead of on the event loop.
    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: sts_transfer.with_boto_sts_credentials(
            upload_fn, syn, parent_id, "read_write"
        ),
    )

    return await post_external_s3_file_handle(
        bucket_name=bucket_name,
        s3_file_key=remote_file_key,
        file_path=local_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )

upload_client_auth_s3(syn, file_path, bucket, endpoint_url, key_prefix, storage_location_id, mimetype=None, md5=None, storage_str=None) async

Use the S3 client to upload a file to an S3 bucket.

Source code in synapseclient/core/upload/upload_functions_async.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
async def upload_client_auth_s3(
    syn: "Synapse",
    file_path: str,
    bucket: str,
    endpoint_url: str,
    key_prefix: str,
    storage_location_id: int,
    mimetype: str = None,
    md5: str = None,
    storage_str: str = None,
) -> Dict[str, Union[str, int]]:
    """Use the S3 client to upload a file to an S3 bucket, then register an
    external object store file handle for it in Synapse and cache the local file.

    Arguments:
        syn: The synapse client
        file_path: The path to the local file to upload.
        bucket: The bucket to upload to.
        endpoint_url: The endpoint URL of the object store.
        key_prefix: Prefix for the remote key; the object is stored under
            "<key_prefix>/<file name>".
        storage_location_id: The storage location ID.
        mimetype: The mimetype of the file, if known.
        md5: The MD5 checksum for the file, if known.
        storage_str: Passed through to S3ClientWrapper.upload_file.

    Returns:
        The file handle returned by post_external_object_store_filehandle.
    """
    profile = syn._get_client_authenticated_s3_profile(endpoint_url, bucket)
    file_key = key_prefix + "/" + os.path.basename(file_path)

    # We are inside a coroutine, so a loop is guaranteed to be running;
    # get_running_loop() is the preferred accessor here and matches the other
    # upload helpers in this module.
    loop = asyncio.get_running_loop()

    # The S3 upload is blocking, so it runs on the client's thread pool
    # instead of on the event loop.
    await loop.run_in_executor(
        syn._get_thread_pool_executor(asyncio_event_loop=loop),
        lambda: S3ClientWrapper.upload_file(
            bucket,
            endpoint_url,
            file_key,
            file_path,
            profile_name=profile,
            storage_str=storage_str,
        ),
    )

    file_handle = await post_external_object_store_filehandle(
        s3_file_key=file_key,
        file_path=file_path,
        storage_location_id=storage_location_id,
        mimetype=mimetype,
        md5=md5,
        synapse_client=syn,
    )
    syn.cache.add(file_handle_id=file_handle["id"], path=file_path, md5=md5)

    return file_handle

Multipart Upload Async

synapseclient.core.upload.multipart_upload_async

Implements the client side of Synapse's Multipart File Upload API, which provides a robust means of uploading large files (into the 10s of GiB). End users should not need to call any of the methods under UploadAttempt directly.

This mermaid flowchart illustrates the process of uploading a file to Synapse using the multipart upload API.

flowchart  TD
    upload_file_handle --> before-upload
    subgraph before-upload
        subgraph Disk I/O & CPU
            subgraph Multi-Processing
                md5["Calculate MD5"]
            end
            mime["Guess mime type"]
            file_size["Get file size"]
            file_name["Get file name"]
        end

        subgraph HTTP
            upload_destination["Find where to Upload 
 GET /entity/{entity_id}/uploadDestination"]
            start_upload["Start upload with Synapse 
 POST /file/multipart"]
            presigned_urls["Get URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
        end
    end

    before-upload --> during-upload

    subgraph during-upload
        subgraph multi-threaded["multi-threaded for each part"]
            read_part["Read part to upload into Memory"]
            read_part --> put_part["HTTP PUT to storage provider"]

            subgraph thread_locked1["Lock thread"]
                refresh_check{"New URL available?"}
                refresh_check --> |no|refresh
                refresh["Refresh remaining URLs to upload to 
 POST /file/multipart/{upload_id}/presigned/url/batch"]
            end


            put_part --> |URL Expired|refresh_check
            refresh_check --> |yes|put_part
            refresh --> put_part
            put_part --> |Finished|md5_part["Calculate MD5 of part"]
        end
        complete_part["PUT /file/multipart/{upload_id}/add/{part_number}?partMD5Hex={md5_hex}"]
        multi-threaded -->|Upload finished| complete_part
    end

    during-upload --> post-upload

    subgraph post-upload
        post_upload_compelete["PUT /file/multipart/{upload_id}/complete"]
        get_file_handle["GET /fileHandle/{file_handle_id}"]
    end

    post-upload --> entity["Create/Update Synapse entity"]

Classes

HandlePartResult dataclass

Result of a part upload.

ATTRIBUTE DESCRIPTION
part_number

The part number that was uploaded.

TYPE: int

part_size

The size of the part that was uploaded.

TYPE: int

md5_hex

The MD5 hash of the part that was uploaded.

TYPE: str

Source code in synapseclient/core/upload/multipart_upload_async.py
130
131
132
133
134
135
136
137
138
139
140
141
142
@dataclass
class HandlePartResult:
    """Result of a part upload.

    A plain record describing one part of a multipart upload after it has
    been handled.

    Attributes:
        part_number: The part number that was uploaded.
        part_size: The size of the part that was uploaded
            (presumably in bytes - confirm against the producer).
        md5_hex: The MD5 hash of the part that was uploaded, as a string
            (hex-encoded, going by the field name).
    """

    part_number: int
    part_size: int
    md5_hex: str

UploadAttemptAsync

Used to handle multi-threaded operations for uploading one or more parts of a file.

Source code in synapseclient/core/upload/multipart_upload_async.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
class UploadAttemptAsync:
    """
    Used to handle multi-threaded operations for uploading one or parts of a file.
    """

    def __init__(
        self,
        syn: "Synapse",
        dest_file_name: str,
        upload_request_payload: Dict[str, Any],
        part_request_body_provider_fn: Union[None, Callable[[int], bytes]],
        md5_fn: Callable[[bytes, httpx.Response], str],
        force_restart: bool,
        storage_str: str = None,
    ) -> None:
        self._syn = syn
        self._dest_file_name = dest_file_name
        self._part_size = upload_request_payload["partSizeBytes"]

        self._upload_request_payload = upload_request_payload

        self._part_request_body_provider_fn = part_request_body_provider_fn
        self._md5_fn = md5_fn

        self._force_restart = force_restart

        self._lock = asyncio.Lock()
        self._thread_lock = threading.Lock()
        self._aborted = False
        self._storage_str = storage_str

        # populated later
        self._upload_id: Optional[str] = None
        self._pre_signed_part_urls: Optional[Mapping[int, str]] = None
        self._progress_bar = None

    async def __call__(self) -> Dict[str, str]:
        """Run the upload attempt from start to finish.

        Starts the multipart upload, uploads whichever parts the returned
        status reports as outstanding and then completes the upload.

        Returns:
            The upload status response: the initial one when its state is
            already "COMPLETED", otherwise the one produced by completing
            the upload.
        """
        status = await post_file_multipart(
            upload_request_payload=self._upload_request_payload,
            force_restart=self._force_restart,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )

        if status.get("state") == "COMPLETED":
            # the upload is reported as already completed, nothing to do
            return status

        self._upload_id = status["uploadId"]
        part_count, outstanding = self._get_remaining_part_numbers(status)

        # an empty list means every part has been uploaded but the upload
        # was never marked complete, so go straight to completing it
        if outstanding:
            await self._upload_parts(part_count, outstanding)

        return await self._complete_upload()

    @classmethod
    def _get_remaining_part_numbers(
        cls, upload_status: Dict[str, str]
    ) -> Tuple[int, List[int]]:
        part_numbers = []
        parts_state = upload_status["partsState"]

        # parts are 1-based
        for i, part_status in enumerate(parts_state, 1):
            if part_status == "0":
                part_numbers.append(i)

        return len(parts_state), part_numbers

    def _is_copy(self) -> bool:
        """Tell whether the request payload is a copy request rather than an upload."""
        request_type = self._upload_request_payload.get("concreteType")
        return request_type == concrete_types.MULTIPART_UPLOAD_COPY_REQUEST

    async def _fetch_pre_signed_part_urls_async(
        self,
        upload_id: str,
        part_numbers: List[int],
    ) -> Dict[int, Tuple[str, Dict[str, str]]]:
        """Request pre-signed upload URLs for the given parts.

        Arguments:
            upload_id: The id of the multipart upload the parts belong to.
            part_numbers: The part numbers to fetch pre-signed URLs for.

        Returns:
            A dictionary keyed by part number. Each value is a tuple of the
            pre-signed URL and the signed headers to send with it (an empty
            dictionary when the response carries no "signedHeaders" for the
            part). A plain dict is returned because callers delete entries
            from it as parts finish.
        """
        trace.get_current_span().set_attributes({"synapse.upload_id": upload_id})
        response = await post_file_multipart_presigned_urls(
            upload_id=upload_id,
            part_numbers=part_numbers,
            synapse_client=self._syn,
        )

        return {
            part["partNumber"]: (
                part["uploadPresignedUrl"],
                part.get("signedHeaders", {}),
            )
            for part in response["partPresignedUrls"]
        }

    def _refresh_pre_signed_part_urls(
        self,
        part_number: int,
        expired_url: str,
    ) -> Tuple[str, Dict[str, str]]:
        """Refresh all unfetched presigned urls, and return the refreshed
        url for the given part number. If an existing expired_url is passed
        and the url for the given part has already changed that new url
        will be returned without a refresh (i.e. it is assumed that another
        thread has already refreshed the url since the passed url expired).

        Arguments:
            part_number: the part number whose refreshed url should
                         be returned
            expired_url: the url that was detected as expired triggering
                         this refresh
        Returns:
            refreshed URL

        """
        with self._thread_lock:
            current_url, headers = self._pre_signed_part_urls[part_number]
            if current_url != expired_url:
                # if the url has already changed since the given url
                # was detected as expired we can assume that another
                # thread already refreshed the url and can avoid the extra
                # fetch.
                refreshed_url = current_url, headers
            else:
                self._pre_signed_part_urls = wrap_async_to_sync(
                    self._fetch_pre_signed_part_urls_async(
                        self._upload_id,
                        list(self._pre_signed_part_urls.keys()),
                    ),
                    syn=self._syn,
                )

                refreshed_url = self._pre_signed_part_urls[part_number]

        return refreshed_url

    async def _handle_part_wrapper(self, part_number: int) -> HandlePartResult:
        """Run the upload of one part on the client's thread pool executor.

        Arguments:
            part_number: The part number to upload.

        Returns:
            The result produced by `_handle_part` for the part.
        """
        running_loop = asyncio.get_running_loop()
        current_otel_context = context.get_current()

        # trigger a garbage collection when the available memory is at most
        # twice the part size
        if psutil.virtual_memory().available <= self._part_size * 2:
            gc.collect()

        executor = self._syn._get_thread_pool_executor(asyncio_event_loop=running_loop)
        return await running_loop.run_in_executor(
            executor,
            self._handle_part,
            part_number,
            current_otel_context,
        )

    async def _upload_parts(
        self, part_count: int, remaining_part_numbers: List[int]
    ) -> None:
        """Upload the outstanding parts of the file to their pre-signed URLs.

        Arguments:
            part_count: The total number of parts in the upload.
            remaining_part_numbers: The parts that still need to be uploaded.

        Raises:
            SynapseUploadAbortedException: If a KeyboardInterrupt was caught
                while the parts were being uploaded.
            SynapseUploadFailedException: If any other exception was caught
                while the parts were being uploaded.
        """
        already_done = part_count - len(remaining_part_numbers)
        total_bytes = self._upload_request_payload.get("fileSizeBytes")

        self._pre_signed_part_urls = await self._fetch_pre_signed_part_urls_async(
            upload_id=self._upload_id,
            part_numbers=remaining_part_numbers,
        )

        part_tasks = [
            asyncio.create_task(self._handle_part_wrapper(part_number=number))
            for number in remaining_part_numbers
        ]

        if not (self._syn.silent or self._progress_bar):
            if self._is_copy():
                # a copy transfers no bytes from the client, so the bar
                # counts parts instead of bytes
                self._progress_bar = tqdm(
                    total=part_count,
                    desc=self._storage_str or "Copying",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(already_done)
            else:
                # bytes covered by the parts that were already uploaded,
                # capped at the file size
                resumed_bytes = min(
                    already_done * self._part_size,
                    total_bytes,
                )

                self._progress_bar = tqdm(
                    total=total_bytes,
                    desc=self._storage_str or "Uploading",
                    unit="B",
                    unit_scale=True,
                    postfix=self._dest_file_name,
                    smoothing=0,
                )
                self._progress_bar.update(resumed_bytes)

        failure = await self._orchestrate_upload_part_tasks(part_tasks)

        if failure is None:
            return
        if isinstance(failure, KeyboardInterrupt):
            raise SynapseUploadAbortedException("User interrupted upload") from failure
        raise SynapseUploadFailedException("Part upload failed") from failure

    def _update_progress_bar(self, part_size: int) -> None:
        """Update the progress bar with the given part size."""
        if self._syn.silent or not self._progress_bar:
            return
        self._progress_bar.update(1 if self._is_copy() else part_size)

    async def _orchestrate_upload_part_tasks(
        self, async_tasks: List[asyncio.Task]
    ) -> Union[Exception, KeyboardInterrupt, None]:
        """
        Orchestrate the result of the upload part tasks. If successful, send a
        request to the server to add the part to the upload.

        Every finished part upload schedules a follow-up "add part" task which
        is waited on by the same loop, so the loop only ends once every part
        upload and every follow-up task has finished.

        Arguments:
            async_tasks: The part upload tasks to orchestrate.

        Returns:
            The first exception that was caught while processing the tasks,
            otherwise None. Exceptions caught after the first one are dropped.
        """
        raised_exception = None

        while async_tasks:
            done_tasks, pending_tasks = await asyncio.wait(
                async_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            # asyncio.wait returns sets, so from here on async_tasks is a set
            # and new tasks are registered with .add() below
            async_tasks = pending_tasks
            for completed_task in done_tasks:
                try:
                    # re-raises whatever the task raised, handled below
                    task_result = completed_task.result()

                    if isinstance(task_result, HandlePartResult):
                        # a part finished its PUT, it still has to be added
                        # to the upload
                        part_number = task_result.part_number
                        part_size = task_result.part_size
                        part_md5_hex = task_result.md5_hex
                    elif (
                        isinstance(task_result, AddPartResponse)
                        and task_result.add_part_state != "ADD_SUCCESS"
                    ):
                        # Restart the file upload process resuming where this left off.
                        # Rest docs state:
                        # "If add part fails for any reason, the client must re-upload
                        # the part and then re-attempt to add the part to the upload."
                        raise SynapseUploadFailedException(
                            (
                                "Adding individual part failed with unexpected state: "
                                f"{task_result.add_part_state}, for upload "
                                f"{task_result.upload_id} and part "
                                f"{task_result.part_number} with message: "
                                f"{task_result.error_message}"
                            )
                        )
                    else:
                        # any other result (e.g. an add part response with
                        # state "ADD_SUCCESS") needs no further work
                        continue

                    # schedule the "add part" request for the finished part
                    async_tasks.add(
                        asyncio.create_task(
                            put_file_multipart_add(
                                upload_id=self._upload_id,
                                part_number=part_number,
                                md5_hex=part_md5_hex,
                                synapse_client=self._syn,
                            )
                        )
                    )

                    # progress is reported once the PUT of the part is done,
                    # without waiting for the add part request
                    self._update_progress_bar(part_size=part_size)

                except (Exception, KeyboardInterrupt) as cause:
                    with self._thread_lock:
                        if self._aborted:
                            # we've already aborted, no need to raise
                            # another exception
                            continue
                        # flag read by _handle_part so that parts which have
                        # not started yet are skipped
                        self._aborted = True
                    raised_exception = cause
                    continue
        return raised_exception

    async def _complete_upload(self) -> Dict[str, str]:
        """Finish the upload by asking the server to mark it complete.

        The progress bar, if there is one, is closed first.

        Returns:
            The response from the server for the completed upload.

        Raises:
            SynapseUploadFailedException: If the state in the response is
                anything other than "COMPLETED".
        """
        if not self._syn.silent and self._progress_bar:
            self._progress_bar.close()

        completion = await put_file_multipart_complete(
            upload_id=self._upload_id,
            endpoint=self._syn.fileHandleEndpoint,
            synapse_client=self._syn,
        )

        final_state = completion.get("state")
        if final_state == "COMPLETED":
            return completion

        # every part is believed to be uploaded, yet the upload is not
        # reported complete; fail here and leave reconciling to a later attempt
        raise SynapseUploadFailedException(
            f"Upload status has an unexpected state {final_state}"
        )

    def _handle_part(
        self, part_number: int, otel_context: Union[Context, None]
    ) -> HandlePartResult:
        """Take an individual part number and upload it to the pre-signed URL.

        This is the function handed to the thread pool executor by
        `_handle_part_wrapper`, so it is synchronous and guards the state it
        shares with other parts with `self._thread_lock`.

        Arguments:
            part_number: The part number to upload.
            otel_context: The OpenTelemetry context to use for tracing.

        Returns:
            The result of the part upload: the part number, the length of the
            body that was sent (0 when there is no body) and the MD5 computed
            by the configured md5 function.

        Raises:
            SynapseUploadAbortedException: If the upload has been aborted.
            ValueError: If the part body is None and this is not a copy.
        """
        if otel_context:
            # make the caller's tracing context current for this call
            context.attach(otel_context)
        with self._thread_lock:
            if self._aborted:
                # this upload attempt has already been aborted
                # so we short circuit the attempt to upload this part
                raise SynapseUploadAbortedException(
                    f"Upload aborted, skipping part {part_number}"
                )

            # each entry is a (url, signed headers) tuple
            part_url, signed_headers = self._pre_signed_part_urls.get(part_number)

        session: httpx.Client = self._syn._requests_session_storage

        # obtain the body (i.e. the upload bytes) for the given part number.
        body = (
            self._part_request_body_provider_fn(part_number)
            if self._part_request_body_provider_fn
            else None
        )
        part_size = len(body) if body else 0
        self._syn.logger.debug(f"Uploading part {part_number} of size {part_size}")
        if not self._is_copy() and body is None:
            raise ValueError(f"No body for part {part_number}")

        response = self._put_part_with_retry(
            session=session,
            body=body,
            part_url=part_url,
            signed_headers=signed_headers,
            part_number=part_number,
        )

        md5_hex = self._md5_fn(body, response)
        # drop the local references to the response and the part bytes as
        # soon as the MD5 has been computed
        del response
        del body

        # remove so future batch pre_signed url fetches will exclude this part
        with self._thread_lock:
            del self._pre_signed_part_urls[part_number]

        return HandlePartResult(part_number, part_size, md5_hex)

    def _put_part_with_retry(
        self,
        session: httpx.Client,
        body: bytes,
        part_url: str,
        signed_headers: Dict[str, str],
        part_number: int,
    ) -> Union[httpx.Response, None]:
        """Put a part to the storage provider with retries.

        The put is attempted at most twice. A 403 response on the first
        attempt is treated as an expired pre-signed URL: the URLs are
        refreshed and the put is attempted once more with the new URL and
        headers. Any other failure is re-raised.

        Arguments:
            session: The httpx client to use for the put.
            body: The body of the part to put.
            part_url: The URL to put the part to.
            signed_headers: The signed headers to use for the put.
            part_number: The part number being put.

        Returns:
            The response from the put.

        Raises:
            SynapseHTTPError: If the put fails.
        """
        response = None
        for retry in range(2):
            try:
                # use our backoff mechanism here, we have encountered 500s on puts to AWS signed urls

                # part_url and signed_headers are bound as lambda defaults so
                # each attempt uses the values current at that attempt
                # NOTE(review): the retry exception below is the `requests`
                # ConnectionError while the put goes through an httpx client -
                # confirm how with_retry_time_based matches exceptions.
                response = with_retry_time_based(
                    lambda part_url=part_url, signed_headers=signed_headers: session.put(
                        url=part_url,
                        content=body,  # noqa: F821
                        headers=signed_headers,
                    ),
                    retry_exceptions=[requests.exceptions.ConnectionError],
                )

                _raise_for_status_httpx(response=response, logger=self._syn.logger)

                # completed upload part to s3 successfully
                break

            except SynapseHTTPError as ex:
                # only the first attempt (retry == 0) may be retried
                if ex.response.status_code == 403 and retry < 1:
                    # we interpret this to mean our pre_signed url expired.
                    self._syn.logger.debug(
                        f"The pre-signed upload URL for part {part_number} has expired. "
                        "Refreshing urls and retrying.\n"
                    )

                    # we refresh all the urls and obtain this part's
                    # specific url for the retry
                    (
                        part_url,
                        signed_headers,
                    ) = self._refresh_pre_signed_part_urls(
                        part_number,
                        part_url,
                    )

                else:
                    raise
        return response

Functions

multipart_upload_file_async(syn, file_path, dest_file_name=None, content_type=None, part_size=None, storage_location_id=None, preview=True, force_restart=False, md5=None, storage_str=None) async

Upload a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

file_path

the file to upload

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

md5

The MD5 of the file. If not provided, it will be calculated.

TYPE: str DEFAULT: None

storage_str

Optional string to append to the upload message

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
async def multipart_upload_file_async(
    syn: "Synapse",
    file_path: str,
    dest_file_name: str = None,
    content_type: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
    md5: str = None,
    storage_str: str = None,
) -> str:
    """Upload a local file to a Synapse upload destination in chunks.

    Arguments:
        syn: a Synapse object
        file_path: the file to upload
        dest_file_name: upload as a different filename; the base name of
                        file_path is used when not given
        content_type: Refers to the Content-Type of the API request. When None
                      it is guessed from file_path, falling back to
                      "application/octet-stream".
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume
        md5: The MD5 of the file. If not provided, it will be calculated.
        storage_str: Optional string to append to the upload message

    Returns:
        a File Handle ID

    Raises:
        IOError: If file_path does not exist or is a directory.

    The upload itself is carried out by `_multipart_upload_async`.

    """
    span_attributes = {
        "synapse.storage_location_id": (
            "" if storage_location_id is None else storage_location_id
        )
    }
    trace.get_current_span().set_attributes(span_attributes)

    if not os.path.exists(file_path):
        raise IOError(f'File "{file_path}" not found.')
    if os.path.isdir(file_path):
        raise IOError(f'File "{file_path}" is a directory.')

    file_size = os.path.getsize(file_path)
    dest_file_name = dest_file_name or os.path.basename(file_path)

    if content_type is None:
        guessed_type = mimetypes.guess_type(file_path, strict=False)[0]
        content_type = guessed_type or "application/octet-stream"

    md5_hex = md5
    if not md5_hex:
        # no checksum supplied by the caller, compute it from the file
        md5_hex = await md5_for_file_multiprocessing(
            filename=file_path,
            process_pool_executor=syn._get_process_pool_executor(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
            md5_semaphore=syn._get_md5_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ),
        )

    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE,
        file_size,
        MIN_PART_SIZE,
        MAX_NUMBER_OF_PARTS,
    )

    request_body = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def read_part(part_number: int) -> bytes:
        """Return the chunk of the file for the given part number."""
        return get_file_chunk(file_path, part_number, part_size)

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        request_body,
        read_part,
        md5_fn_util,
        force_restart=force_restart,
        storage_str=storage_str,
    )

multipart_upload_string_async(syn, text, dest_file_name=None, part_size=None, content_type=None, storage_location_id=None, preview=True, force_restart=False) async

Upload a string as a file to a Synapse upload destination in chunks.

PARAMETER DESCRIPTION
syn

a Synapse object

TYPE: Synapse

text

a string to upload as a file.

TYPE: str

dest_file_name

upload as a different filename

TYPE: str DEFAULT: None

content_type

Refers to the Content-Type of the API request.

TYPE: str DEFAULT: None

part_size

number of bytes per part. Minimum 5MB.

TYPE: int DEFAULT: None

storage_location_id

an id indicating where the file should be stored. Retrieved from Synapse's UploadDestination

TYPE: str DEFAULT: None

preview

True to generate a preview

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
async def multipart_upload_string_async(
    syn: "Synapse",
    text: str,
    dest_file_name: str = None,
    part_size: int = None,
    content_type: str = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Upload a string as a file to a Synapse upload destination in chunks.

    The text is encoded as UTF-8 and the resulting bytes are uploaded.

    Arguments:
        syn: a Synapse object
        text: a string to upload as a file.
        dest_file_name: upload as a different filename; "message.txt" is used
                        when not given
        content_type: Refers to the Content-Type of the API request. Defaults
                      to "text/plain; charset=utf-8".
        part_size: Number of bytes per part. Minimum is 5MiB (5 * 1024 * 1024 bytes).
        storage_location_id: an id indicating where the file should be
                             stored. Retrieved from Synapse's UploadDestination
        preview: True to generate a preview
        force_restart: True to restart a previously initiated upload
                       from scratch, False to try to resume

    Returns:
        a File Handle ID

    The upload itself is carried out by `_multipart_upload_async`.

    """
    data = text.encode("utf-8")
    file_size = len(data)
    md5_hex = md5_fn_util(data, None)

    if not dest_file_name:
        dest_file_name = "message.txt"

    if not content_type:
        content_type = "text/plain; charset=utf-8"

    # computed exactly once so that the request payload and part_fn below
    # are guaranteed to agree on the part size
    part_size = get_part_size(
        part_size or DEFAULT_PART_SIZE, file_size, MIN_PART_SIZE, MAX_NUMBER_OF_PARTS
    )

    upload_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_REQUEST,
        "contentType": content_type,
        "contentMD5Hex": md5_hex,
        "fileName": dest_file_name,
        "fileSizeBytes": file_size,
        "generatePreview": preview,
        "partSizeBytes": part_size,
        "storageLocationId": storage_location_id,
    }

    def part_fn(part_number: int) -> bytes:
        """Get the nth chunk of a buffer."""
        return get_data_chunk(data, part_number, part_size)

    return await _multipart_upload_async(
        syn,
        dest_file_name,
        upload_request,
        part_fn,
        md5_fn_util,
        force_restart=force_restart,
    )

multipart_copy_async(syn, source_file_handle_association, dest_file_name=None, part_size=None, storage_location_id=None, preview=True, force_restart=False) async

Makes a Multipart Upload Copy Request. This request performs a copy of an existing file handle without data transfer from the client.

PARAMETER DESCRIPTION
syn

A Synapse object

TYPE: Synapse

source_file_handle_association

Describes an association of a FileHandle with another object.

TYPE: Dict[str, str]

dest_file_name

The name of the file to be uploaded.

TYPE: str DEFAULT: None

part_size

The size that each part will be (in bytes).

TYPE: int DEFAULT: None

storage_location_id

The identifier of the storage location where this file should be copied to. The user must be the owner of the storage location.

TYPE: str DEFAULT: None

preview

True to generate a preview of the data.

TYPE: bool DEFAULT: True

force_restart

True to restart a previously initiated upload from scratch, False to try to resume.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

a File Handle ID

Keyword arguments are passed down to _multipart_upload().

Source code in synapseclient/core/upload/multipart_upload_async.py
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
async def multipart_copy_async(
    syn: "Synapse",
    source_file_handle_association: Dict[str, str],
    dest_file_name: str = None,
    part_size: int = None,
    storage_location_id: str = None,
    preview: bool = True,
    force_restart: bool = False,
) -> str:
    """Copy an existing file handle by issuing a
    [Multipart Upload Copy Request](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/file/MultipartUploadCopyRequest.html).
    The copy is performed without data transfer from the client.

    Arguments:
        syn: A Synapse object
        source_file_handle_association: Describes an association of a FileHandle with another object.
        dest_file_name: The name of the file to be uploaded.
        part_size: The size that each part will be (in bytes). DEFAULT_PART_SIZE is used when not given.
        storage_location_id: The identifier of the storage location where this file should be copied to.
                             The user must be the owner of the storage location.
        preview: True to generate a preview of the data.
        force_restart: True to restart a previously initiated upload from scratch, False to try to resume.

    Returns:
        a File Handle ID

    The copy itself is carried out by `_multipart_upload_async`.

    """
    effective_part_size = part_size or DEFAULT_PART_SIZE

    copy_request = {
        "concreteType": concrete_types.MULTIPART_UPLOAD_COPY_REQUEST,
        "fileName": dest_file_name,
        "generatePreview": preview,
        "partSizeBytes": effective_part_size,
        "sourceFileHandleAssociation": source_file_handle_association,
        "storageLocationId": storage_location_id,
    }

    file_handle_id = await _multipart_upload_async(
        syn,
        dest_file_name,
        copy_request,
        copy_part_request_body_provider_fn,
        copy_md5_fn,
        force_restart=force_restart,
    )
    return file_handle_id

Multithreaded Downloading

synapseclient.core.multithread_download

Classes

DownloadRequest

Bases: NamedTuple

A request to download a file from Synapse

ATTRIBUTE DESCRIPTION
file_handle_id

The file handle ID to download.

object_id

The Synapse object this file is associated with.

object_type

The type of the associated Synapse object.

path

The local path to download the file to. This path can be either an absolute path or a relative path from where the code is executed to the download location.

debug

A boolean to specify if debug mode is on.

TYPE: bool

Source code in synapseclient/core/multithread_download/download_threads.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class DownloadRequest(NamedTuple):
    """
    A request to download a file from Synapse.

    Attributes:
        file_handle_id: The file handle ID to download.
        object_id: The Synapse object this file is associated with.
        object_type: The type of the associated Synapse object.
        path: The local path to download the file to.
            This path can be either an absolute path or
            a relative path from where the code is executed to the download location.
        debug: A boolean to specify if debug mode is on. Defaults to False.
    """

    file_handle_id: int
    object_id: str
    object_type: str
    path: str
    debug: bool = False

Functions

download_file(client, download_request, *, max_concurrent_parts=None)

Main driver for the multi-threaded download. Uses an ExecutorService, either set externally onto a thread local by an outside process, or created as needed otherwise.

PARAMETER DESCRIPTION
client

A synapseclient

download_request

A DownloadRequest object specifying which Synapse file to download

TYPE: DownloadRequest

max_concurrent_parts

The maximum number of parts to download concurrently when downloading this file

TYPE: int DEFAULT: None

Source code in synapseclient/core/multithread_download/download_threads.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def download_file(
    client,
    download_request: DownloadRequest,
    *,
    max_concurrent_parts: int = None,
):
    """
    Main driver for the multi-threaded download.

    The download runs on an Executor. If one is already present on this
    module's thread local (for example, placed there through
    `shared_executor`), it is reused and left running for its owner.
    Otherwise an executor is obtained for this call only and shut down
    once the download finishes or fails.

    Arguments:
        client: A synapseclient
        download_request: The DownloadRequest specifying which Synapse file
                            to download
        max_concurrent_parts: The maximum number of parts to download
                                concurrently when downloading this file.
                                Falls back to `client.max_threads` when not
                                given.
    """

    # Re-use the thread pool of an enclosing Synapse sync when there is one;
    # only an executor obtained here is ours to shut down afterwards.
    executor = getattr(_thread_local, "executor", None)
    owns_executor = not executor
    if owns_executor:
        executor = get_executor(client.max_threads)

    part_limit = max_concurrent_parts or client.max_threads
    try:
        _MultithreadedDownloader(client, executor, part_limit).download_file(
            download_request
        )
    finally:
        # an executor handed in from the outside is managed by the caller
        if owns_executor:
            executor.shutdown()

shared_executor(executor)

An outside process that will eventually trigger a download through this module can configure a shared Executor by running its code within this context manager.

Source code in synapseclient/core/multithread_download/download_threads.py
39
40
41
42
43
44
45
46
47
@contextmanager
def shared_executor(executor):
    """
    Context manager that publishes `executor` on this module's thread local.

    An outside process that will eventually trigger a download through this
    module can run its code inside this context so the download picks up the
    shared Executor. The attribute is removed again when the context exits,
    whether normally or through an exception.
    """
    setattr(_thread_local, "executor", executor)
    try:
        yield
    finally:
        delattr(_thread_local, "executor")

Cache

synapseclient.core.cache


File Caching


Implements a cache on local disk for Synapse file entities and other objects with a FileHandle. This is part of the internal implementation of the client and should not be accessed directly by users of the client.

Classes

Cache

Represent a cache in which files are accessed by file handle ID.

Source code in synapseclient/core/cache.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
class Cache:
    """
    Represent a cache in which files are accessed by file handle ID.

    Each file handle gets its own directory,
    `<cache_root_dir>/<file_handle_id % fanout>/<file_handle_id>`, holding a
    `.cacheMap` JSON file that maps normalized file paths to either a
    modified-time string (legacy format) or a dict with `modified_time` and
    `content_md5` keys (current format).
    """

    def __setattr__(self, key, value):
        # expand out home shortcut ('~') and environment variables when setting cache_root_dir
        if key == "cache_root_dir":
            value = os.path.expandvars(os.path.expanduser(value))
            # create the cache_root_dir if it does not already exist;
            # exist_ok covers another process creating it after the check
            if not os.path.exists(value):
                os.makedirs(value, exist_ok=True)
        self.__dict__[key] = value

    def __init__(self, cache_root_dir=CACHE_ROOT_DIR, fanout=1000):
        # set root dir of cache in which meta data will be stored and files
        # will be stored here by default, but other locations can be specified
        self.cache_root_dir = cache_root_dir
        self.fanout = fanout
        self.cache_map_file_name = ".cacheMap"

    def get_cache_dir(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str]
    ) -> str:
        """
        Return the cache directory used for the given file handle.

        Arguments:
            file_handle_id: A file handle ID, or a mapping from which the ID is
                            taken: its `dataFileHandleId` value if present,
                            otherwise its `id` value when its `concreteType`
                            starts with `org.sagebionetworks.repo.model.file`.

        Returns:
            The path `<cache_root_dir>/<id % fanout>/<id>`
        """
        if isinstance(file_handle_id, collections.abc.Mapping):
            if "dataFileHandleId" in file_handle_id:
                file_handle_id = file_handle_id["dataFileHandleId"]
            elif (
                "concreteType" in file_handle_id
                and "id" in file_handle_id
                and file_handle_id["concreteType"].startswith(
                    "org.sagebionetworks.repo.model.file"
                )
            ):
                file_handle_id = file_handle_id["id"]
        return os.path.join(
            self.cache_root_dir,
            str(int(file_handle_id) % self.fanout),
            str(file_handle_id),
        )

    def _read_cache_map(self, cache_dir: str) -> dict:
        """
        Load the cache map stored in `cache_dir`.

        Returns:
            The parsed cache map, or an empty dict if the cache map file is
            missing or cannot be parsed as JSON.
        """
        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        if not os.path.exists(cache_map_file):
            return {}

        with open(cache_map_file, "r") as f:
            try:
                cache_map = json.load(f)
            except json.decoder.JSONDecodeError:
                # a corrupt cache map file that is not parseable as JSON is treated
                # as if it does not exist at all (will be overwritten).
                return {}

        return cache_map

    def _write_cache_map(self, cache_dir: str, cache_map: dict) -> None:
        """
        Write `cache_map` as JSON to the cache map file in `cache_dir`,
        creating the directory first if needed.
        """
        if not os.path.exists(cache_dir):
            # exist_ok covers another process creating it after the check
            os.makedirs(cache_dir, exist_ok=True)

        cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

        with open(cache_map_file, "w") as f:
            json.dump(cache_map, f)
            f.write("\n")  # For compatibility with R's JSON parser

    def _get_cache_modified_time(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `modified_time` from the `cache_map_entry`. This needs to be
        backwards-compatible with cache entries that do not have the new JSON
        structure. A dict entry is a new-style entry and its `modified_time`
        value is returned (None if the key is absent). A string entry is an
        old-style entry and the string itself is the modified time.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The modified time if it can be determined, otherwise None
        """
        # Check the type rather than using `in`: on a legacy string entry `in`
        # would be a substring test followed by a failing `.get` call.
        if isinstance(cache_map_entry, dict):
            return cache_map_entry.get("modified_time", None)
        if isinstance(cache_map_entry, str):
            return cache_map_entry
        return None

    def _get_cache_content_md5(
        self, cache_map_entry: typing.Union[str, dict, None]
    ) -> typing.Union[str, None]:
        """
        Retrieve the `content_md5` from the cache_map_entry.

        Arguments:
            cache_map_entry: The entry from the cache map

        Returns:
            The content md5 if it exists, otherwise None. Legacy string
            entries carry no md5 and always yield None.
        """
        if isinstance(cache_map_entry, dict):
            return cache_map_entry.get("content_md5", None)
        return None

    def _cache_item_unmodified(
        self, cache_map_entry: typing.Union[str, dict], path: str
    ) -> bool:
        """
        Determine if the cache_map_entry is unmodified by comparing the modified_time
        and content_md5 to the file at the given path.

        Arguments:
            cache_map_entry: The entry from the cache map
            path:            The path to the file to compare to

        Returns:
            True if the cache_map_entry is unmodified, otherwise False
        """
        cached_time = self._get_cache_modified_time(cache_map_entry)
        cached_md5 = self._get_cache_content_md5(cache_map_entry)

        # compare_timestamps has an implicit check for whether the path exists.
        # The md5 is only computed when the timestamps match and an md5 was recorded.
        return compare_timestamps(_get_modified_time(path), cached_time) and (
            cached_md5 is None or cached_md5 == utils.md5_for_file(path).hexdigest()
        )

    def contains(
        self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
    ) -> bool:
        """
        Given a file and file_handle_id, return True if an unmodified cached
        copy of the file exists at the exact path given or False otherwise.

        Only the recorded modified time is compared; the content md5 is not checked.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           The file path at which to look for a cached copy
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        if not os.path.exists(cache_dir):
            return False

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            cached_time = self._get_cache_modified_time(cache_map.get(path, None))

            if cached_time:
                return compare_timestamps(_get_modified_time(path), cached_time)
        return False

    @tracer.start_as_current_span("cache::get")
    def get(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
    ) -> typing.Union[str, None]:
        """
        Retrieve a file with the given file handle from the cache.

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           If the given path is None, look for a cached copy of the
                            file in the cache directory. If the path is a directory,
                            look there for a cached copy. If a full file-path is
                            given and is present in the cache map, only check
                            whether that exact file exists and is unmodified since
                            it was cached.

        Returns:
            Either a file path, if an unmodified cached copy of the file
            exists in the specified location or None if it does not
        """
        cache_dir = self.get_cache_dir(file_handle_id)
        trace.get_current_span().set_attributes(
            {
                "synapse.cache.dir": cache_dir,
                "synapse.cache.file_handle_id": file_handle_id,
            }
        )
        if not os.path.exists(cache_dir):
            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)

            # If the caller specifies a path and that path exists in the cache
            # but has been modified, we need to indicate no match by returning
            # None. The logic for updating a synapse entity depends on this to
            # determine the need to upload a new file.

            if path is not None:
                # If we're given a path to a directory, look for a cached file in that directory
                if os.path.isdir(path):
                    matching_unmodified_directory = None
                    removed_entry_from_cache = (
                        False  # determines if cache_map needs to be rewritten to disk
                    )

                    # iterate a copy of cache_map to allow modifying original cache_map
                    for cached_file_path, cache_map_entry in dict(cache_map).items():
                        if path == os.path.dirname(cached_file_path):
                            if self._cache_item_unmodified(
                                cache_map_entry, cached_file_path
                            ):
                                # "break" instead of "return" to write removed invalid entries to disk if necessary
                                matching_unmodified_directory = cached_file_path
                                break
                            else:
                                # remove invalid cache entries pointing to files that no longer exist
                                # or have been modified
                                del cache_map[cached_file_path]
                                removed_entry_from_cache = True

                    if removed_entry_from_cache:
                        # write cache_map with non-existent entries removed
                        self._write_cache_map(cache_dir, cache_map)

                    if matching_unmodified_directory is not None:
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": True}
                        )
                        return matching_unmodified_directory

                # if we're given a full file path, look up a matching file in the cache
                else:
                    cache_map_entry = cache_map.get(path, None)
                    if cache_map_entry:
                        matching_file_path = (
                            path
                            if self._cache_item_unmodified(cache_map_entry, path)
                            else None
                        )
                        trace.get_current_span().set_attributes(
                            {"synapse.cache.hit": matching_file_path is not None}
                        )
                        return matching_file_path

            # return most recently cached and unmodified file OR
            # None if there are no unmodified files.
            # Entries without a usable modified time sort last instead of
            # raising while the sort key is computed.
            for cached_file_path, cache_map_entry in sorted(
                cache_map.items(),
                key=lambda item: self._get_cache_modified_time(item[1]) or "",
                reverse=True,
            ):
                if self._cache_item_unmodified(cache_map_entry, cached_file_path):
                    trace.get_current_span().set_attributes({"synapse.cache.hit": True})
                    return cached_file_path

            trace.get_current_span().set_attributes({"synapse.cache.hit": False})
            return None

    def add(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str,
        md5: str = None,
    ) -> dict:
        """
        Add a file to the cache

        Arguments:
            file_handle_id: The ID of the fileHandle
            path:           The path of an existing file to record in the cache map
            md5:            The MD5 of the file, if known; computed from the
                            file when not given

        Returns:
            The updated cache map for this file handle

        Raises:
            ValueError: If `path` is empty or does not exist
        """
        if not path or not os.path.exists(path):
            raise ValueError('Can\'t find file "%s"' % path)

        cache_dir = self.get_cache_dir(file_handle_id)
        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            path = utils.normalize_path(path)
            # write .000 milliseconds for backward compatibility
            cache_map[path] = {
                "modified_time": epoch_time_to_iso(
                    math.floor(_get_modified_time(path))
                ),
                "content_md5": md5 or utils.md5_for_file(path).hexdigest(),
            }
            self._write_cache_map(cache_dir, cache_map)

        return cache_map

    def remove(
        self,
        file_handle_id: typing.Union[collections.abc.Mapping, str],
        path: str = None,
        delete: bool = None,
    ) -> typing.List[str]:
        """
        Remove a file from the cache.

        Arguments:
            file_handle_id: Will also extract file handle id from either a File or file handle
            path:           If the given path is None, remove (and potentially delete)
                            all cached copies. If the path is that of a file in the
                            .cacheMap file, remove it.
            delete:         If True, delete the file from disk as well as removing it from the cache

        Returns:
            A list of files removed
        """
        removed = []
        cache_dir = self.get_cache_dir(file_handle_id)

        # if we've passed an entity and not a path, get path from entity
        if (
            path is None
            and isinstance(file_handle_id, collections.abc.Mapping)
            and "path" in file_handle_id
        ):
            path = file_handle_id["path"]

        with Lock(self.cache_map_file_name, dir=cache_dir):
            cache_map = self._read_cache_map(cache_dir)

            if path is None:
                # a distinct loop name keeps the `path` parameter intact
                for cached_path in cache_map:
                    if delete is True and os.path.exists(cached_path):
                        os.remove(cached_path)
                    removed.append(cached_path)
                cache_map = {}
            else:
                path = utils.normalize_path(path)
                if path in cache_map:
                    if delete is True and os.path.exists(path):
                        os.remove(path)
                    del cache_map[path]
                    removed.append(path)

            self._write_cache_map(cache_dir, cache_map)

        return removed

    def _cache_dirs(self):
        """
        Generate a list of all cache dirs, directories of the form:
        [cache.cache_root_dir]/949/59949
        """
        # fullmatch: only purely numeric names are cache directories. A prefix
        # match would also yield unrelated folders such as "2021_backup",
        # which purge() would then delete.
        for item1 in os.listdir(self.cache_root_dir):
            path1 = os.path.join(self.cache_root_dir, item1)
            if os.path.isdir(path1) and re.fullmatch(r"\d+", item1):
                for item2 in os.listdir(path1):
                    path2 = os.path.join(path1, item2)
                    if os.path.isdir(path2) and re.fullmatch(r"\d+", item2):
                        yield path2

    def purge(
        self,
        before_date: typing.Union[datetime.datetime, int] = None,
        after_date: typing.Union[datetime.datetime, int] = None,
        dry_run: bool = False,
    ) -> int:
        """
        Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

        Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
        the cache.

        Arguments:
            before_date: If specified, all files before this date will be removed
            after_date:  If specified, all files after this date will be removed
            dry_run:     If dry_run is True, then the selected files are printed rather than removed

        Returns:
            The number of files selected for removal

        Raises:
            ValueError: If neither date is given, or if before_date is earlier than after_date

        Example: Using this function
            Either the before_date or after_date must be specified. If both are passed, files between the two dates are
            selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
            of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
            following can be used.

            using offset naive datetime objects

                cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

            using seconds since the unix epoch

                cache.purge(after_date=1609459200, before_date=1612137600)
        """
        if before_date is None and after_date is None:
            raise ValueError("Either before date or after date should be provided")

        if isinstance(before_date, datetime.datetime):
            before_date = utils.to_unix_epoch_time_secs(before_date)
        if isinstance(after_date, datetime.datetime):
            after_date = utils.to_unix_epoch_time_secs(after_date)

        # Compare against None rather than relying on truthiness: a bound of 0
        # (the unix epoch) is a real bound and must not be read as "no bound",
        # which would select every cache directory for removal.
        if (
            before_date is not None
            and after_date is not None
            and before_date < after_date
        ):
            raise ValueError("Before date should be larger than after date")

        count = 0
        for cache_dir in self._cache_dirs():
            # A None modified time means there is no cache map file to read a
            # time from; such directories are treated as OK to purge.
            last_modified_time = _get_modified_time(
                os.path.join(cache_dir, self.cache_map_file_name)
            )
            if last_modified_time is None or (
                (before_date is None or before_date > last_modified_time)
                and (after_date is None or after_date < last_modified_time)
            ):
                if dry_run:
                    print(cache_dir)
                else:
                    shutil.rmtree(cache_dir)
                count += 1
        return count
Functions
contains(file_handle_id, path)

Given a file and file_handle_id, return True if an unmodified cached copy of the file exists at the exact path given or False otherwise.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

The file path at which to look for a cached copy

TYPE: str

Source code in synapseclient/core/cache.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def contains(
    self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
) -> bool:
    """
    Report whether an unmodified cached copy of a file exists at an exact path.

    The cache map for the file handle is read under its lock, and the
    modified time recorded for the normalized path is compared with the
    file's current modified time.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           The file path at which to look for a cached copy

    Returns:
        True if the path is recorded in the cache map and the timestamps
        match, otherwise False
    """
    cache_dir = self.get_cache_dir(file_handle_id)

    if os.path.exists(cache_dir):
        with Lock(self.cache_map_file_name, dir=cache_dir):
            entries = self._read_cache_map(cache_dir)
            normalized_path = utils.normalize_path(path)
            recorded_time = self._get_cache_modified_time(
                entries.get(normalized_path)
            )

            if recorded_time:
                return compare_timestamps(
                    _get_modified_time(normalized_path), recorded_time
                )

    # no cache directory, or no recorded time for this path
    return False
get(file_handle_id, path=None)

Retrieve a file with the given file handle from the cache.

PARAMETER DESCRIPTION
file_handle_id

The ID of the fileHandle

TYPE: Union[Mapping, str]

path

If the given path is None, look for a cached copy of the file in the cache directory. If the path is a directory, look there for a cached copy. If a full file-path is given, only check whether that exact file exists and is unmodified since it was cached.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Union[str, None]

Either a file path, if an unmodified cached copy of the file exists in the specified location, or None if it does not

Source code in synapseclient/core/cache.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
@tracer.start_as_current_span("cache::get")
def get(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
) -> typing.Union[str, None]:
    """
    Look up a cached copy of the file behind the given file handle.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           When None, any cached copy recorded for the file handle
                        is considered. When a directory, cached copies located
                        directly in that directory are checked first. When a
                        file path that is present in the cache map, only that
                        exact file is checked for being unmodified.

    Returns:
        The path of an unmodified cached copy, or None if there is none
    """

    def record_hit(hit):
        # tag the current tracing span with the outcome of the lookup
        trace.get_current_span().set_attributes({"synapse.cache.hit": hit})

    def recorded_time(item):
        # sort key: new-style entries are dicts, legacy entries are the time itself
        entry = item[1]
        return entry["modified_time"] if isinstance(entry, dict) else entry

    cache_dir = self.get_cache_dir(file_handle_id)
    trace.get_current_span().set_attributes(
        {
            "synapse.cache.dir": cache_dir,
            "synapse.cache.file_handle_id": file_handle_id,
        }
    )
    if not os.path.exists(cache_dir):
        record_hit(False)
        return None

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)
        path = utils.normalize_path(path)

        # A caller-supplied path that is in the cache but has been modified
        # must yield None: the logic for updating a synapse entity relies on
        # that to decide whether a new upload is needed.
        if path is not None and os.path.isdir(path):
            # directory given: look for a cached file located in it
            found = None
            stale_paths = []
            for candidate, entry in cache_map.items():
                if path != os.path.dirname(candidate):
                    continue
                if self._cache_item_unmodified(entry, candidate):
                    # stop scanning, but still persist any removals below
                    found = candidate
                    break
                # entry points at a file that is gone or has been modified
                stale_paths.append(candidate)

            if stale_paths:
                for candidate in stale_paths:
                    del cache_map[candidate]
                # persist the cache map without the invalid entries
                self._write_cache_map(cache_dir, cache_map)

            if found is not None:
                record_hit(True)
                return found
        elif path is not None:
            # full file path given: check only the matching cache entry
            entry = cache_map.get(path, None)
            if entry:
                matched = path if self._cache_item_unmodified(entry, path) else None
                record_hit(matched is not None)
                return matched

        # otherwise return the most recently cached unmodified file, if any
        newest_first = sorted(cache_map.items(), key=recorded_time, reverse=True)
        for candidate, entry in newest_first:
            if self._cache_item_unmodified(entry, candidate):
                record_hit(True)
                return candidate

        record_hit(False)
        return None
add(file_handle_id, path, md5=None)

Add a file to the cache

Source code in synapseclient/core/cache.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def add(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str,
    md5: str = None,
) -> dict:
    """
    Record a file in the cache map of the given file handle.

    Arguments:
        file_handle_id: The ID of the fileHandle
        path:           The path of an existing file to record
        md5:            The MD5 of the file, if known; computed from the file
                        when not given

    Returns:
        The updated cache map for this file handle

    Raises:
        ValueError: If `path` is empty or does not exist
    """
    if not (path and os.path.exists(path)):
        raise ValueError('Can\'t find file "%s"' % path)

    cache_dir = self.get_cache_dir(file_handle_id)
    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)
        path = utils.normalize_path(path)

        # the timestamp is floored to whole seconds so the stored value
        # carries .000 milliseconds, for backward compatibility
        modified_time = epoch_time_to_iso(math.floor(_get_modified_time(path)))
        content_md5 = md5 or utils.md5_for_file(path).hexdigest()

        cache_map[path] = {
            "modified_time": modified_time,
            "content_md5": content_md5,
        }
        self._write_cache_map(cache_dir, cache_map)

    return cache_map
remove(file_handle_id, path=None, delete=None)

Remove a file from the cache.

PARAMETER DESCRIPTION
file_handle_id

Will also extract file handle id from either a File or file handle

TYPE: Union[Mapping, str]

path

If the given path is None, remove (and potentially delete) all cached copies. If the path is that of a file in the .cacheMap file, remove it.

TYPE: str DEFAULT: None

delete

If True, delete the file from disk as well as removing it from the cache

TYPE: bool DEFAULT: None

RETURNS DESCRIPTION
List[str]

A list of files removed

Source code in synapseclient/core/cache.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def remove(
    self,
    file_handle_id: typing.Union[collections.abc.Mapping, str],
    path: str = None,
    delete: bool = None,
) -> typing.List[str]:
    """
    Drop one or all entries from the cache map of a file handle.

    Arguments:
        file_handle_id: A file handle id, or a mapping (e.g. a File or file handle)
                        from which the id is extracted. If no path is given and
                        the mapping has a "path" key, that value is used as path.
        path:           The cached file to forget. When no path can be determined,
                        every entry of the cache map is removed.
        delete:         Only when exactly True are the removed files also deleted
                        from disk (if they still exist)

    Returns:
        The paths that were removed from the cache map
    """
    cache_dir = self.get_cache_dir(file_handle_id)
    removed = []

    def _forget(cached_path):
        # optionally delete the file itself, then report it as removed
        if delete is True and os.path.exists(cached_path):
            os.remove(cached_path)
        removed.append(cached_path)

    # an entity was passed without an explicit path: fall back to the entity's path
    entity_supplies_path = (
        path is None
        and isinstance(file_handle_id, collections.abc.Mapping)
        and "path" in file_handle_id
    )
    if entity_supplies_path:
        path = file_handle_id["path"]

    with Lock(self.cache_map_file_name, dir=cache_dir):
        cache_map = self._read_cache_map(cache_dir)

        if path is not None:
            normalized_path = utils.normalize_path(path)
            if normalized_path in cache_map:
                _forget(normalized_path)
                del cache_map[normalized_path]
        else:
            for cached_path in cache_map:
                _forget(cached_path)
            cache_map = {}

        # the map is written back even when nothing was removed
        self._write_cache_map(cache_dir, cache_map)

    return removed
purge(before_date=None, after_date=None, dry_run=False)

Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside the cache.

PARAMETER DESCRIPTION
before_date

If specified, all files before this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

after_date

If specified, all files after this date will be removed

TYPE: Union[datetime, int] DEFAULT: None

dry_run

If dry_run is True, then the selected files are printed rather than removed

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

The number of files selected for removal

Using this function

Either the before_date or after_date must be specified. If both are passed, files between the two dates are selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number of seconds since the unix epoch. For example, to delete all the files modified in January 2021, either of the following can be used:

using offset naive datetime objects

cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

using seconds since the unix epoch

cache.purge(after_date=1609459200, before_date=1612137600)
Source code in synapseclient/core/cache.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def purge(
    self,
    before_date: typing.Union[datetime.datetime, int] = None,
    after_date: typing.Union[datetime.datetime, int] = None,
    dry_run: bool = False,
) -> int:
    """
    Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.

    Deletes .cacheMap files and files stored in the cache.cache_root_dir, but does not delete files stored outside
    the cache.

    Arguments:
        before_date: If specified, all files before this date will be removed
        after_date:  If specified, all files after this date will be removed
        dry_run:     If dry_run is True, then the selected files are printed rather than removed

    Returns:
        The number of files selected for removal

    Raises:
        ValueError: If neither date is given, or if before_date is earlier than after_date

    Example: Using this function
        Either the before_date or after_date must be specified. If both are passed, files between the two dates are
        selected for removal. Dates must either be a timezone naive Python datetime.datetime instance or the number
        of seconds since the unix epoch. For example to delete all the files modified in January 2021, either of the
        following can be used:

        using offset naive datetime objects

            cache.purge(after_date=datetime.datetime(2021, 1, 1), before_date=datetime.datetime(2021, 2, 1))

        using seconds since the unix epoch

            cache.purge(after_date=1609459200, before_date=1612137600)
    """
    if before_date is None and after_date is None:
        raise ValueError("Either before date or after date should be provided")

    if isinstance(before_date, datetime.datetime):
        before_date = utils.to_unix_epoch_time_secs(before_date)
    if isinstance(after_date, datetime.datetime):
        after_date = utils.to_unix_epoch_time_secs(after_date)

    # Compare against None rather than relying on truthiness: a bound of 0
    # (the unix epoch itself) is a valid date and must not be treated as
    # "not provided", which would silently widen the purge to every directory.
    has_before = before_date is not None
    has_after = after_date is not None

    if has_before and has_after and before_date < after_date:
        raise ValueError("Before date should be larger than after date")

    count = 0
    for cache_dir in self._cache_dirs():
        last_modified_time = _get_modified_time(
            os.path.join(cache_dir, self.cache_map_file_name)
        )

        # _get_modified_time returns None if the cache map file doesn't exist;
        # directories in the cache that have no .cacheMap file are purged too
        in_range = last_modified_time is None or (
            (not has_before or before_date > last_modified_time)
            and (not has_after or after_date < last_modified_time)
        )
        if not in_range:
            continue

        if dry_run:
            print(cache_dir)
        else:
            shutil.rmtree(cache_dir)
        count += 1
    return count

Functions

epoch_time_to_iso(epoch_time)

Convert seconds since unix epoch to a string in ISO format

Source code in synapseclient/core/cache.py
31
32
33
34
35
36
37
38
39
def epoch_time_to_iso(epoch_time):
    """
    Turn a number of seconds since the unix epoch into an ISO formatted string.

    Arguments:
        epoch_time: Seconds since the unix epoch, or None

    Returns:
        The ISO formatted string, or None when epoch_time is None
    """
    if epoch_time is None:
        return None

    as_datetime = utils.from_unix_epoch_time_secs(epoch_time)
    return utils.datetime_to_iso(as_datetime)

iso_time_to_epoch(iso_time)

Convert an ISO formatted time into seconds since unix epoch

Source code in synapseclient/core/cache.py
42
43
44
45
46
47
48
49
50
def iso_time_to_epoch(iso_time):
    """
    Turn an ISO formatted time string into seconds since the unix epoch.

    Arguments:
        iso_time: An ISO formatted time string, or None

    Returns:
        Seconds since the unix epoch, or None when iso_time is None
    """
    if iso_time is None:
        return None

    as_datetime = utils.iso_to_datetime(iso_time)
    return utils.to_unix_epoch_time_secs(as_datetime)

compare_timestamps(modified_time, cached_time)

Compare a modification time (seconds since the unix epoch) with a cached ISO formatted timestamp, with a special case when cached_time ends in .000Z.

For backward compatibility, we always write .000 for milliseconds into the cache. We then match a cached time ending in .000Z, meaning zero milliseconds with a modified time with any number of milliseconds.

PARAMETER DESCRIPTION
modified_time

The float representing seconds since unix epoch

cached_time

The string holding an ISO formatted time

Source code in synapseclient/core/cache.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def compare_timestamps(modified_time, cached_time):
    """
    Check whether a file's modification time matches a cached ISO formatted timestamp.

    For backward compatibility the cache always stores .000 for milliseconds.
    A cached time ending in .000Z is therefore matched against the modified time
    floored to whole seconds, i.e. regardless of the modified time's milliseconds.

    Arguments:
        modified_time: The float representing seconds since unix epoch
        cached_time:   The string holding an ISO formatted time

    Returns:
        True if the two times match, False otherwise (including when either is None)
    """
    if modified_time is None or cached_time is None:
        return False

    # whole-second cache entries ignore the sub-second part of the modified time
    if cached_time.endswith(".000Z"):
        modified_time = math.floor(modified_time)

    return cached_time == epoch_time_to_iso(modified_time)

Credentials

synapseclient.core.credentials.cred_data.SynapseCredentials

Bases: AuthBase, ABC

Source code in synapseclient/core/credentials/cred_data.py
11
12
13
14
15
16
17
18
19
20
class SynapseCredentials(requests.auth.AuthBase, abc.ABC):
    """
    Abstract base class for Synapse credentials.

    Extends requests.auth.AuthBase, so instances can be handed to requests as an
    auth object. Concrete subclasses must provide the `username` and `secret`
    properties declared below.
    """

    # NOTE(review): both properties are annotated `-> None` here, while the
    # concrete SynapseAuthTokenCredentials in this file annotates them `-> str`.
    @property
    @abc.abstractmethod
    def username(self) -> None:
        """The username associated with these credentials."""

    @property
    @abc.abstractmethod
    def secret(self) -> None:
        """The secret associated with these credentials."""

Attributes

username: None abstractmethod property

The username associated with these credentials.

secret: None abstractmethod property

The secret associated with these credentials.

synapseclient.core.credentials.cred_data.SynapseAuthTokenCredentials

Bases: SynapseCredentials

Source code in synapseclient/core/credentials/cred_data.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class SynapseAuthTokenCredentials(SynapseCredentials):
    """
    Credentials that authenticate requests with a bearer token.

    The token is validated on construction (see `_validate_token`) and is sent
    as an `Authorization: Bearer ...` header when the instance is used as a
    requests auth object.
    """

    @classmethod
    def get_keyring_service_name(
        cls,
    ) -> typing.Literal["SYNAPSE.ORG_CLIENT_AUTH_TOKEN"]:
        """Return the constant service name used for Synapse auth tokens."""
        return "SYNAPSE.ORG_CLIENT_AUTH_TOKEN"

    @classmethod
    def _validate_token(cls, token):
        """
        Check, on a best-effort basis, that the token carries the view scope.

        Arguments:
            token: The bearer token, expected to be a '.' delimited string whose
                   second segment is base64 encoded JSON

        Raises:
            SynapseAuthenticationError: If the token declares scopes and "view" is not among them
        """
        # decode the token to ensure it minimally has view scope.
        # if it doesn't raise an error, the client will not be useful without it.

        # if for any reason we are not able to decode the token and check its scopes
        # we do NOT raise an error. this is to accommodate the possibility of a changed
        # token format some day that this version of the client may still be able to
        # pass as a bearer token.
        try:
            # we add padding so that a lack of padding won't cause a decode error.
            # the python base64 implementation will truncate extra padding so we can overpad
            # rather than compute exactly how much padding we might need.
            # https://stackoverflow.com/a/49459036
            encoded_body = token.split(".")[1] + "==="
            token_body = json.loads(
                str(base64.urlsafe_b64decode(encoded_body), "utf-8")
            )

            # valid JSON is not necessarily an object, and "access" is not
            # necessarily an object either; an unexpected shape counts as
            # "unable to check the scopes" instead of failing with an
            # AttributeError.
            access = (
                token_body.get("access", {}) if isinstance(token_body, dict) else None
            )
            scopes = access.get("scope") if isinstance(access, dict) else None

            # only container/string scopes support a membership test; anything
            # else (e.g. a number) is again treated as an unknown format.
            if isinstance(scopes, (str, list, dict)) and "view" not in scopes:
                raise SynapseAuthenticationError("A view scoped token is required")

        except (IndexError, ValueError):
            # possible errors if token is not encoded as expected:
            # IndexError if the token is not a '.' delimited base64 string with a header and body
            # ValueError if the split string is not base64 encoded or if the decoded base64 is not json
            pass

    def __init__(self, token, username=None):
        """
        Arguments:
            token: The bearer token; validated before it is stored
            username: Optional username associated with the token
        """
        self._validate_token(token)

        self._token = token
        self.username = username

    @property
    def username(self) -> str:
        """The username associated with this token."""
        return self._username

    @username.setter
    def username(self, username: str) -> None:
        self._username = username

    @property
    def secret(self) -> str:
        """The bearer token."""
        return self._token

    def __call__(self, r):
        """Add the bearer token Authorization header to the request `r` and return it."""
        r.headers.update({"Authorization": f"Bearer {self.secret}"})
        return r

    def __repr__(self):
        # NOTE(review): the repr embeds the bearer token in clear text, so it can
        # end up in logs and tracebacks. Left unchanged here because callers may
        # rely on the exact format - consider masking the token.
        return f"SynapseAuthTokenCredentials(username='{self.username}', token='{self.secret}')"

Attributes

username: str property writable

The username associated with this token.

secret: str property

The bearer token.

synapseclient.core.credentials.credential_provider

This module contains classes that are responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file).

Classes

SynapseCredentialsProvider

A credential provider is responsible for retrieving synapse authentication information (e.g. authToken) from a source (e.g. login args, config file), and using it to return a SynapseCredentials instance.

Source code in synapseclient/core/credentials/credential_provider.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class SynapseCredentialsProvider(metaclass=abc.ABCMeta):
    """
    Base class for objects that look up synapse authentication information
    (e.g. authToken) in one particular source (e.g. login args, config file)
    and turn it into a
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    instance.
    """

    @abc.abstractmethod
    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[None, None]:
        """
        Hook implemented by subclasses to obtain the username and authentication token.
        A value that cannot be obtained is returned as None.

        Implementations are free to ignore syn and user_login_args; they only give
        context about the Synapse client's configuration and the login() arguments.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            Tuple of (username, bearer authentication token e.g. a personal access token),
            where either value may be None if it is not available.
        """
        return None, None

    def get_synapse_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Look up the authentication information of this provider and build
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        from it.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                if valid credentials can be found by this provider, None otherwise.
        """
        auth_info = self._get_auth_info(syn=syn, user_login_args=user_login_args)
        return self._create_synapse_credential(syn, *auth_info)

    def _create_synapse_credential(
        self, syn: "Synapse", username: str, auth_token: str
    ) -> Union[SynapseCredentials, None]:
        """
        Build credentials from an auth token, verifying the optional username.

        Arguments:
            syn: Synapse client instance, used to fetch the profile of the token's owner
            username: Optional username or email expected to belong to the token
            auth_token: The bearer token, or None when the provider found none

        Returns:
            The credentials, with the username taken from the fetched profile,
            or None when auth_token is None.

        Raises:
            SynapseAuthenticationError: If a username was given and it matches neither
                the profile's userName nor one of its emails
        """
        if auth_token is None:
            return None

        credentials = SynapseAuthTokenCredentials(auth_token)
        profile = syn.restGET("/userProfile", auth=credentials)
        profile_username = profile.get("userName")
        profile_emails = profile.get("emails", [])

        # a username/email is not required when logging in with an auth token
        # however if both are provided raise an error if they do not correspond
        # to avoid any ambiguity about what profile was logged in
        if username and not (
            username == profile_username or username in profile_emails
        ):
            raise SynapseAuthenticationError(
                "username/email and auth_token both provided but username does not "
                "match token profile"
            )

        credentials.username = profile_username
        return credentials
Functions
get_synapse_credentials(syn, user_login_args)

Returns SynapseCredentials if this provider is able to get valid credentials, returns None otherwise.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials if valid credentials can be found by this provider, None otherwise.

Source code in synapseclient/core/credentials/credential_provider.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def get_synapse_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Look up the authentication information of this provider and build
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    from it.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            if valid credentials can be found by this provider, None otherwise.
    """
    auth_info = self._get_auth_info(syn=syn, user_login_args=user_login_args)
    return self._create_synapse_credential(syn, *auth_info)

UserArgsCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from user_login_args during a CLI session.

Source code in synapseclient/core/credentials/credential_provider.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
class UserArgsCredentialsProvider(SynapseCredentialsProvider):
    """
    Credential provider that takes the auth info straight from the
    user_login_args supplied during a CLI session.
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[str, str]:
        """
        Return the (username, auth token) pair found on user_login_args.

        Note that user_login_args is read by attribute (`.username`,
        `.auth_token`), not by key; syn is not used.
        """
        username = user_login_args.username
        auth_token = user_login_args.auth_token
        return username, auth_token

ConfigFileCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves auth info from the ~/.synapseConfig file

Source code in synapseclient/core/credentials/credential_provider.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class ConfigFileCredentialsProvider(SynapseCredentialsProvider):
    """
    Credential provider backed by the authentication section of the
    `~/.synapseConfig` file
    """

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        """
        Read the username and auth token from the config file.

        Arguments:
            syn: Synapse client instance, used to read the config and to log
            user_login_args: login arguments; only `.username` is read

        Returns:
            Tuple of (username, auth token) from the config, or (None, None) when a
            username was passed at login and it differs from the configured one.
        """
        config_auth = syn._get_config_authentication()
        config_username = config_auth.get("username")
        config_token = config_auth.get("authtoken")

        # guard against accidentally logging in as the wrong user: a login
        # username that differs from the configured one means none of the config
        # values are used, to prevent ambiguity
        login_username_differs = bool(
            user_login_args.username and config_username != user_login_args.username
        )
        if not login_username_differs:
            return config_username, config_token

        # NOTE(review): the message text (including the "Becuase" spelling) is
        # kept exactly as emitted today.
        syn.logger.warning(
            f"{user_login_args.username} was defined in the user login "
            "arguments, however, it is also defined in the `~/.synapseConfig` "
            "file. Becuase they do not match we will not use the `authtoken` "
            "in the `~/.synapseConfig` file.",
        )
        return None, None

AWSParameterStoreCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves user's authentication token from AWS SSM Parameter store

Source code in synapseclient/core/credentials/credential_provider.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class AWSParameterStoreCredentialsProvider(SynapseCredentialsProvider):
    """
    Credential provider that reads the user's authentication token from the
    AWS SSM Parameter store. The parameter name is taken from the environment
    variable named by ENVIRONMENT_VAR_NAME.
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_TOKEN_AWS_SSM_PARAMETER_NAME"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        """
        Fetch the auth token from the parameter store, if a parameter name is configured.

        Arguments:
            syn: Synapse client instance, used for logging warnings
            user_login_args: login arguments; only `.username` is read

        Returns:
            Tuple of (username from the login arguments, token). The token is None
            when the environment variable is unset or empty, when boto3 cannot
            be imported, or when the lookup fails with a botocore ClientError.
        """
        ssm_param_name = os.environ.get(self.ENVIRONMENT_VAR_NAME)
        if not ssm_param_name:
            # nothing configured; the username is still returned so that it may
            # be validated by the caller
            return user_login_args.username, None

        auth_token = None
        try:
            import boto3
            import botocore

            response = boto3.client("ssm").get_parameter(
                Name=ssm_param_name,
                WithDecryption=True,
            )
            auth_token = response["Parameter"]["Value"]
        except ImportError:
            syn.logger.warning(
                f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                'but "boto3" could not be imported. The Synapse client uses "boto3" '
                "in order to access Systems Manager Parameter Storage. Please ensure "
                'that you have installed "boto3" to enable this feature.'
            )
        # this handler has to stay below the ImportError handler: "botocore" is
        # only guaranteed to be bound once the imports above have succeeded
        except botocore.exceptions.ClientError:
            syn.logger.warning(
                f"{self.ENVIRONMENT_VAR_NAME} was defined as {ssm_param_name}, "
                "but the matching parameter name could not be found in AWS Parameter "
                "Store. Caused by AWS error:\n",
                exc_info=True,
            )

        # if username is included in user's arguments, return it so that
        # it may be validated against the username authenticated by the token
        return user_login_args.username, auth_token

EnvironmentVariableCredentialsProvider

Bases: SynapseCredentialsProvider

Retrieves the user's authentication token from an environment variable

Source code in synapseclient/core/credentials/credential_provider.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class EnvironmentVariableCredentialsProvider(SynapseCredentialsProvider):
    """
    Credential provider that reads the user's authentication token from the
    environment variable named by ENVIRONMENT_VAR_NAME.
    """

    ENVIRONMENT_VAR_NAME = "SYNAPSE_AUTH_TOKEN"

    def _get_auth_info(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Tuple[Union[str, None], Union[str, None]]:
        """
        Return the login username together with the token from the environment.

        The token is None when the environment variable is not set; syn is not used.
        """
        username = user_login_args.username
        auth_token = os.environ.get(self.ENVIRONMENT_VAR_NAME)
        return username, auth_token

SynapseCredentialsProviderChain

Bases: object

Class that has a list of SynapseCredentialsProvider from which this class attempts to retrieve SynapseCredentials.

By default this class uses the following providers in this order:

  1. UserArgsCredentialsProvider
  2. EnvironmentVariableCredentialsProvider
  3. ConfigFileCredentialsProvider
  4. AWSParameterStoreCredentialsProvider
ATTRIBUTE DESCRIPTION
cred_providers

list of (SynapseCredentialsProvider) credential providers

Source code in synapseclient/core/credentials/credential_provider.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class SynapseCredentialsProviderChain(object):
    """
    Ordered collection of
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    that are asked, one after the other, for
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].


    By default this class uses the following providers in this order:

    1. [UserArgsCredentialsProvider][synapseclient.core.credentials.credential_provider.UserArgsCredentialsProvider]
    2. [EnvironmentVariableCredentialsProvider][synapseclient.core.credentials.credential_provider.EnvironmentVariableCredentialsProvider]
    3. [ConfigFileCredentialsProvider][synapseclient.core.credentials.credential_provider.ConfigFileCredentialsProvider]
    4. [AWSParameterStoreCredentialsProvider][synapseclient.core.credentials.credential_provider.AWSParameterStoreCredentialsProvider]

    Attributes:
        cred_providers: list of
            ([SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider])
            credential providers
    """

    def __init__(self, cred_providers) -> None:
        # copy into a list so the chain keeps its own, ordered set of providers
        self.cred_providers = list(cred_providers)

    def get_credentials(
        self, syn: "Synapse", user_login_args: Dict[str, str]
    ) -> Union[SynapseCredentials, None]:
        """
        Ask each
        [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
        in order for
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
        and stop at the first one that returns something other than None.

        Arguments:
            syn: Synapse client instance
            user_login_args: subset of arguments passed during syn.login()

        Returns:
            [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
                returned by the first non-None provider in its list, None otherwise
        """
        # lazy: providers after the first successful one are never consulted
        attempts = (
            provider.get_synapse_credentials(syn, user_login_args)
            for provider in self.cred_providers
        )
        return next((creds for creds in attempts if creds is not None), None)
Functions
get_credentials(syn, user_login_args)

Iterates its list of SynapseCredentialsProvider and returns the first non-None SynapseCredentials returned by a provider. If no provider is able to provide a SynapseCredentials, returns None.

PARAMETER DESCRIPTION
syn

Synapse client instance

TYPE: Synapse

user_login_args

subset of arguments passed during syn.login()

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Union[SynapseCredentials, None]

SynapseCredentials returned by the first non-None provider in its list, None otherwise

Source code in synapseclient/core/credentials/credential_provider.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def get_credentials(
    self, syn: "Synapse", user_login_args: Dict[str, str]
) -> Union[SynapseCredentials, None]:
    """
    Ask each
    [SynapseCredentialsProvider][synapseclient.core.credentials.credential_provider.SynapseCredentialsProvider]
    in order for
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
    and stop at the first one that returns something other than None.

    Arguments:
        syn: Synapse client instance
        user_login_args: subset of arguments passed during syn.login()

    Returns:
        [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials]
            returned by the first non-None provider in its list, None otherwise
    """
    # lazy: providers after the first successful one are never consulted
    attempts = (
        provider.get_synapse_credentials(syn, user_login_args)
        for provider in self.cred_providers
    )
    return next((creds for creds in attempts if creds is not None), None)

Functions

get_default_credential_chain()

Creates and uses a default credential chain to retrieve SynapseCredentials. The order this is returned is the order in which the credential providers are attempted.

RETURNS DESCRIPTION
SynapseCredentialsProviderChain

credential chain

Source code in synapseclient/core/credentials/credential_provider.py
266
267
268
269
270
271
272
273
274
275
276
def get_default_credential_chain() -> SynapseCredentialsProviderChain:
    """
    Return the default credential chain used to retrieve
    [SynapseCredentials][synapseclient.core.credentials.cred_data.SynapseCredentials].
    No chain is built here: the module-level DEFAULT_CREDENTIAL_PROVIDER_CHAIN
    is returned as is. The order of the providers in the chain is the order in
    which they are attempted.

    Returns:
        credential chain
    """
    return DEFAULT_CREDENTIAL_PROVIDER_CHAIN

Remote File Storage Wrappers

synapseclient.core.remote_file_storage_wrappers

Wrappers for remote file storage clients like S3 and SFTP.

Classes

S3ClientWrapper

Wrapper class for S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class S3ClientWrapper:
    """
    Wrapper class for S3 client.
    """

    # These methods are static because in our use case, we always have the bucket and
    # endpoint and usually only call the download/upload once so there is no need to instantiate multiple objects

    @staticmethod
    def _attempt_import_boto3():
        """
        Check if boto3 installed and give instructions if not.

        Returns:
            The boto3 module or instructions to install it if unavailable
        """
        return attempt_import(
            "boto3",
            "\n\nLibraries required for client authenticated S3 access are not installed!\n"
            "The Synapse client uses boto3 in order to access S3-like storage "
            "locations.\n",
        )

    @staticmethod
    def _create_progress_callback_func(
        progress_bar: tqdm,
    ) -> callable:
        """
        Creates a progress callback function for tracking the progress of a file transfer.

        Arguments:
            progress_bar: The progress bar that the returned callback advances.

        Returns:
            progress_callback: The progress callback function.
        """

        def progress_callback(transferred_bytes: int) -> None:
            """
            Update the progress of a transfer.

            Arguments:
                transferred_bytes: The number of bytes to advance the progress bar by.
            """
            progress_bar.update(transferred_bytes)

        return progress_callback

    @staticmethod
    def download_file(
        bucket: str,
        endpoint_url: str,
        remote_file_key: str,
        download_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        show_progress: bool = True,
        transfer_config_kwargs: dict = None,
    ) -> str:
        """
        Download a file from s3 using boto3.

        Arguments:
            bucket: name of bucket to download from
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key of the file to download
            download_file_path: local path to save the file to
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            show_progress: whether to print progress indicator to console
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

        Returns:
            download_file_path: the local path the file was saved to (returned unchanged)

        Raises:
            ValueError: If the key does not exist in the bucket.
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        S3ClientWrapper._attempt_import_boto3()

        import boto3.s3.transfer
        import botocore

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        # Explicit credentials win; otherwise fall back to the named profile.
        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        try:
            s3_obj = s3.Object(bucket, remote_file_key)

            progress_callback = None
            progress_bar = None
            if show_progress:
                # load() fetches the object's metadata so the bar knows the total size
                s3_obj.load()
                file_size = s3_obj.content_length
                filename = os.path.basename(download_file_path)
                progress_bar = tqdm(
                    total=file_size,
                    desc="Downloading",
                    unit="B",
                    unit_scale=True,
                    postfix=filename,
                    smoothing=0,
                )
                progress_callback = S3ClientWrapper._create_progress_callback_func(
                    progress_bar
                )

            try:
                s3_obj.download_file(
                    download_file_path,
                    Callback=progress_callback,
                    Config=transfer_config,
                )
            finally:
                # Close the bar even when the transfer fails. Compare against None
                # because a tqdm bar evaluates falsy when its total is 0, which
                # would otherwise leave the bar open for zero-byte files.
                if progress_bar is not None:
                    progress_bar.close()

            return download_file_path

        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # Interpolate the message here: exceptions, unlike logging calls,
                # do not apply %-style arguments themselves.
                raise ValueError(
                    "The key:%s does not exist in bucket:%s."
                    % (remote_file_key, bucket)
                ) from e
            raise

    @staticmethod
    def upload_file(
        bucket: str,
        endpoint_url: typing.Optional[str],
        remote_file_key: str,
        upload_file_path: str,
        *,
        profile_name: str = None,
        credentials: typing.Dict[str, str] = None,
        show_progress: bool = True,
        transfer_config_kwargs: dict = None,
        storage_str: str = None,
    ) -> str:
        """
        Upload a file to s3 using boto3.

        Arguments:
            bucket: name of bucket to upload to
            endpoint_url: a boto3 compatible endpoint url
            remote_file_key: object key to upload the file to
            upload_file_path: local path of the file to upload
            profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
            credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

                Expected items:

                - `aws_access_key_id`
                - `aws_secret_access_key`
                - `aws_session_token`
            show_progress: whether to print progress indicator to console
            transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)
            storage_str: description shown next to the progress bar

        Returns:
            upload_file_path: the local path of the uploaded file (returned unchanged)

        Raises:
            ValueError: If the path does not exist or is not a file
            botocore.exceptions.ClientError: If there is an error with the S3 client.
        """

        if not os.path.isfile(upload_file_path):
            # Interpolate the message here: exceptions, unlike logging calls,
            # do not apply %-style arguments themselves.
            raise ValueError(
                "The path: [%s] does not exist or is not a file" % (upload_file_path,)
            )

        S3ClientWrapper._attempt_import_boto3()
        import boto3.s3.transfer

        transfer_config = boto3.s3.transfer.TransferConfig(
            **(transfer_config_kwargs or {})
        )

        # Explicit credentials win; otherwise fall back to the named profile.
        session_args = credentials if credentials else {"profile_name": profile_name}
        boto_session = boto3.session.Session(**session_args)
        s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

        progress_callback = None
        progress_bar = None
        if show_progress:
            file_size = os.stat(upload_file_path).st_size
            filename = os.path.basename(upload_file_path)
            progress_bar = tqdm(
                total=file_size,
                desc=storage_str,
                unit="B",
                unit_scale=True,
                postfix=filename,
                smoothing=0,
            )
            progress_callback = S3ClientWrapper._create_progress_callback_func(
                progress_bar
            )

        try:
            # automatically determines whether to perform multi-part upload
            s3.Bucket(bucket).upload_file(
                upload_file_path,
                remote_file_key,
                Callback=progress_callback,
                Config=transfer_config,
                ExtraArgs={"ACL": "bucket-owner-full-control"},
            )
        finally:
            # Close the bar even when the transfer fails. Compare against None
            # because a tqdm bar evaluates falsy when its total is 0, which
            # would otherwise leave the bar open for zero-byte files.
            if progress_bar is not None:
                progress_bar.close()
        return upload_file_path
Functions
download_file(bucket, endpoint_url, remote_file_key, download_file_path, *, profile_name=None, credentials=None, show_progress=True, transfer_config_kwargs=None) staticmethod

Download a file from s3 using boto3.

PARAMETER DESCRIPTION
bucket

name of bucket to download from

TYPE: str

endpoint_url

a boto3 compatible endpoint url

TYPE: str

remote_file_key

object key of the file to download

TYPE: str

download_file_path

local path to save the file to

TYPE: str

profile_name

AWS profile name from local aws config, mutually exclusive with credentials

TYPE: str DEFAULT: None

credentials

a dictionary of AWS credentials to use, mutually exclusive with profile_name

Expected items:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token

TYPE: Dict[str, str] DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

transfer_config_kwargs

boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
download_file_path

local path the file was saved to

TYPE: str

RAISES DESCRIPTION
ValueError

If the key does not exist in the bucket.

ClientError

If there is an error with the S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@staticmethod
def download_file(
    bucket: str,
    endpoint_url: str,
    remote_file_key: str,
    download_file_path: str,
    *,
    profile_name: str = None,
    credentials: typing.Dict[str, str] = None,
    show_progress: bool = True,
    transfer_config_kwargs: dict = None,
) -> str:
    """
    Download a file from s3 using boto3.

    Arguments:
        bucket: name of bucket to download from
        endpoint_url: a boto3 compatible endpoint url
        remote_file_key: object key of the file to download
        download_file_path: local path to save the file to
        profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
        credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

            Expected items:

            - `aws_access_key_id`
            - `aws_secret_access_key`
            - `aws_session_token`
        show_progress: whether to print progress indicator to console
        transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

    Returns:
        download_file_path: the local path the file was saved to (returned unchanged)

    Raises:
        ValueError: If the key does not exist in the bucket.
        botocore.exceptions.ClientError: If there is an error with the S3 client.
    """

    S3ClientWrapper._attempt_import_boto3()

    import boto3.s3.transfer
    import botocore

    transfer_config = boto3.s3.transfer.TransferConfig(
        **(transfer_config_kwargs or {})
    )

    # Explicit credentials win; otherwise fall back to the named profile.
    session_args = credentials if credentials else {"profile_name": profile_name}
    boto_session = boto3.session.Session(**session_args)
    s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

    try:
        s3_obj = s3.Object(bucket, remote_file_key)

        progress_callback = None
        progress_bar = None
        if show_progress:
            # load() fetches the object's metadata so the bar knows the total size
            s3_obj.load()
            file_size = s3_obj.content_length
            filename = os.path.basename(download_file_path)
            progress_bar = tqdm(
                total=file_size,
                desc="Downloading",
                unit="B",
                unit_scale=True,
                postfix=filename,
                smoothing=0,
            )
            progress_callback = S3ClientWrapper._create_progress_callback_func(
                progress_bar
            )

        try:
            s3_obj.download_file(
                download_file_path,
                Callback=progress_callback,
                Config=transfer_config,
            )
        finally:
            # Close the bar even when the transfer fails. Compare against None
            # because a tqdm bar evaluates falsy when its total is 0, which
            # would otherwise leave the bar open for zero-byte files.
            if progress_bar is not None:
                progress_bar.close()

        return download_file_path

    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            # Interpolate the message here: exceptions, unlike logging calls,
            # do not apply %-style arguments themselves.
            raise ValueError(
                "The key:%s does not exist in bucket:%s." % (remote_file_key, bucket)
            ) from e
        raise
upload_file(bucket, endpoint_url, remote_file_key, upload_file_path, *, profile_name=None, credentials=None, show_progress=True, transfer_config_kwargs=None, storage_str=None) staticmethod

Upload a file to s3 using boto3.

PARAMETER DESCRIPTION
bucket

name of bucket to upload to

TYPE: str

endpoint_url

a boto3 compatible endpoint url

TYPE: Optional[str]

remote_file_key

object key to upload the file to

TYPE: str

upload_file_path

local path of the file to upload

TYPE: str

profile_name

AWS profile name from local aws config, mutually exclusive with credentials

TYPE: str DEFAULT: None

credentials

a dictionary of AWS credentials to use, mutually exclusive with profile_name

Expected items:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token

TYPE: Dict[str, str] DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

transfer_config_kwargs

boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
upload_file_path

local path of the file that was uploaded

TYPE: str

RAISES DESCRIPTION
ValueError

If the path does not exist or is not a file

ClientError

If there is an error with the S3 client.

Source code in synapseclient/core/remote_file_storage_wrappers.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@staticmethod
def upload_file(
    bucket: str,
    endpoint_url: typing.Optional[str],
    remote_file_key: str,
    upload_file_path: str,
    *,
    profile_name: str = None,
    credentials: typing.Dict[str, str] = None,
    show_progress: bool = True,
    transfer_config_kwargs: dict = None,
    storage_str: str = None,
) -> str:
    """
    Upload a file to s3 using boto3.

    Arguments:
        bucket: name of bucket to upload to
        endpoint_url: a boto3 compatible endpoint url
        remote_file_key: object key to upload the file to
        upload_file_path: local path of the file to upload
        profile_name: AWS profile name from local aws config, **mutually exclusive with credentials**
        credentials: a dictionary of AWS credentials to use, **mutually exclusive with profile_name**

            Expected items:

            - `aws_access_key_id`
            - `aws_secret_access_key`
            - `aws_session_token`
        show_progress: whether to print progress indicator to console
        transfer_config_kwargs: boto S3 transfer configuration (see boto3.s3.transfer.TransferConfig)
        storage_str: description shown next to the progress bar

    Returns:
        upload_file_path: the local path of the uploaded file (returned unchanged)

    Raises:
        ValueError: If the path does not exist or is not a file
        botocore.exceptions.ClientError: If there is an error with the S3 client.
    """

    if not os.path.isfile(upload_file_path):
        # Interpolate the message here: exceptions, unlike logging calls,
        # do not apply %-style arguments themselves.
        raise ValueError(
            "The path: [%s] does not exist or is not a file" % (upload_file_path,)
        )

    S3ClientWrapper._attempt_import_boto3()
    import boto3.s3.transfer

    transfer_config = boto3.s3.transfer.TransferConfig(
        **(transfer_config_kwargs or {})
    )

    # Explicit credentials win; otherwise fall back to the named profile.
    session_args = credentials if credentials else {"profile_name": profile_name}
    boto_session = boto3.session.Session(**session_args)
    s3 = boto_session.resource("s3", endpoint_url=endpoint_url)

    progress_callback = None
    progress_bar = None
    if show_progress:
        file_size = os.stat(upload_file_path).st_size
        filename = os.path.basename(upload_file_path)
        progress_bar = tqdm(
            total=file_size,
            desc=storage_str,
            unit="B",
            unit_scale=True,
            postfix=filename,
            smoothing=0,
        )
        progress_callback = S3ClientWrapper._create_progress_callback_func(
            progress_bar
        )

    try:
        # automatically determines whether to perform multi-part upload
        s3.Bucket(bucket).upload_file(
            upload_file_path,
            remote_file_key,
            Callback=progress_callback,
            Config=transfer_config,
            ExtraArgs={"ACL": "bucket-owner-full-control"},
        )
    finally:
        # Close the bar even when the transfer fails. Compare against None
        # because a tqdm bar evaluates falsy when its total is 0, which
        # would otherwise leave the bar open for zero-byte files.
        if progress_bar is not None:
            progress_bar.close()
    return upload_file_path

SFTPWrapper

Source code in synapseclient/core/remote_file_storage_wrappers.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
class SFTPWrapper:
    """
    Wrapper class for uploading files to and downloading files from SFTP servers.
    """

    @staticmethod
    def _attempt_import_sftp():
        """
        Check if pysftp is installed and give instructions if not.

        Returns:
            The pysftp module if available
        """
        return attempt_import(
            "pysftp",
            "\n\nLibraries required for SFTP are not installed!\n"
            "The Synapse client uses pysftp in order to access SFTP storage locations. "
            "This library in turn depends on pycrypto.\n"
            "For Windows systems without a C/C++ compiler, install the appropriate binary "
            "distribution of pycrypto from:\n"
            "http://www.voidspace.org.uk/python/modules.shtml#pycrypto\n\n"
            "For more information, see: http://python-docs.synapse.org/build/html/sftp.html",
        )

    @staticmethod
    def _parse_for_sftp(url):
        """
        Parse a URL and verify that it uses the sftp scheme.

        Arguments:
            url: The URL to parse.

        Returns:
            The parsed URL.

        Raises:
            NotImplementedError: If the URL scheme is not `sftp`.
        """
        parsedURL = urllib_parse.urlparse(url)
        if parsedURL.scheme != "sftp":
            raise (
                NotImplementedError(
                    "This method only supports sftp URLs of the form sftp://..."
                )
            )
        return parsedURL

    @staticmethod
    def upload_file(
        filepath: str,
        url: str,
        username: str = None,
        password: str = None,
        storage_str: str = None,
    ) -> str:
        """
        Performs upload of a local file to an sftp server.

        Arguments:
            filepath: The path to the file to be uploaded.
            url: URL where file will be deposited. Should include path and protocol. e.g.
                        sftp://sftp.example.com/path/to/file/store
            username: The username for authentication. Defaults to None.
            password: The password for authentication. Defaults to None.
            storage_str: Description shown next to the progress bar. Defaults to None.

        Returns:
            The URL of the uploaded file.

        Raises:
            NotImplementedError: If the URL scheme is not `sftp`.
        """
        # Validate the URL before creating the progress bar so that an
        # unsupported URL does not leave a dangling bar behind.
        parsedURL = SFTPWrapper._parse_for_sftp(url)

        progress_bar = tqdm(
            desc=storage_str,
            unit="B",
            unit_scale=True,
            smoothing=0,
            postfix=filepath,
        )

        def progress_callback(*args, **kwargs) -> None:
            # args[0] is treated as the running byte count and args[1] as the
            # total size; the bar is advanced by the difference from its
            # current position.
            if not progress_bar.total:
                progress_bar.total = args[1]
            progress_bar.update(args[0] - progress_bar.n)

        try:
            with _retry_pysftp_connection(
                parsedURL.hostname, username=username, password=password
            ) as sftp:
                sftp.makedirs(parsedURL.path)
                with sftp.cd(parsedURL.path):
                    sftp.put(filepath, preserve_mtime=True, callback=progress_callback)
        finally:
            # Always release the bar, including when the connection or transfer fails.
            progress_bar.close()
        path = urllib_parse.quote(parsedURL.path + "/" + os.path.split(filepath)[-1])
        parsedURL = parsedURL._replace(path=path)
        return urllib_parse.urlunparse(parsedURL)

    @staticmethod
    def download_file(
        url: str,
        localFilepath: str = None,
        username: str = None,
        password: str = None,
        show_progress: bool = True,
    ) -> str:
        """
        Performs download of a file from an sftp server.

        Arguments:
            url: URL of the remote file to download. The (unquoted) path component
                    identifies the file on the server.
            localFilepath: location where to store file. Defaults to the current working
                    directory; when it is a directory, the remote file name is appended.
            username: username on server
            password: password for authentication on  server
            show_progress: whether to print progress indicator to console

        Returns:
            The local filepath where the file was saved.

        Raises:
            NotImplementedError: If the URL scheme is not `sftp`.
        """

        parsedURL = SFTPWrapper._parse_for_sftp(url)

        # Create the local file path if it doesn't exist
        path = urllib_parse.unquote(parsedURL.path)
        if localFilepath is None:
            localFilepath = os.getcwd()
        if os.path.isdir(localFilepath):
            localFilepath = os.path.join(localFilepath, path.split("/")[-1])
        # Check and create the directory. A bare file name has an empty dirname
        # (meaning the current directory), and os.makedirs("") would raise.
        local_dir = os.path.dirname(localFilepath)
        if local_dir and not os.path.exists(local_dir):
            # exist_ok guards against another process creating it in between
            os.makedirs(local_dir, exist_ok=True)

        # Download file
        with _retry_pysftp_connection(
            parsedURL.hostname, username=username, password=password
        ) as sftp:
            sftp.get(
                path,
                localFilepath,
                preserve_mtime=True,
                callback=(printTransferProgress if show_progress else None),
            )
        return localFilepath
Functions
upload_file(filepath, url, username=None, password=None, storage_str=None) staticmethod

Performs upload of a local file to an sftp server.

PARAMETER DESCRIPTION
filepath

The path to the file to be uploaded.

TYPE: str

url

URL where file will be deposited. Should include path and protocol. e.g. sftp://sftp.example.com/path/to/file/store

TYPE: str

username

The username for authentication. Defaults to None.

TYPE: str DEFAULT: None

password

The password for authentication. Defaults to None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

The URL of the uploaded file.

Source code in synapseclient/core/remote_file_storage_wrappers.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
@staticmethod
def upload_file(
    filepath: str,
    url: str,
    username: str = None,
    password: str = None,
    storage_str: str = None,
) -> str:
    """
    Performs upload of a local file to an sftp server.

    Arguments:
        filepath: The path to the file to be uploaded.
        url: URL where file will be deposited. Should include path and protocol. e.g.
                    sftp://sftp.example.com/path/to/file/store
        username: The username for authentication. Defaults to None.
        password: The password for authentication. Defaults to None.
        storage_str: Description shown next to the progress bar. Defaults to None.

    Returns:
        The URL of the uploaded file.

    Raises:
        NotImplementedError: If the URL scheme is not `sftp`.
    """
    # Validate the URL before creating the progress bar so that an
    # unsupported URL does not leave a dangling bar behind.
    parsedURL = SFTPWrapper._parse_for_sftp(url)

    progress_bar = tqdm(
        desc=storage_str,
        unit="B",
        unit_scale=True,
        smoothing=0,
        postfix=filepath,
    )

    def progress_callback(*args, **kwargs) -> None:
        # args[0] is treated as the running byte count and args[1] as the
        # total size; the bar is advanced by the difference from its
        # current position.
        if not progress_bar.total:
            progress_bar.total = args[1]
        progress_bar.update(args[0] - progress_bar.n)

    try:
        with _retry_pysftp_connection(
            parsedURL.hostname, username=username, password=password
        ) as sftp:
            sftp.makedirs(parsedURL.path)
            with sftp.cd(parsedURL.path):
                sftp.put(filepath, preserve_mtime=True, callback=progress_callback)
    finally:
        # Always release the bar, including when the connection or transfer fails.
        progress_bar.close()
    path = urllib_parse.quote(parsedURL.path + "/" + os.path.split(filepath)[-1])
    parsedURL = parsedURL._replace(path=path)
    return urllib_parse.urlunparse(parsedURL)
download_file(url, localFilepath=None, username=None, password=None, show_progress=True) staticmethod

Performs download of a file from an sftp server.

PARAMETER DESCRIPTION
url

URL of the remote file to download. The path component is extracted from it to locate the file on the server.

TYPE: str

localFilepath

location where to store file

TYPE: str DEFAULT: None

username

username on server

TYPE: str DEFAULT: None

password

password for authentication on server

TYPE: str DEFAULT: None

show_progress

whether to print progress indicator to console

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
str

The local filepath where the file was saved.

Source code in synapseclient/core/remote_file_storage_wrappers.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
@staticmethod
def download_file(
    url: str,
    localFilepath: str = None,
    username: str = None,
    password: str = None,
    show_progress: bool = True,
) -> str:
    """
    Performs download of a file from an sftp server.

    Arguments:
        url: URL of the remote file to download. The (unquoted) path component
                identifies the file on the server.
        localFilepath: location where to store file. Defaults to the current working
                directory; when it is a directory, the remote file name is appended.
        username: username on server
        password: password for authentication on  server
        show_progress: whether to print progress indicator to console

    Returns:
        The local filepath where the file was saved.

    Raises:
        NotImplementedError: If the URL scheme is not `sftp`.
    """

    parsedURL = SFTPWrapper._parse_for_sftp(url)

    # Create the local file path if it doesn't exist
    path = urllib_parse.unquote(parsedURL.path)
    if localFilepath is None:
        localFilepath = os.getcwd()
    if os.path.isdir(localFilepath):
        localFilepath = os.path.join(localFilepath, path.split("/")[-1])
    # Check and create the directory. A bare file name has an empty dirname
    # (meaning the current directory), and os.makedirs("") would raise.
    local_dir = os.path.dirname(localFilepath)
    if local_dir and not os.path.exists(local_dir):
        # exist_ok guards against another process creating it in between
        os.makedirs(local_dir, exist_ok=True)

    # Download file
    with _retry_pysftp_connection(
        parsedURL.hostname, username=username, password=password
    ) as sftp:
        sftp.get(
            path,
            localFilepath,
            preserve_mtime=True,
            callback=(printTransferProgress if show_progress else None),
        )
    return localFilepath

Functions

Retry

synapseclient.core.retry

A helper tool that allows the Python client to make more than one attempt at connecting to the server if initially met with an error. These retry attempts can be made under certain conditions, i.e. for certain status codes, connection errors, and/or connection exceptions.

Functions

with_retry(function, verbose=False, retry_status_codes=[429, 500, 502, 503, 504], expected_status_codes=[], retry_errors=[], retry_exceptions=[], retries=DEFAULT_RETRIES, wait=DEFAULT_WAIT, back_off=DEFAULT_BACK_OFF, max_wait=DEFAULT_MAX_WAIT)

Retries the given function under certain conditions.

PARAMETER DESCRIPTION
function

A function with no arguments. If arguments are needed, use a lambda (see example).

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

DEFAULT: [429, 500, 502, 503, 504]

expected_status_codes

If specified responses with any other status codes result in a retry.

DEFAULT: []

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

DEFAULT: []

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

DEFAULT: []

retries

How many times to retry maximum.

DEFAULT: DEFAULT_RETRIES

wait

How many seconds to wait between retries.

DEFAULT: DEFAULT_WAIT

back_off

Exponential constant to increase wait for between progressive failures.

DEFAULT: DEFAULT_BACK_OFF

max_wait

back_off between requests will not exceed this value

DEFAULT: DEFAULT_MAX_WAIT

RETURNS DESCRIPTION

function()

Using with_retry

Using with_retry to consolidate inputs into a list.

from synapseclient.core.retry import with_retry

def foo(a, b, c): return [a, b, c]
result = with_retry(lambda: foo("1", "2", "3"), **STANDARD_RETRY_PARAMS)
Source code in synapseclient/core/retry.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def with_retry(
    function,
    verbose=False,
    retry_status_codes=[429, 500, 502, 503, 504],
    expected_status_codes=[],
    retry_errors=[],
    retry_exceptions=[],
    retries=DEFAULT_RETRIES,
    wait=DEFAULT_WAIT,
    back_off=DEFAULT_BACK_OFF,
    max_wait=DEFAULT_MAX_WAIT,
):
    """
    Retries the given function under certain conditions.

    The function is called once and then re-called up to `retries` more times for
    as long as the outcome is judged retry-able. Between attempts the call sleeps
    for `wait` seconds scaled by a random factor in [0.5, 1.5]; `wait` is then
    multiplied by `back_off` and capped at `max_wait`.

    The list-valued defaults are only read (membership tests / iteration) and never
    mutated inside this function.

    Arguments:
        function: A function with no arguments.  If arguments are needed, use a lambda (see example).
        verbose: If True, log to the debug logger instead of the default logger.
        retry_status_codes: What status codes to retry upon in the case of a SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result in a retry.
        retry_errors: What reasons to retry upon, if function().response.json()['reason'] exists.
            Matching is a case-insensitive substring test against the response
            message or the text of the raised exception.
        retry_exceptions: What types of exceptions, specified as strings or Exception classes, to retry upon.
        retries: How many times to retry maximum.
        wait: How many seconds to wait between retries.
        back_off: Exponential constant to increase wait for between progressive failures.
        max_wait: back_off between requests will not exceed this value

    Returns:
        function()

    Raises:
        Exception: Whatever `function` raised on its last attempt, when that outcome
            is not retry-able or the retries have run out.

    Example: Using with_retry
        Using ``with_retry`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry

            def foo(a, b, c): return [a, b, c]
            result = with_retry(lambda: foo("1", "2", "3"), **STANDARD_RETRY_PARAMS)
    """

    if verbose:
        logger = logging.getLogger(DEBUG_LOGGER_NAME)
    else:
        logger = logging.getLogger(DEFAULT_LOGGER_NAME)

    # Retry until we succeed or run out of tries
    total_wait = 0
    while True:
        # Start with a clean slate
        exc = None
        exc_info = None
        retry = False
        response = None

        # Try making the call
        try:
            response = function()
        except Exception as ex:
            exc = ex
            exc_info = sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            # HTTP errors carry the offending response; inspect it like a normal one
            if hasattr(ex, "response"):
                response = ex.response

        # Check if we got a retry-able error
        if response is not None and hasattr(response, "status_code"):
            if (
                expected_status_codes
                and response.status_code not in expected_status_codes
            ) or (retry_status_codes and response.status_code in retry_status_codes):
                response_message = _get_message(response)
                retry = True
                logger.debug("retrying on status code: %s" % str(response.status_code))
                # TODO: this was originally printed regardless of 'verbose' was that behavior correct?
                logger.debug(str(response_message))
                if (response.status_code == 429) and (wait > 10):
                    logger.warning("%s...\n" % response_message)
                    logger.warning("Retrying in %i seconds" % wait)

            elif response.status_code not in range(200, 299):
                # For all other non 200 messages look for retryable errors in the body or reason field
                response_message = _get_message(response)
                if any(msg.lower() in response_message.lower() for msg in retry_errors):
                    retry = True
                    logger.debug("retrying %s" % response_message)
                # special case for message throttling
                # The throttling notice is part of the message text, so it must be
                # looked up in `response_message`; testing it against the response
                # object itself can never match the text.
                elif (
                    "Please slow down.  You may send a maximum of 10 message"
                    in response_message
                ):
                    retry = True
                    wait = 16
                    logger.debug("retrying " + response_message)

        # Check if we got a retry-able exception
        if exc is not None:
            if (
                exc.__class__.__name__ in retry_exceptions
                or exc.__class__ in retry_exceptions
                or any(msg.lower() in str(exc_info[1]).lower() for msg in retry_errors)
            ):
                retry = True
                logger.debug("retrying exception: " + str(exc))

        # Wait then retry
        retries -= 1
        if retries >= 0 and retry:
            # Jitter the wait so that concurrent clients do not retry in lock-step
            randomized_wait = wait * random.uniform(0.5, 1.5)
            logger.debug(
                "total wait time {total_wait:5.0f} seconds\n "
                "... Retrying in {wait:5.1f} seconds...".format(
                    total_wait=total_wait, wait=randomized_wait
                )
            )
            total_wait += randomized_wait
            doze(randomized_wait)
            wait = min(max_wait, wait * back_off)
            continue

        # Out of retries, re-raise the exception or return the response
        if exc_info is not None and exc_info[0] is not None:
            logger.debug(
                "retries have run out. re-raising the exception", exc_info=True
            )
            raise exc
        return response

calculate_exponential_backoff(retries, base_wait, wait_random_lower, wait_random_upper, back_off_factor, max_back_off)

Handle calculating the exponential backoff.

PARAMETER DESCRIPTION
retries

The number of retries that have been attempted

TYPE: int

base_wait

The base wait time

TYPE: float

wait_random_lower

The lower bound of the random wait time

TYPE: float

wait_random_upper

The upper bound of the random wait time

TYPE: float

back_off_factor

The factor to increase the wait time by for each retry

TYPE: float

max_back_off

The maximum wait time

TYPE: float

RETURNS DESCRIPTION
float

The total wait time

Source code in synapseclient/core/retry.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def calculate_exponential_backoff(
    retries: int,
    base_wait: float,
    wait_random_lower: float,
    wait_random_upper: float,
    back_off_factor: float,
    max_back_off: float,
) -> float:
    """
    Compute how long to wait before the next retry.

    The wait is `base_wait * back_off_factor ** retries` plus a random jitter drawn
    uniformly between `wait_random_lower` and `wait_random_upper`, capped at
    `max_back_off`.

    Arguments:
        retries: The number of retries that have been attempted
        base_wait: The base wait time
        wait_random_lower: The lower bound of the random wait time
        wait_random_upper: The upper bound of the random wait time
        back_off_factor: The factor to increase the wait time by for each retry
        max_back_off: The maximum wait time

    Returns:
        The total wait time
    """
    jitter = random.uniform(wait_random_lower, wait_random_upper)
    exponential_wait = base_wait * (back_off_factor**retries)
    jittered_wait = exponential_wait + jitter
    # Never wait longer than the configured ceiling.
    if jittered_wait > max_back_off:
        return max_back_off
    return jittered_wait

with_retry_time_based_async(function, verbose=False, retry_status_codes=None, expected_status_codes=None, retry_errors=None, retry_exceptions=None, retry_base_wait=DEFAULT_BASE_WAIT_ASYNC, retry_wait_random_lower=DEFAULT_WAIT_RANDOM_LOWER_ASYNC, retry_wait_random_upper=DEFAULT_WAIT_RANDOM_UPPER_ASYNC, retry_back_off_factor=DEFAULT_BACK_OFF_FACTOR_ASYNC, retry_max_back_off=DEFAULT_MAX_BACK_OFF_ASYNC, retry_max_wait_before_failure=DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC) async

Retries the given function under certain conditions. It retries an unbounded number of times until the maximum wait time is reached. The backoff is calculated using an exponential backoff algorithm with a random jitter. The maximum backoff between retries is capped at retry_max_back_off.

PARAMETER DESCRIPTION
verbose

Whether to log debug messages

TYPE: bool DEFAULT: False

function

A function with no arguments. If arguments are needed, use a lambda (see example).

TYPE: Coroutine[Any, Any, Any]

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

TYPE: List[int] DEFAULT: None

expected_status_codes

If specified responses with any other status codes result in a retry.

TYPE: List[int] DEFAULT: None

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

TYPE: List[str] DEFAULT: None

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

TYPE: List[Union[Exception, str]] DEFAULT: None

retry_base_wait

The base wait time inbetween retries.

TYPE: float DEFAULT: DEFAULT_BASE_WAIT_ASYNC

retry_wait_random_lower

The lower bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_LOWER_ASYNC

retry_wait_random_upper

The upper bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_UPPER_ASYNC

retry_back_off_factor

The factor to increase the wait time by for each retry.

TYPE: float DEFAULT: DEFAULT_BACK_OFF_FACTOR_ASYNC

retry_max_back_off

The maximum wait time.

TYPE: float DEFAULT: DEFAULT_MAX_BACK_OFF_ASYNC

retry_max_wait_before_failure

The maximum wait time before failure.

TYPE: float DEFAULT: DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC

Using with_retry_time_based_async

Using with_retry_time_based_async to consolidate inputs into a list.

from synapseclient.core.retry import with_retry_time_based_async

async def foo(a, b, c): return [a, b, c]
result = await with_retry_time_based_async(lambda: foo("1", "2", "3"))
Source code in synapseclient/core/retry.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
async def with_retry_time_based_async(
    function: Coroutine[Any, Any, Any],
    verbose: bool = False,
    retry_status_codes: List[int] = None,
    expected_status_codes: List[int] = None,
    retry_errors: List[str] = None,
    retry_exceptions: List[Union[Exception, str]] = None,
    retry_base_wait: float = DEFAULT_BASE_WAIT_ASYNC,
    retry_wait_random_lower: float = DEFAULT_WAIT_RANDOM_LOWER_ASYNC,
    retry_wait_random_upper: float = DEFAULT_WAIT_RANDOM_UPPER_ASYNC,
    retry_back_off_factor: float = DEFAULT_BACK_OFF_FACTOR_ASYNC,
    retry_max_back_off: float = DEFAULT_MAX_BACK_OFF_ASYNC,
    retry_max_wait_before_failure: float = DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC,
) -> Union[Exception, httpx.Response, Any, None]:
    """
    Await `function` repeatedly until it succeeds, fails in a non-retryable way, or
    the accumulated backoff time reaches `retry_max_wait_before_failure`.

    There is no cap on the number of attempts; only the sum of the backoff sleeps
    is bounded (time spent inside `function` is not counted). Each sleep is computed
    by `calculate_exponential_backoff`, i.e. exponential growth plus random jitter,
    capped at `retry_max_back_off`.

    Arguments:
        verbose: Whether to log debug messages
        function: A function with no arguments that returns an awaitable. If
            arguments are needed, use a lambda (see example).
        retry_status_codes: What status codes to retry upon in the case of a
            SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result
            in a retry.
        retry_errors: What reasons to retry upon, if
            `function().response.json()['reason']` exists.
        retry_exceptions: What types of exceptions, specified as strings or Exception
            classes, to retry upon.
        retry_base_wait: The base wait time in between retries.
        retry_wait_random_lower: The lower bound of the random wait time.
        retry_wait_random_upper: The upper bound of the random wait time.
        retry_back_off_factor: The factor to increase the wait time by for each retry.
        retry_max_back_off: The maximum wait time for a single backoff.
        retry_max_wait_before_failure: The maximum total backoff time before giving up.

    Returns:
        Whatever the awaited `function()` returned on its final attempt.

    Raises:
        Exception: The exception raised by the final attempt, if any.

    Example: Using with_retry_time_based_async
        Using ``with_retry_time_based_async`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry_time_based_async

            async def foo(a, b, c): return [a, b, c]
            result = await with_retry_time_based_async(lambda: foo("1", "2", "3"))
    """
    # The helper hands back the filter lists to use together with the logger.
    resolved_settings = _assign_default_values(
        retry_status_codes=retry_status_codes,
        expected_status_codes=expected_status_codes,
        retry_errors=retry_errors,
        retry_exceptions=retry_exceptions,
        verbose=verbose,
    )
    (
        retry_status_codes,
        expected_status_codes,
        retry_errors,
        retry_exceptions,
        logger,
    ) = resolved_settings

    slept_so_far = 0
    attempts_retried = 0
    while True:
        response = None
        error = None
        error_info = None

        try:
            response = await function()
        except Exception as ex:
            error, error_info = ex, sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            # Errors that carry a response are classified by that response too.
            if hasattr(ex, "response"):
                response = ex.response

        should_retry = _is_retryable(
            response=response,
            caught_exception=error,
            caught_exception_info=error_info,
            expected_status_codes=expected_status_codes,
            retry_status_codes=retry_status_codes,
            retry_exceptions=retry_exceptions,
            retry_errors=retry_errors,
        )

        if slept_so_far < retry_max_wait_before_failure and should_retry:
            _log_for_retry(logger=logger, response=response, caught_exception=error)

            pause = calculate_exponential_backoff(
                retries=attempts_retried,
                base_wait=retry_base_wait,
                wait_random_lower=retry_wait_random_lower,
                wait_random_upper=retry_wait_random_upper,
                back_off_factor=retry_back_off_factor,
                max_back_off=retry_max_back_off,
            )
            attempts_retried += 1
            slept_so_far += pause
            await asyncio.sleep(pause)
            continue

        # No further attempt will be made: hand back the result or the failure.
        if error_info is None or error_info[0] is None:
            return response

        if should_retry:
            log_template = "Retries have run out. re-raising the exception: %s"
        else:
            log_template = "Raising the exception: %s"
        logger.debug(log_template, str(error_info[0]))
        raise error

with_retry_time_based(function, verbose=False, retry_status_codes=None, expected_status_codes=None, retry_errors=None, retry_exceptions=None, retry_base_wait=DEFAULT_BASE_WAIT_ASYNC, retry_wait_random_lower=DEFAULT_WAIT_RANDOM_LOWER_ASYNC, retry_wait_random_upper=DEFAULT_WAIT_RANDOM_UPPER_ASYNC, retry_back_off_factor=DEFAULT_BACK_OFF_FACTOR_ASYNC, retry_max_back_off=DEFAULT_MAX_BACK_OFF_ASYNC, retry_max_wait_before_failure=DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC)

Retries the given function under certain conditions. It retries an unbounded number of times until the maximum wait time is reached. The backoff is calculated using an exponential backoff algorithm with a random jitter. The maximum backoff between retries is capped at retry_max_back_off.

PARAMETER DESCRIPTION
verbose

Whether to log debug messages

TYPE: bool DEFAULT: False

function

A function with no arguments. If arguments are needed, use a lambda (see example).

retry_status_codes

What status codes to retry upon in the case of a SynapseHTTPError.

TYPE: List[int] DEFAULT: None

expected_status_codes

If specified responses with any other status codes result in a retry.

TYPE: List[int] DEFAULT: None

retry_errors

What reasons to retry upon, if function().response.json()['reason'] exists.

TYPE: List[str] DEFAULT: None

retry_exceptions

What types of exceptions, specified as strings or Exception classes, to retry upon.

TYPE: List[Union[Exception, str]] DEFAULT: None

retry_base_wait

The base wait time inbetween retries.

TYPE: float DEFAULT: DEFAULT_BASE_WAIT_ASYNC

retry_wait_random_lower

The lower bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_LOWER_ASYNC

retry_wait_random_upper

The upper bound of the random wait time.

TYPE: float DEFAULT: DEFAULT_WAIT_RANDOM_UPPER_ASYNC

retry_back_off_factor

The factor to increase the wait time by for each retry.

TYPE: float DEFAULT: DEFAULT_BACK_OFF_FACTOR_ASYNC

retry_max_back_off

The maximum wait time.

TYPE: float DEFAULT: DEFAULT_MAX_BACK_OFF_ASYNC

retry_max_wait_before_failure

The maximum wait time before failure.

TYPE: float DEFAULT: DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC

Using with_retry_time_based

Using with_retry_time_based to consolidate inputs into a list.

from synapseclient.core.retry import with_retry_time_based

def foo(a, b, c): return [a, b, c]
result = with_retry_time_based(lambda: foo("1", "2", "3"))
Source code in synapseclient/core/retry.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
def with_retry_time_based(
    function,
    verbose: bool = False,
    retry_status_codes: List[int] = None,
    expected_status_codes: List[int] = None,
    retry_errors: List[str] = None,
    retry_exceptions: List[Union[Exception, str]] = None,
    retry_base_wait: float = DEFAULT_BASE_WAIT_ASYNC,
    retry_wait_random_lower: float = DEFAULT_WAIT_RANDOM_LOWER_ASYNC,
    retry_wait_random_upper: float = DEFAULT_WAIT_RANDOM_UPPER_ASYNC,
    retry_back_off_factor: float = DEFAULT_BACK_OFF_FACTOR_ASYNC,
    retry_max_back_off: float = DEFAULT_MAX_BACK_OFF_ASYNC,
    retry_max_wait_before_failure: float = DEFAULT_MAX_WAIT_BEFORE_FAIL_ASYNC,
) -> Union[Exception, httpx.Response, Any, None]:
    """
    Call `function` repeatedly until it succeeds, fails in a non-retryable way, or
    the accumulated backoff time reaches `retry_max_wait_before_failure`.

    There is no cap on the number of attempts; only the sum of the backoff sleeps
    is bounded (time spent inside `function` is not counted). Each sleep is computed
    by `calculate_exponential_backoff`, i.e. exponential growth plus random jitter,
    capped at `retry_max_back_off`. This is the blocking counterpart of the
    async variant: it calls `function()` directly and sleeps with `time.sleep`.

    Arguments:
        verbose: Whether to log debug messages
        function: A function with no arguments. If arguments are needed, use a lambda
            (see example).
        retry_status_codes: What status codes to retry upon in the case of a
            SynapseHTTPError.
        expected_status_codes: If specified responses with any other status codes result
            in a retry.
        retry_errors: What reasons to retry upon, if
            `function().response.json()['reason']` exists.
        retry_exceptions: What types of exceptions, specified as strings or Exception
            classes, to retry upon.
        retry_base_wait: The base wait time in between retries.
        retry_wait_random_lower: The lower bound of the random wait time.
        retry_wait_random_upper: The upper bound of the random wait time.
        retry_back_off_factor: The factor to increase the wait time by for each retry.
        retry_max_back_off: The maximum wait time for a single backoff.
        retry_max_wait_before_failure: The maximum total backoff time before giving up.

    Returns:
        Whatever `function()` returned on its final attempt.

    Raises:
        Exception: The exception raised by the final attempt, if any.

    Example: Using with_retry_time_based
        Using ``with_retry_time_based`` to consolidate inputs into a list.

            from synapseclient.core.retry import with_retry_time_based

            def foo(a, b, c): return [a, b, c]
            result = with_retry_time_based(lambda: foo("1", "2", "3"))
    """
    # The helper hands back the filter lists to use together with the logger.
    resolved_settings = _assign_default_values(
        retry_status_codes=retry_status_codes,
        expected_status_codes=expected_status_codes,
        retry_errors=retry_errors,
        retry_exceptions=retry_exceptions,
        verbose=verbose,
    )
    (
        retry_status_codes,
        expected_status_codes,
        retry_errors,
        retry_exceptions,
        logger,
    ) = resolved_settings

    slept_so_far = 0
    attempts_retried = 0
    while True:
        response = None
        error = None
        error_info = None

        try:
            response = function()
        except Exception as ex:
            error, error_info = ex, sys.exc_info()
            logger.debug(DEBUG_EXCEPTION, function, exc_info=True)
            # Errors that carry a response are classified by that response too.
            if hasattr(ex, "response"):
                response = ex.response

        should_retry = _is_retryable(
            response=response,
            caught_exception=error,
            caught_exception_info=error_info,
            expected_status_codes=expected_status_codes,
            retry_status_codes=retry_status_codes,
            retry_exceptions=retry_exceptions,
            retry_errors=retry_errors,
        )

        if slept_so_far < retry_max_wait_before_failure and should_retry:
            _log_for_retry(logger=logger, response=response, caught_exception=error)

            pause = calculate_exponential_backoff(
                retries=attempts_retried,
                base_wait=retry_base_wait,
                wait_random_lower=retry_wait_random_lower,
                wait_random_upper=retry_wait_random_upper,
                back_off_factor=retry_back_off_factor,
                max_back_off=retry_max_back_off,
            )
            attempts_retried += 1
            slept_so_far += pause
            time.sleep(pause)
            continue

        # No further attempt will be made: hand back the result or the failure.
        if error_info is None or error_info[0] is None:
            return response

        if should_retry:
            log_template = "Retries have run out. re-raising the exception: %s"
        else:
            log_template = "Raising the exception: %s"
        logger.debug(log_template, str(error_info[0]))
        raise error

Utils

synapseclient.core.utils

Utility functions useful in the implementation and testing of the Synapse client.

Classes

threadsafe_iter

Takes an iterator/generator and makes it thread-safe by serializing call to the next method of given iterator/generator. See: http://anandology.com/blog/using-iterators-and-generators/

Source code in synapseclient/core/utils.py
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
class threadsafe_iter:
    """Thread-safe wrapper around an iterator or generator.

    Every advance of the wrapped iterator happens while holding a lock, so calls
    to `next` coming from several threads are serialized.
    See: <http://anandology.com/blog/using-iterators-and-generators/>
    """

    def __init__(self, it):
        # One lock per wrapper guards every call to next() on the wrapped iterator.
        self.lock = threading.Lock()
        self.it = it

    def __next__(self):
        self.lock.acquire()
        try:
            return next(self.it)
        finally:
            # Release even when the wrapped iterator raises (e.g. StopIteration).
            self.lock.release()

    def __iter__(self):
        return self

deprecated_keyword_param

A decorator used to warn when a keyword parameter of a function has been deprecated and is intended for future removal. It emits a warning when such a keyword is passed.

Source code in synapseclient/core/utils.py
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
class deprecated_keyword_param:
    """A decorator to use to warn when a keyword parameter from a function has been deprecated
    and is intended for future removal. Will emit a warning when such a keyword is passed.

    Arguments:
        keywords: Iterable of the deprecated keyword parameter names.
        version: The version since which the parameters are deprecated; included in
            the warning text.
        reason: Free-form explanation appended to the warning text.
    """

    def __init__(self, keywords, version, reason):
        self.keywords = set(keywords)
        self.version = version
        self.reason = reason

    def __call__(self, fn):
        # Imported locally so this block does not depend on the module's import list.
        import functools

        # Preserve the decorated function's name, docstring and other metadata.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Only keywords that were actually passed by name trigger the warning.
            found = self.keywords.intersection(kwargs)
            if found:
                warnings.warn(
                    "Parameter(s) {} deprecated since version {}; {}".format(
                        sorted(list(found)), self.version, self.reason
                    ),
                    category=DeprecationWarning,
                    # Attribute the warning to the caller of the decorated function.
                    stacklevel=2,
                )

            return fn(*args, **kwargs)

        return wrapper

Functions

md5_for_file(filename, block_size=2 * MB, callback=None)

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB

TYPE: int DEFAULT: 2 * MB

callback

A callback function that is invoked before each block is read, e.g. to show a loading spinner on the terminal. Defaults to None

TYPE: Callable DEFAULT: None

RETURNS DESCRIPTION

The MD5 Checksum

Source code in synapseclient/core/utils.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def md5_for_file(
    filename: str, block_size: int = 2 * MB, callback: typing.Callable = None
):
    """
    Compute the MD5 of a file by reading it in fixed-size blocks.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    Arguments:
        filename: The file to read in
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB
        callback: Optional callable invoked with no arguments before every read
                    (including the final read that hits end of file), e.g. to
                    drive a loading spinner on the terminal. Defaults to None

    Returns:
        The hashlib MD5 object holding the checksum (call `.hexdigest()` on it
        for the hex string).
    """
    digest = hashlib.new("md5", usedforsecurity=False)
    with open(filename, "rb") as stream:

        def read_block():
            # Notify the caller before each read, then fetch the next block.
            if callback:
                callback()
            return stream.read(block_size)

        blocks_hashed = 0
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(read_block, b""):
            blocks_hashed += 1
            digest.update(block)
            # Drop the reference before the next block is read.
            del block
            # Garbage collect every 100 iterations
            if blocks_hashed % 100 == 0:
                gc.collect()
    return digest

md5_for_file_hex(filename, block_size=2 * MB, callback=None)

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB

TYPE: int DEFAULT: 2 * MB

callback

The callback function that help us show loading spinner on terminal. Defaults to None

TYPE: Callable DEFAULT: None

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def md5_for_file_hex(
    filename: str, block_size: int = 2 * MB, callback: typing.Callable = None
) -> str:
    """
    Compute the MD5 of a file and return it as a hexadecimal string.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    This delegates to `md5_for_file` and converts the resulting hash object
    with `hexdigest()`.

    Arguments:
        filename: The file to read in
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB
        callback: Optional callable forwarded to `md5_for_file`, e.g. to drive a
                    loading spinner on the terminal. Defaults to None

    Returns:
        The MD5 Checksum as a hex string
    """
    file_digest = md5_for_file(filename, block_size, callback)
    return file_digest.hexdigest()

md5_for_file_multiprocessing(filename, process_pool_executor, md5_semaphore, block_size=2 * MB) async

Calculates the MD5 of the given file. See source http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python.

PARAMETER DESCRIPTION
filename

The file to read in

TYPE: str

process_pool_executor

The process pool executor to use for the calculation.

md5_semaphore

The semaphore to use for waiting to calculate.

TYPE: Semaphore

block_size

How much of the file to read in at once (bytes). Defaults to 2 MB.

TYPE: int DEFAULT: 2 * MB

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def md5_for_file_multiprocessing(
    filename: str,
    process_pool_executor,
    md5_semaphore: asyncio.Semaphore,
    block_size: int = 2 * MB,
) -> str:
    """
    Calculates the MD5 of the given file in a worker of the given executor.
    See source <http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python>.

    The hashing job (`md5_for_file_hex`) is submitted to `process_pool_executor`
    while holding `md5_semaphore`, and the coroutine suspends until the job has
    finished. If the job raised, that exception is raised here.

    Arguments:
        filename: The file to read in
        process_pool_executor: The process pool executor to use for the calculation.
            Assumed to return a `concurrent.futures.Future` from `submit` (as the
            standard library executors do).
        md5_semaphore: The semaphore to use for waiting to calculate.
        block_size: How much of the file to read in at once (bytes).
                    Defaults to 2 MB.

    Returns:
        The MD5 Checksum
    """
    async with md5_semaphore:
        with tracer.start_as_current_span("Utils::md5_for_file_multiprocessing"):
            future = process_pool_executor.submit(
                md5_for_file_hex, filename, block_size
            )
            # Suspend until the executor signals completion instead of polling
            # `future.done()` with `sleep(0)`, which keeps the event loop spinning
            # at full speed for the whole duration of the hash.
            return await asyncio.wrap_future(future)

md5_fn(part, _)

Calculate the MD5 of a file-like object.

PARAMETER DESCRIPTION
part

The bytes-like data to hash (it is passed directly to the hash object's update method).

RETURNS DESCRIPTION
str

The MD5 Checksum

Source code in synapseclient/core/utils.py
140
141
142
143
144
145
146
147
148
149
150
151
152
@tracer.start_as_current_span("Utils::md5_fn")
def md5_fn(part, _) -> str:
    """Return the MD5 hex digest of an in-memory chunk of data.

    Arguments:
        part: The bytes-like data to hash; it is fed directly to the hash
            object's `update` method.
        _: Ignored; present only so the function accepts a second positional
            argument.

    Returns:
        The MD5 Checksum as a hex string
    """
    hasher = hashlib.new("md5", usedforsecurity=False)
    hasher.update(part)
    hex_digest = hasher.hexdigest()
    return hex_digest

download_file(url, localFilepath=None)

Downloads a remote file.

PARAMETER DESCRIPTION
localFilepath

May be None, in which case a temporary file is created

RETURNS DESCRIPTION
localFilePath

The path to the downloaded file

TYPE: str

Source code in synapseclient/core/utils.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def download_file(url: str, localFilepath: str = None) -> str:
    """
    Downloads a remote file.

    The response body is streamed to disk in 10 KiB chunks and progress is
    reported through `printTransferProgress`. The HTTP status code is not
    checked here.

    Arguments:
        url: The URL to download from.
        localFilepath: Where to store the download. May be None, in which case a
            temporary file is created (and not deleted automatically). Missing
            parent directories are created.

    Returns:
        localFilePath: The path to the downloaded file

    Raises:
        KeyError: If the response has no `content-length` header.
    """
    chunk_size = 1024 * 10

    if localFilepath:
        parent_dir = os.path.dirname(localFilepath)
        # dirname is "" for a bare filename; os.makedirs("") would raise, and the
        # current directory already exists, so only create a real parent path.
        if parent_dir and not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)
        f = open(localFilepath, "wb")
    else:
        f = tempfile.NamedTemporaryFile(delete=False)
        localFilepath = f.name

    # The file is closed on every exit path, successful or not.
    with f:
        r = requests.get(url, stream=True)
        try:
            toBeTransferred = float(r.headers["content-length"])
            for nChunks, chunk in enumerate(r.iter_content(chunk_size=chunk_size)):
                if chunk:
                    f.write(chunk)
                    printTransferProgress(nChunks * chunk_size, toBeTransferred)
        finally:
            # A streamed response keeps its connection until it is closed.
            r.close()

    # Report completion only after a successful transfer. Doing this in a
    # `finally` would hit an unbound `toBeTransferred` when the request or the
    # header lookup failed, masking the real error.
    printTransferProgress(toBeTransferred, toBeTransferred)

    return localFilepath

extract_filename(content_disposition_header, default_filename=None)

Extract a filename from an HTTP content-disposition header field.

See this memo and this package for cryptic details.

Source code in synapseclient/core/utils.py
191
192
193
194
195
196
197
198
199
200
201
202
203
def extract_filename(content_disposition_header, default_filename=None):
    """
    Extract a filename from an HTTP content-disposition header field.

    See [this memo](http://tools.ietf.org/html/rfc6266) and
    [this package](http://pypi.python.org/pypi/rfc6266)
    for cryptic details.

    Arguments:
        content_disposition_header: The raw header value, e.g.
                                    `attachment; filename="foo.txt"`. May be
                                    None or empty.
        default_filename: Value returned when the header is missing/empty or
                          carries no filename parameter.

    Returns:
        The filename parameter of the header, or default_filename.
    """
    # The `cgi` module (cgi.parse_header) is deprecated since Python 3.11 and
    # removed in 3.13; the email package is the documented replacement.
    # Imported locally so this function does not rely on file-level imports.
    from email.message import Message
    from email.utils import collapse_rfc2231_value

    if not content_disposition_header:
        return default_filename

    missing = object()  # sentinel: distinguishes "no filename" from any real value
    message = Message()
    message["content-disposition"] = content_disposition_header
    filename = message.get_param(
        "filename", failobj=missing, header="content-disposition"
    )
    if filename is missing:
        return default_filename
    if isinstance(filename, tuple):
        # RFC 2231 encoded parameter (filename*=charset'lang'value)
        return collapse_rfc2231_value(filename)
    return filename

extract_user_name(profile)

Extract a displayable user name from a user's profile

Source code in synapseclient/core/utils.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def extract_user_name(profile):
    """
    Extract a displayable user name from a user's profile.

    Preference order: userName, displayName, "firstName lastName", lastName,
    firstName, and finally the profile id (or "Unknown-user" if there is none).
    Empty/falsy values are treated as absent.
    """

    def _field(key):
        # Mirrors the `key in profile and profile[key]` membership-then-index check.
        return profile[key] if key in profile else None

    for key in ("userName", "displayName"):
        candidate = _field(key)
        if candidate:
            return candidate

    first_name = _field("firstName")
    last_name = _field("lastName")
    if first_name and last_name:
        return first_name + " " + last_name
    if last_name:
        return last_name
    if first_name:
        return first_name
    return str(profile.get("id", "Unknown-user"))

id_of(obj)

Try to figure out the Synapse ID of the given object.

PARAMETER DESCRIPTION
obj

May be a string, Entity object, or dictionary

TYPE: Union[str, Mapping, Number]

RETURNS DESCRIPTION
str

The ID

RAISES DESCRIPTION
ValueError

if the object doesn't have an ID

Source code in synapseclient/core/utils.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def id_of(obj: typing.Union[str, collections.abc.Mapping, numbers.Number]) -> str:
    """
    Try to figure out the Synapse ID of the given object.

    Arguments:
        obj: May be a string, Entity object, or dictionary

    Returns:
        The ID

    Raises:
        ValueError: if the object doesn't have an ID
    """
    # Strings and numbers are taken to already be the ID.
    if isinstance(obj, (str, numbers.Number)):
        return str(obj)

    # Candidate attribute names for a synapse Id, checked in this order.
    for candidate_name in ("id", "ownerId", "tableId"):
        found = _get_from_members_items_or_properties(obj, candidate_name)
        if found is not None:
            return str(found)

    raise ValueError("Invalid parameters: couldn't find id of " + str(obj))

concrete_type_of(obj)

Return the concrete type of an object representing a Synapse entity. This is meant to operate either against an actual Entity object, or the lighter weight dictionary returned by Synapse#getChildren, both of which are Mappings.

Source code in synapseclient/core/utils.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def concrete_type_of(obj: collections.abc.Mapping):
    """
    Return the concrete type of an object representing a Synapse entity.

    Works on any Mapping: the "concreteType" key is consulted first and the
    "type" key is used when the former is missing or falsy.

    Raises:
        ValueError: if obj is not a Mapping, or the value found is not a string
                    starting with "org.sagebionetworks.repo.model"
    """
    found = None
    if isinstance(obj, collections.abc.Mapping):
        found = obj.get("concreteType") or obj.get("type")

    if isinstance(found, str) and found.startswith("org.sagebionetworks.repo.model"):
        return found

    raise ValueError("Unable to determine concreteType")

is_in_path(id, path)

Determines whether id is in the path as returned from /entity/{id}/path

PARAMETER DESCRIPTION
id

synapse id string

TYPE: str

path

object as returned from '/entity/{id}/path'

TYPE: Mapping

RETURNS DESCRIPTION
bool

True or False

Source code in synapseclient/core/utils.py
299
300
301
302
303
304
305
306
307
308
309
def is_in_path(id: str, path: collections.abc.Mapping) -> bool:
    """Report whether a Synapse id appears among the entries of an entity path.

    Arguments:
        id: synapse id string
        path: object as returned from '/entity/{id}/path'; its "path" entry is
              iterated and each item's "id" is compared against `id`

    Returns:
        True or False
    """
    path_ids = []
    for entry in path["path"]:
        path_ids.append(entry["id"])
    return id in path_ids

get_properties(entity)

Returns the dictionary of properties of the given Entity.

Source code in synapseclient/core/utils.py
312
313
314
315
def get_properties(entity):
    """Return `entity.properties` when that attribute exists, else `entity` itself."""
    if hasattr(entity, "properties"):
        return entity.properties
    return entity

is_url(s)

Return True if the string appears to be a valid URL.

Source code in synapseclient/core/utils.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def is_url(s):
    """Return True if the string appears to be a valid URL.

    Non-string input and anything that fails to parse yields False. A
    single-letter scheme is treated as a Windows drive letter, not a URL.
    """
    if not isinstance(s, str):
        return False

    try:
        parts = urllib_parse.urlsplit(s)
        scheme = parts.scheme
        # looks like a Windows drive letter?
        if len(scheme) == 1 and scheme.isalpha():
            return False
        # file URLs only need a path, not a network location
        if scheme == "file" and bool(parts.path):
            return True
        return bool(scheme) and bool(parts.netloc)
    except Exception:
        return False

as_url(s)

Tries to convert the input into a proper URL.

Source code in synapseclient/core/utils.py
334
335
336
337
338
339
340
341
342
343
def as_url(s):
    """Tries to convert the input into a proper URL.

    Input that already has a scheme is returned as parsed; a Windows drive-letter
    path or a scheme-less string is turned into a file URL.
    """
    parts = urllib_parse.urlsplit(s)
    scheme = parts.scheme

    # Windows drive letter?
    is_drive_letter = len(scheme) == 1 and scheme.isalpha()
    if is_drive_letter:
        return "file:///%s" % str(s).replace("\\", "/")

    if not scheme:
        return "file://%s" % str(s)
    return parts.geturl()

guess_file_name(string)

Tries to derive a filename from an arbitrary string.

Source code in synapseclient/core/utils.py
346
347
348
349
350
351
352
353
354
355
356
357
358
def guess_file_name(string):
    """Tries to derive a filename from an arbitrary string.

    The string is parsed as a URL; the last non-empty "/"-separated segment of
    its normalized path is returned. If there is no such segment, the path is
    scrubbed of characters outside a conservative whitelist and returned when
    anything remains.

    Raises:
        ValueError: if no name could be derived
    """
    path = normalize_path(urllib_parse.urlparse(string).path)
    segments = [segment for segment in path.split("/") if segment != ""]
    if segments:
        return segments[-1]

    # Try scrubbing the path of illegal characters
    if path:
        path = re.sub(r"[^a-zA-Z0-9_.+() -]", "", path)
    if path:
        return path
    raise ValueError("Could not derive a name from %s" % string)

normalize_path(path)

Transforms a path into an absolute path with forward slashes only.

Source code in synapseclient/core/utils.py
361
362
363
364
365
def normalize_path(path):
    """Transforms a path into an absolute path with forward slashes only.

    None is passed through unchanged. The path is made absolute, case-normalized
    with os.path.normcase, and every backslash is replaced with "/".
    """
    if path is None:
        return None
    absolute = os.path.abspath(path)
    return os.path.normcase(absolute).replace("\\", "/")

equal_paths(path1, path2)

Compare file paths in a platform neutral way

Source code in synapseclient/core/utils.py
368
369
370
371
372
def equal_paths(path1, path2):
    """
    Compare file paths in a platform neutral way: both are run through
    normalize_path and the results are compared for equality.
    """
    left = normalize_path(path1)
    right = normalize_path(path2)
    return left == right

file_url_to_path(url, verify_exists=False)

Convert a file URL to a path, handling some odd cases around Windows paths.

PARAMETER DESCRIPTION
url

a file URL

TYPE: str

verify_exists

If true, return the path only if the resulting file path exists on the local file system.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[str, None]

a path or None if the URL is not a file URL.

Source code in synapseclient/core/utils.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
def file_url_to_path(url: str, verify_exists: bool = False) -> typing.Union[str, None]:
    """
    Convert a file URL to a path, handling some odd cases around Windows paths.

    Arguments:
        url: a file URL (a URL with no scheme at all is accepted as well)
        verify_exists: If true, return the path only if it exists on the local
                        file system.

    Returns:
        a path or None if the URL is not a file URL (or, with verify_exists,
        if the path does not exist).
    """
    parts = urllib_parse.urlsplit(url)
    if parts.scheme not in ("file", ""):
        return None

    path = parts.path
    # A windows file URL, for example file:///c:/WINDOWS/asdf.txt
    # will get back a path of: /c:/WINDOWS/asdf.txt, which we need to fix by
    # lopping off the leading slash character. Apparently, the Python developers
    # think this is not a bug: http://bugs.python.org/issue7965
    if SLASH_PREFIX_REGEX.match(path):
        path = path[1:]

    if os.path.exists(path) or not verify_exists:
        return path
    return None

is_same_base_url(url1, url2)

Compares two URLs to see whether they share the same scheme and hostname, ignoring the path and everything after it.

PARAMETER DESCRIPTION
url1

a URL

TYPE: str

url2

a second URL

TYPE: str

RETURNS DESCRIPTION
bool

A Boolean

Source code in synapseclient/core/utils.py
401
402
403
404
405
406
407
408
409
410
411
412
413
def is_same_base_url(url1: str, url2: str) -> bool:
    """Check whether two URLs share the same scheme and hostname.

    Path, query, fragment and port are not compared.

    Arguments:
        url1: a URL
        url2: a second URL

    Returns:
        A Boolean
    """
    first = urllib_parse.urlsplit(url1)
    second = urllib_parse.urlsplit(url2)
    if first.scheme != second.scheme:
        return False
    return first.hostname == second.hostname

is_synapse_id_str(obj)

If the input is a Synapse ID return it, otherwise return None

Source code in synapseclient/core/utils.py
416
417
418
419
420
421
422
def is_synapse_id_str(obj: str) -> typing.Union[str, None]:
    """If the input is a Synapse ID return it, otherwise return None

    A Synapse ID here is "syn" followed by digits, optionally followed by
    ".<digits>" (a version suffix). Non-string input yields None.
    """
    if not isinstance(obj, str):
        return None
    matched = re.match(r"(syn\d+(\.\d+)?$)", obj)
    return matched.group(1) if matched else None

get_synid_and_version(obj)

Extract the Synapse ID and version number from input entity

PARAMETER DESCRIPTION
obj

May be a string, Entity object, or dictionary.

TYPE: Union[str, Mapping]

RETURNS DESCRIPTION
Tuple[str, Union[int, None]]

A tuple containing the synapse ID and version number, where the version number may be an integer or None if the input object does not contain a versionNumber or .version notation (if string).

Get synID and version from string object

Extract the synID and version number of the entity string ID

from synapseclient.core import utils
utils.get_synid_and_version("syn123.4")

The call above will return the following tuple:

('syn123', 4)
Source code in synapseclient/core/utils.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_synid_and_version(
    obj: typing.Union[str, collections.abc.Mapping]
) -> typing.Tuple[str, typing.Union[int, None]]:
    """Extract the Synapse ID and version number from input entity

    Arguments:
            obj: May be a string, Entity object, or dictionary.

    Returns:
        A tuple containing the synapse ID and version number,
            where the version number may be an integer or None if
            the input object does not contain a versionNumber or
            .version notation (if string).

    Raises:
        ValueError: if a string input is not recognized as a Synapse ID

    Example: Get synID and version from string object
        Extract the synID and version number of the entity string ID

            from synapseclient.core import utils
            utils.get_synid_and_version("syn123.4")

        The call above will return the following tuple:

            ('syn123', 4)
    """
    if not isinstance(obj, str):
        # Entity / mapping input: the ID is resolved first, then the optional
        # "versionNumber" entry is returned as stored.
        syn_id = id_of(obj)
        version = obj["versionNumber"] if "versionNumber" in obj else None
        return syn_id, version

    validated = is_synapse_id_str(obj)
    if not validated:
        raise ValueError("The input string was not determined to be a syn ID.")

    syn_id, raw_version = re.match(r"(syn\d+)(?:\.(\d+))?", validated).groups()
    version = None if raw_version is None else int(raw_version)
    return syn_id, version

bool_or_none(input_value)

Attempts to convert a string to a bool. Returns None if it fails.

PARAMETER DESCRIPTION
input_value

The string to convert to a bool

TYPE: str

RETURNS DESCRIPTION
Union[bool, None]

The bool or None if the conversion fails

Source code in synapseclient/core/utils.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def bool_or_none(input_value: str) -> typing.Union[bool, None]:
    """
    Attempts to convert a string to a bool. Returns None if it fails.

    Only "true" and "false" (compared case-insensitively) are recognized.

    Arguments:
        input_value: The string to convert to a bool

    Returns:
        The bool or None if the conversion fails
    """
    if input_value is None or input_value == "":
        return None

    recognized = {"true": True, "false": False}
    return recognized.get(input_value.lower())

datetime_or_none(datetime_str)

Attempts to convert a string to a datetime object. Returns None if it fails.

Some of the expected formats of datetime_str are:

  • 2023-12-04T07:00:00Z
  • 2001-01-01 15:00:00+07:00
  • 2001-01-01 15:00:00-07:00
  • 2023-12-04 07:00:00+00:00
  • 2019-01-01
PARAMETER DESCRIPTION
datetime_str

The string to convert to a datetime object

TYPE: str

RETURNS DESCRIPTION
Union[datetime, None]

The datetime object or None if the conversion fails

Source code in synapseclient/core/utils.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def datetime_or_none(datetime_str: str) -> typing.Union[datetime.datetime, None]:
    """Attempts to convert a string to a datetime object. Returns None if it fails.

    Every "Z" in the input is replaced with "+00:00" before the string is handed
    to `datetime.datetime.fromisoformat`. Some of the expected formats of
    datetime_str are:

    - 2023-12-04T07:00:00Z
    - 2001-01-01 15:00:00+07:00
    - 2001-01-01 15:00:00-07:00
    - 2023-12-04 07:00:00+00:00
    - 2019-01-01

    Arguments:
        datetime_str: The string to convert to a datetime object

    Returns:
        The datetime object or None if the conversion fails
    """
    try:
        iso_text = datetime_str.replace("Z", "+00:00")
        parsed = datetime.datetime.fromisoformat(iso_text)
    except Exception:
        # Any failure (wrong type, unparsable text) means "not a datetime".
        return None
    return parsed

is_date(dt)

Objects of class datetime.date and datetime.datetime will be recognized as dates

Source code in synapseclient/core/utils.py
514
515
516
def is_date(dt):
    """Return True for instances of datetime.date or datetime.datetime."""
    return isinstance(dt, (datetime.date, datetime.datetime))

to_list(value)

Convert the value (an iterable or a scalar value) to a list.

Source code in synapseclient/core/utils.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
def to_list(value):
    """Convert the value (an iterable or a scalar value) to a list.

    A non-string iterable becomes a list of its items; anything else (including
    a single string) becomes a one-element list. In both cases, string items
    that `datetime_or_none` can parse are replaced by the parsed datetime.

    Arguments:
        value: An iterable or a scalar value

    Returns:
        A list
    """

    def _convert(item):
        # Only strings are candidates for datetime conversion.
        if isinstance(item, str):
            parsed = datetime_or_none(item)
            if parsed is not None:
                return parsed
        return item

    if isinstance(value, collections.abc.Iterable) and not isinstance(value, str):
        # Convert each element itself. Passing the whole iterable to
        # datetime_or_none always failed, so strings in iterables were never
        # converted while scalar strings were.
        return [_convert(item) for item in value]
    return [_convert(value)]

make_bogus_uuid_file()

Makes a bogus test file with a uuid4 string for testing. It is the caller's responsibility to clean up the file when finished.

RETURNS DESCRIPTION
str

The name of the file

Source code in synapseclient/core/utils.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def make_bogus_uuid_file() -> str:
    """
    Makes a bogus test file with a uuid4 string for testing. It is the caller's
    responsibility to clean up the file when finished.

    The file is a ".txt" temporary file holding the uuid followed by a newline.

    Returns:
        The name of the file
    """
    contents = str(uuid.uuid4())

    # delete=False: the file must outlive this function for the caller to use.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as handle:
        handle.write(contents)
        handle.write("\n")

    return normalize_path(handle.name)

make_bogus_data_file(n=100, seed=None)

Makes a bogus data file for testing. It is the caller's responsibility to clean up the file when finished.

PARAMETER DESCRIPTION
n

How many random floating point numbers to be written into the file, separated by commas

TYPE: int DEFAULT: 100

seed

Random seed for the random numbers

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
str

The name of the file

Source code in synapseclient/core/utils.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
def make_bogus_data_file(n: int = 100, seed: int = None) -> str:
    """
    Makes a bogus data file for testing. It is the caller's responsibility
    to clean up the file when finished.

    Arguments:
        n: How many random floating point numbers to be written into the file,
            separated by commas
        seed: Random seed for the random numbers; when None the global random
              generator is left unseeded

    Returns:
        The name of the file
    """
    if seed is not None:
        random.seed(seed)
    samples = [random.gauss(mu=0.0, sigma=1.0) for _ in range(n)]

    # delete=False: the file must outlive this function for the caller to use.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as handle:
        handle.write(", ".join(map(str, samples)))
        handle.write("\n")

    return normalize_path(handle.name)

make_bogus_binary_file(n=1 * KB, filepath=None, printprogress=False)

Makes a bogus binary data file for testing. It is the caller's responsibility to clean up the file when finished.

PARAMETER DESCRIPTION
n

How many bytes to write

TYPE: int DEFAULT: 1 * KB

filepath

Where to write the data

TYPE: str DEFAULT: None

printprogress

Toggle printing of progress

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

The name of the file

Source code in synapseclient/core/utils.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def make_bogus_binary_file(
    n: int = 1 * KB, filepath: str = None, printprogress: bool = False
) -> str:
    """
    Makes a bogus binary data file for testing. It is the caller's responsibility
    to clean up the file when finished.

    Random bytes from os.urandom are written in pieces of at most 1 KB.

    Arguments:
        n: How many bytes to write
        filepath: Where to write the data; when not given, a ".dat" temporary
                  file is created (and not deleted on close)
        printprogress: Toggle printing of progress

    Returns:
        The name of the file
    """
    if filepath:
        handle = open(filepath, "wb")
    else:
        handle = tempfile.NamedTemporaryFile(mode="wb", suffix=".dat", delete=False)

    with handle as f:
        if not filepath:
            filepath = f.name

        reported = 0
        bytes_left = n
        while bytes_left > 0:
            piece_size = int(min(bytes_left, 1 * KB))
            f.write(os.urandom(piece_size))
            bytes_left -= piece_size
            if printprogress:
                reported += piece_size
                printTransferProgress(reported, n, "Generated ", filepath)

    return normalize_path(filepath)

to_unix_epoch_time(dt)

Convert either datetime.date or datetime.datetime objects to UNIX time.

Source code in synapseclient/core/utils.py
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
def to_unix_epoch_time(dt: typing.Union[datetime.date, datetime.datetime, str]) -> int:
    """
    Convert a datetime.date, a datetime.datetime, or an ISO-format string to
    UNIX time expressed in whole milliseconds.

    Strings are parsed with `datetime.datetime.fromisoformat` after replacing
    "Z" with "+00:00". A plain date is taken as midnight in the local timezone,
    and a naive datetime is assumed to be in the local timezone.
    """
    if type(dt) is str:
        dt = datetime.datetime.fromisoformat(dt.replace("Z", "+00:00"))

    # Exact type check on purpose: datetime.datetime is a subclass of
    # datetime.date and must take the other branch.
    if type(dt) is datetime.date:
        local_tz = datetime.datetime.now().astimezone().tzinfo
        midnight = datetime.datetime.combine(dt, datetime.time(0, 0, 0, 0))
        aware = midnight.replace(tzinfo=local_tz)
    else:
        # If the datetime is not timezone aware, assume it is in the local timezone.
        # This is required in order for windows to work with the `astimezone` method.
        if dt.tzinfo is None:
            local_tz = datetime.datetime.now().astimezone().tzinfo
            dt = dt.replace(tzinfo=local_tz)
        aware = dt.astimezone(datetime.timezone.utc)

    elapsed = aware - UNIX_EPOCH
    return int(elapsed.total_seconds() * 1000)

to_unix_epoch_time_secs(dt)

Convert either datetime.date or datetime.datetime objects to UNIX time.

Source code in synapseclient/core/utils.py
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
def to_unix_epoch_time_secs(
    dt: typing.Union[datetime.date, datetime.datetime]
) -> float:
    """
    Convert a datetime.date or datetime.datetime to UNIX time expressed in
    (fractional) seconds.

    A plain date is taken as midnight in the local timezone, and a naive
    datetime is assumed to be in the local timezone.
    """
    # Exact type check on purpose: datetime.datetime is a subclass of
    # datetime.date and must take the other branch.
    if type(dt) is datetime.date:
        local_tz = datetime.datetime.now().astimezone().tzinfo
        midnight = datetime.datetime.combine(dt, datetime.time(0, 0, 0, 0))
        aware = midnight.replace(tzinfo=local_tz)
    else:
        # If the datetime is not timezone aware, assume it is in the local timezone.
        # This is required in order for windows to work with the `astimezone` method.
        if dt.tzinfo is None:
            local_tz = datetime.datetime.now().astimezone().tzinfo
            dt = dt.replace(tzinfo=local_tz)
        aware = dt.astimezone(datetime.timezone.utc)

    elapsed = aware - UNIX_EPOCH
    return elapsed.total_seconds()

from_unix_epoch_time_secs(secs)

Returns a Datetime object given seconds since midnight Jan 1, 1970.

Source code in synapseclient/core/utils.py
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
def from_unix_epoch_time_secs(secs):
    """Returns a timezone-aware (UTC) Datetime object given seconds since
    midnight Jan 1, 1970. A string argument is converted with float() first."""
    if isinstance(secs, str):
        secs = float(secs)

    utc = datetime.timezone.utc

    # utcfromtimestamp() fails for negative values (dates before 1970-1-1) on Windows
    # so, here's a hack that enables ancient events, such as Chris's birthday to be
    # converted from seconds since the UNIX epoch to higher level Datetime objects. Ha!
    if platform.system() == "Windows" and secs < 0:
        # Convert the mirrored (positive) timestamp, then reflect it around the epoch.
        mirror_date = datetime.datetime.utcfromtimestamp(abs(secs)).replace(tzinfo=utc)
        distance_from_epoch = mirror_date - UNIX_EPOCH
        return (UNIX_EPOCH - distance_from_epoch).replace(tzinfo=utc)

    return datetime.datetime.utcfromtimestamp(secs).replace(tzinfo=utc)

from_unix_epoch_time(ms)

Returns a Datetime object given milliseconds since midnight Jan 1, 1970.

Source code in synapseclient/core/utils.py
694
695
696
697
698
699
def from_unix_epoch_time(ms) -> datetime.datetime:
    """Returns a Datetime object given milliseconds since midnight Jan 1, 1970.

    A string argument is converted with float() first; the value is then divided
    by 1000 and passed to from_unix_epoch_time_secs.
    """
    millis = float(ms) if isinstance(ms, str) else ms
    return from_unix_epoch_time_secs(millis / 1000.0)

datetime_to_iso(dt, sep='T', include_milliseconds_if_zero=True)

Round microseconds to milliseconds (as expected by older clients) and add back the "Z" at the end. See: http://stackoverflow.com/questions/30266188/how-to-convert-date-string-to-iso8601-standard

PARAMETER DESCRIPTION
dt

The datetime to convert

TYPE: datetime

sep

Separator character to use.

TYPE: str DEFAULT: 'T'

include_milliseconds_if_zero

Whether or not to include milliseconds in this result if the number of milliseconds is 0.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
str

The formatted string.

Source code in synapseclient/core/utils.py
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def datetime_to_iso(
    dt: datetime.datetime, sep: str = "T", include_milliseconds_if_zero: bool = True
) -> str:
    """
    Round microseconds to milliseconds (as expected by older clients) and add back
    the "Z" at the end.
    See: http://stackoverflow.com/questions/30266188/how-to-convert-date-string-to-iso8601-standard

    Arguments:
        dt: The datetime to convert
        sep: Separator character to use between the date and the time.
        include_milliseconds_if_zero: Whether or not to include milliseconds in this
                                        result if the number of milliseconds is 0.

    Returns:
        The formatted string.
    """
    # Values that would round up to 1000 ms carry over into the next second.
    if dt.microsecond >= 999500:
        dt = (
            dt
            - datetime.timedelta(microseconds=dt.microsecond)
            + datetime.timedelta(seconds=1)
        )
    milliseconds = int(round(dt.microsecond / 1000.0))

    date_part = f"{dt.year:04}-{dt.month:02}-{dt.day:02}"
    time_part = f"{dt.hour:02}:{dt.minute:02}:{dt.second:02}"
    if include_milliseconds_if_zero or milliseconds:
        time_part += f".{milliseconds:03}"

    # "Z" is always appended; dt's own tzinfo is not consulted here.
    return f"{date_part}{sep}{time_part}Z"

format_time_interval(seconds)

Format a time interval given in seconds to a readable value, e.g. "5 minutes, 37 seconds".

Source code in synapseclient/core/utils.py
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
def format_time_interval(seconds):
    """
    Format a time interval given in seconds to a readable value,
    e.g. \"5 minutes, 37 seconds\".

    A year is counted as 365 days and a month as 30 days. Periods with a zero
    count are omitted, except for seconds, which are always included.

    Arguments:
        seconds: The length of the interval in seconds

    Returns:
        A comma separated string of the non-zero periods followed by the seconds
    """

    periods = (
        ("year", 60 * 60 * 24 * 365),
        ("month", 60 * 60 * 24 * 30),
        ("day", 60 * 60 * 24),
        ("hour", 60 * 60),
        ("minute", 60),
        ("second", 1),
    )

    result = []
    for period_name, period_seconds in periods:
        is_seconds = period_name == "second"
        # `>=` so an exact multiple (e.g. 60 or 3600) is expressed in the larger
        # unit ("1 minute", "1 hour") instead of spilling into the smaller one.
        if seconds >= period_seconds or is_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            if period_value > 0 or is_seconds:
                plural = "" if period_value == 1 else "s"
                result.append("%d %s%s" % (period_value, period_name, plural))
    return ", ".join(result)

itersubclasses(cls, _seen=None)

http://code.activestate.com/recipes/576949/ (r3)

itersubclasses(cls)

Generator over all subclasses of a given class, in depth first order.

>>> list(itersubclasses(int)) == [bool]
True
>>> class A(object): pass
>>> class B(A): pass
>>> class C(A): pass
>>> class D(B,C): pass
>>> class E(D): pass
>>>
>>> for cls in itersubclasses(A):
...     print(cls.__name__)
B
D
E
C
>>> # get ALL (new-style) classes currently defined
>>> [cls.__name__ for cls in itersubclasses(object)] #doctest: +ELLIPSIS
['type', ...'tuple', ...]
Source code in synapseclient/core/utils.py
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
def itersubclasses(cls, _seen=None):
    """
    <http://code.activestate.com/recipes/576949/> (r3)

    itersubclasses(cls)

    Generator over all subclasses of a given class, in depth first order.
    Each subclass is yielded once, even when reachable through several bases.

        >>> class A(object): pass
        >>> class B(A): pass
        >>> class C(A): pass
        >>> class D(B,C): pass
        >>> class E(D): pass
        >>>
        >>> [cls.__name__ for cls in itersubclasses(A)]
        ['B', 'D', 'E', 'C']

    Arguments:
        cls: The class whose subclasses are enumerated
        _seen: Set of classes already yielded; used by the recursion

    Raises:
        TypeError: if cls is not a class (raised when iteration starts)
    """
    if not isinstance(cls, type):
        raise TypeError(
            "itersubclasses must be called with new-style classes, not %.100r" % cls
        )

    seen = set() if _seen is None else _seen

    try:
        children = cls.__subclasses__()
    except TypeError:  # fails only when cls is type
        children = cls.__subclasses__(cls)

    for child in children:
        if child in seen:
            continue
        seen.add(child)
        yield child
        yield from itersubclasses(child, seen)

normalize_whitespace(s)

Strips the string and replace all whitespace sequences and other non-printable characters with a single space.

Source code in synapseclient/core/utils.py
824
825
826
827
828
829
830
def normalize_whitespace(s):
    """
    Strip the string, then collapse every run of whitespace and/or control
    characters in the range \\x00-\\x20 into a single space.
    """
    assert isinstance(s, str)
    trimmed = s.strip()
    return re.sub(r"[\x00-\x20\s]+", " ", trimmed)

query_limit_and_offset(query, hard_limit=1000)

Extract limit and offset from the end of a query string.

RETURNS DESCRIPTION
str

A tuple containing the query with limit and offset removed,

int

the limit at most equal to the hard_limit,

int

and the offset which defaults to 1

Source code in synapseclient/core/utils.py
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
def query_limit_and_offset(
    query: str, hard_limit: int = 1000
) -> typing.Tuple[str, int, int]:
    """
    Extract limit and offset from the end of a query string.

    Trailing "limit N" / "offset N" clauses are matched case-insensitively and
    peeled off one at a time, starting from the end of the query.

    Arguments:
        query: The query string
        hard_limit: Upper bound for the returned limit; also the limit used when
                    the query does not specify one

    Returns:
        A tuple containing the query with limit and offset removed,
        the limit at most equal to the hard_limit,
        and the offset which defaults to 1
    """
    clause_pattern = r"\A(.*\s)(offset|limit)\s*(\d*\s*)\Z"

    # Match against a lower-cased copy so the keywords are case-insensitive.
    remainder = query.lower()
    options = {}
    while True:
        match = re.search(clause_pattern, remainder)
        if match is None:
            break
        head, keyword, amount = match.groups()
        options[keyword] = int(amount)
        remainder = head

    # Cut the original (case-preserved) query to the length of what remains.
    query = query[: len(remainder)].strip()

    limit = min(options.get("limit", hard_limit), hard_limit)
    offset = options.get("offset", 1)

    return query, limit, offset

extract_synapse_id_from_query(query)

An unfortunate hack to pull the synapse ID out of a table query of the form "select column1, column2 from syn12345 where...." needed to build URLs for table services.

Source code in synapseclient/core/utils.py
929
930
931
932
933
934
935
936
937
938
939
def extract_synapse_id_from_query(query):
    """
    Pull the Synapse ID out of a table query such as
    "select column1, column2 from syn12345 where....".

    This is a regex-based hack that is needed to build URLs for table services.

    Arguments:
        query: The table query string to search.

    Returns:
        The ID found after ``from`` (for example "syn12345"), spelled exactly as
        it appears in the query.

    Raises:
        ValueError: If the query contains no ``from syn<digits>`` clause.
    """
    found = re.search(r"from\s+(syn\d+)", query, re.IGNORECASE)
    if found is None:
        raise ValueError('Couldn\'t extract synapse ID from query: "%s"' % query)
    return found.group(1)

printTransferProgress(transferred, toBeTransferred, prefix='', postfix='', isBytes=True, dt=None, previouslyTransferred=0)

Prints a progress bar

PARAMETER DESCRIPTION
transferred

a number of items/bytes completed

TYPE: int

toBeTransferred

total number of items/bytes when completed

TYPE: int

prefix

String printed before progress bar

TYPE: str DEFAULT: ''

postfix

String printed after progress bar

TYPE: str DEFAULT: ''

isBytes

A boolean indicating whether to convert bytes to kB, MB, GB etc.

TYPE: bool DEFAULT: True

dt

The time in seconds that has passed since transfer started is used to calculate rate

TYPE: float DEFAULT: None

previouslyTransferred

the number of bytes that were already transferred before this transfer began (e.g. someone ctrl+c'd out of an upload and restarted it later)

TYPE: int DEFAULT: 0

Source code in synapseclient/core/utils.py
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
def printTransferProgress(
    transferred: int,
    toBeTransferred: int,
    prefix: str = "",
    postfix: str = "",
    isBytes: bool = True,
    dt: float = None,
    previouslyTransferred: int = 0,
):
    """Write a single-line text progress bar to stdout.

    Nothing is written when stdout is not attached to a terminal.

    Arguments:
        transferred: a number of items/bytes completed
        toBeTransferred: total number of items/bytes when completed. Zero is
                         reported as done; a negative value means the total is
                         unknown, in which case no total or percentage is shown.
        prefix: String printed before progress bar
        postfix: String printed after progress bar
        isBytes: A boolean indicating whether to convert bytes to kB, MB, GB etc.
        dt: The time in seconds that has passed since transfer started is used to calculate rate
        previouslyTransferred: the number of bytes that were already transferred before this
                                transfer began (e.g. someone ctrl+c'd out of an upload and
                                restarted it later)
    """
    if not sys.stdout.isatty():
        return

    bar_width = 20  # Modify this to change the length of the progress bar
    done_marker = "Done...\n"
    status = ""

    # Transfer rate, only available when an elapsed time was supplied.
    rate = ""
    if dt is not None and dt != 0:
        per_second = (transferred - previouslyTransferred) / float(dt)
        rate = "(%s/s)" % humanizeBytes(per_second) if isBytes else per_second

    # Fraction of the bar to fill.
    if toBeTransferred < 0:
        # Unknown total: fill the bar against a nominal size and wrap around
        # once that size has been exceeded.
        nominal_total = bar_width * 1 * MB
        if transferred > nominal_total:
            shown = transferred % nominal_total
        else:
            shown = transferred
        progress = float(shown) / nominal_total
    elif toBeTransferred == 0:  # There is nothing to be transferred
        progress = 1
        status = done_marker
    else:
        progress = float(transferred) / toBeTransferred
        if progress >= 1:
            progress = 1
            status = done_marker

    filled = int(round(bar_width * progress))
    bar = "#" * filled + "-" * (bar_width - filled)
    nbytes = humanizeBytes(transferred) if isBytes else transferred

    # Total and percentage are only meaningful when the total is known.
    outOf = ""
    percentage = ""
    if toBeTransferred > 0:
        outOf = "/%s" % (humanizeBytes(toBeTransferred) if isBytes else toBeTransferred)
        percentage = "%4.2f%%" % (progress * 100)

    text = "\r%s [%s]%s   %s%s %s %s %s    \n" % (
        prefix,
        bar,
        percentage,
        nbytes,
        outOf,
        rate,
        postfix,
        status,
    )
    sys.stdout.write(text)
    sys.stdout.flush()

touch(path, times=None)

Make sure a file exists. Update its access and modified times.

Source code in synapseclient/core/utils.py
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
def touch(path, times=None):
    """
    Make sure a file exists. Update its access and modified times.

    Arguments:
        path: Path of the file to create or update. Missing parent directories
              are created first.
        times: Passed straight through to ``os.utime``.

    Returns:
        The ``path`` that was passed in.
    """
    basedir = os.path.dirname(path)
    # A bare file name has no directory component, and os.makedirs("") raises,
    # so only create the parent when there is one.
    if basedir and not os.path.exists(basedir):
        # exist_ok: alternate processes might be creating these at the same time
        os.makedirs(basedir, exist_ok=True)

    # Opening in append mode creates the file without truncating existing content.
    with open(path, "a"):
        os.utime(path, times)
    return path

is_json(content_type)

detect if a content-type is JSON

Source code in synapseclient/core/utils.py
1042
1043
1044
1045
1046
1047
1048
1049
1050
def is_json(content_type):
    """Report whether a content-type value denotes JSON.

    Arguments:
        content_type: The Content-Type value to inspect; may be None or empty.

    Returns:
        True when the value, ignoring case and surrounding whitespace, starts with
        "application/json"; False otherwise, including for None or empty input.
    """
    if not content_type:
        return False
    # The value of Content-Type defined here:
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.7
    media_type = content_type.lower().strip()
    return media_type.startswith("application/json")

find_data_file_handle(bundle)

Return the fileHandle whose ID matches the dataFileHandleId in an entity bundle

Source code in synapseclient/core/utils.py
1053
1054
1055
1056
1057
1058
def find_data_file_handle(bundle):
    """Look up the file handle that an entity bundle designates as its data file.

    Arguments:
        bundle: A mapping with a "fileHandles" sequence and an "entity" mapping
                that carries a "dataFileHandleId".

    Returns:
        The first entry of bundle["fileHandles"] whose "id" equals
        bundle["entity"]["dataFileHandleId"], or None when no entry matches.
    """
    matching = (
        candidate
        for candidate in bundle["fileHandles"]
        if candidate["id"] == bundle["entity"]["dataFileHandleId"]
    )
    return next(matching, None)

unique_filename(path)

Returns a unique path by appending (n) for some number n to the end of the filename.

Source code in synapseclient/core/utils.py
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
def unique_filename(path):
    """Return a path that does not exist yet, derived from ``path``.

    If nothing exists at ``path`` it is returned as is. Otherwise "(1)", "(2)", ...
    is inserted between the file name and its extension until an unused path is
    found.

    Arguments:
        path: The desired file path.

    Returns:
        ``path`` itself, or the first numbered variant that does not exist.
    """
    stem, extension = os.path.splitext(path)
    candidate = path
    attempt = 1
    while os.path.exists(candidate):
        candidate = stem + ("(%d)" % attempt) + extension
        attempt += 1
    return candidate

threadsafe_generator(f)

A decorator that takes a generator function and makes it thread-safe. See: http://anandology.com/blog/using-iterators-and-generators/

Source code in synapseclient/core/utils.py
1091
1092
1093
1094
1095
1096
1097
1098
1099
def threadsafe_generator(f):
    """Decorator that wraps the result of a generator function in `threadsafe_iter`.

    See: <http://anandology.com/blog/using-iterators-and-generators/>

    Arguments:
        f: The generator function to decorate.

    Returns:
        A function that accepts the same arguments as ``f`` and returns
        ``threadsafe_iter(f(...))``.
    """

    def g(*args, **kwargs):
        generator = f(*args, **kwargs)
        return threadsafe_iter(generator)

    return g

extract_prefix(keys)

Takes a list of strings and extracts a common prefix delimited by a dot, for example::

extract_prefix(["entity.bang", "entity.bar", "entity.bat"])
# returns "entity." (the common prefix, including the trailing dot)
Source code in synapseclient/core/utils.py
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
def extract_prefix(keys):
    """
    Takes a list of strings and extracts a common prefix delimited by a dot,
    for example::

        extract_prefix(["entity.bang", "entity.bar", "entity.bat"])
        # returns "entity."

    Arguments:
        keys: The strings to inspect.

    Returns:
        The shared text before the first dot, including the trailing dot, when
        every key contains a dot and all keys share that leading part. An empty
        string otherwise, including when ``keys`` is empty.
    """
    leading_parts = set()
    for key in keys:
        head, dot, _ = key.partition(".")
        if not dot:
            # A key without any dot rules out a common dotted prefix.
            return ""
        leading_parts.add(head)

    if len(leading_parts) != 1:
        return ""
    return leading_parts.pop() + "."

extract_zip_file_to_directory(zip_file, zip_entry_name, target_dir)

Extracts a specified file in a zip to the specified directory

PARAMETER DESCRIPTION
zip_file

an opened zip file. e.g. "with zipfile.ZipFile(zipfilepath) as zip_file:"

TYPE: ZipFile

zip_entry_name

the name of the file to be extracted from the zip e.g. folderInsideZipIfAny/fileName.txt

TYPE: str

target_dir

the directory to which the file will be extracted

TYPE: str

RETURNS DESCRIPTION
str

full path to the extracted file

Source code in synapseclient/core/utils.py
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
def extract_zip_file_to_directory(
    zip_file: zipfile.ZipFile, zip_entry_name: str, target_dir: str
) -> str:
    """
    Extracts a specified file in a zip to the specified directory

    The entry is written directly into ``target_dir`` under its base name; any
    folder structure inside the zip is not recreated.

    Arguments:
        zip_file: an opened zip file. e.g. "with zipfile.ZipFile(zipfilepath) as zip_file:"
        zip_entry_name: the name of the file to be extracted from the zip
                        e.g. folderInsideZipIfAny/fileName.txt
        target_dir: the directory to which the file will be extracted

    Returns:
        full path to the extracted file

    Raises:
        KeyError: If ``zip_entry_name`` is not present in the zip (raised by
            ``ZipFile.read``). An existing file at the destination is left
            untouched in that case.
    """
    file_base_name = os.path.basename(zip_entry_name)  # base name of the file
    filepath = os.path.join(
        target_dir, file_base_name
    )  # file path to the cached file to write

    # Create the cache directory if it does not exist. exist_ok covers another
    # process creating it between the check and the call.
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir, exist_ok=True)

    # Read the entry BEFORE opening the destination: opening with "wb" truncates,
    # so a missing entry would otherwise wipe a previously extracted file.
    entry_bytes = zip_file.read(zip_entry_name)

    # write the file from the zip into the cache
    with open(filepath, "wb") as cache_file:
        cache_file.write(entry_bytes)

    return filepath

topolgical_sort(graph)

Given a graph in the form of a dictionary, returns a topologically sorted list. Adapted from: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/

PARAMETER DESCRIPTION
graph

a dictionary with values containing lists of keys

TYPE: Dict[str, List[str]]

RETURNS DESCRIPTION
list

A sorted list of items

Source code in synapseclient/core/utils.py
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
def topolgical_sort(graph: typing.Dict[str, typing.List[str]]) -> list:
    """Topologically sort a dependency graph given as a dictionary.

    Adapted from:
    <http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/>

    Arguments:
        graph: a dictionary with values containing lists of keys
        referencing back into the dictionary

    Returns:
        A list of ``(node, edges)`` tuples, ordered so that each node comes after
        every node its edges refer to. Edges that are not keys of the graph are
        treated as already resolved.

    Raises:
        RuntimeError: If the graph contains a cycle.
    """
    pending = graph.copy()
    resolved = []

    # Each pass moves every node whose edges are all resolved (no longer pending)
    # from `pending` to `resolved`. Iterating over a snapshot of the items lets us
    # delete from `pending` while looping; membership is checked against the live
    # dict, so nodes resolved earlier in a pass unblock later ones in the same pass.
    while pending:
        progressed = False
        for node, edges in list(pending.items()):
            if any(edge in pending for edge in edges):
                continue
            progressed = True
            del pending[node]
            resolved.append((node, edges))

        if not progressed:
            # A full pass resolved nothing, so the remaining nodes depend on each
            # other cyclically and can never be resolved.
            raise RuntimeError(
                "A cyclic dependency occurred."
                " Some files in provenance reference each other circularly."
            )
    return resolved

caller_module_name(current_frame)

Returns the name of the module in which the calling function resides.

PARAMETER DESCRIPTION
current_frame

use inspect.currentframe().

RETURNS DESCRIPTION

the name of the module calling the function, foo(),

in which this calling_module() is invoked.

Ignores callers that belong in the same module as foo()

Source code in synapseclient/core/utils.py
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
def caller_module_name(current_frame):
    """
    Returns the name of the module in which the calling function resides.

    Arguments:
        current_frame: the frame of the function, foo(), that wants to know its
                       caller; use inspect.currentframe().

    Returns:
        the name of the module that called foo(), as derived from the file name
        of the first frame up the stack that lives in a different file than
        foo(). Callers that belong to the same file as foo() are skipped.
    """
    # filename in which foo() resides
    own_filename = current_frame.f_code.co_filename

    # Walk outwards from foo()'s frame until a frame from another file is found.
    # This skips functions in foo()'s own module that use foo() as a helper.
    frame = current_frame
    while True:
        frame = frame.f_back
        filename = frame.f_code.co_filename
        if filename != own_filename:
            return inspect.getmodulename(filename)

attempt_import(module_name, fail_message)

Attempt to import a module by name and return the imported module if successful.

PARAMETER DESCRIPTION
module_name

The name of the module to import.

TYPE: str

fail_message

The error message to display if the import fails.

TYPE: str

RETURNS DESCRIPTION

The imported module.

RAISES DESCRIPTION
ImportError

If the module cannot be imported.

Source code in synapseclient/core/utils.py
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
def attempt_import(module_name: str, fail_message: str):
    """
    Import a module by name, printing installation help if that fails.

    Arguments:
        module_name: The name of the module to import.
        fail_message: Text written to stderr, followed by pip installation
                      instructions, if the import fails.

    Returns:
        The imported module.

    Raises:
        ImportError: If the module cannot be imported. The original exception is
            re-raised after the help text has been written.

    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        install_help = (
            "To install this library on Mac or Linux distributions:\n"
            "    (sudo) pip install %s\n\n"
            "On Windows, right click the Command Prompt(cmd.exe) and select 'Run as administrator' then:\n"
            "    pip install %s\n\n"
            "\n\n\n"
        ) % (module_name, module_name)
        sys.stderr.write(fail_message + install_help)
        raise
    return module

snake_case(string)

Convert the given string from CamelCase to snake_case

Source code in synapseclient/core/utils.py
1297
1298
1299
1300
def snake_case(string):
    """Convert the given string from CamelCase to snake_case.

    An underscore is inserted before every upper-case ASCII letter that is not at
    the very start of the string, then the whole result is lower-cased.
    """
    # https://stackoverflow.com/a/1176023
    underscored = re.sub(r"(?<!^)(?=[A-Z])", "_", string)
    return underscored.lower()

is_base64_encoded(input_string)

Return whether the given input string appears to be base64 encoded

Source code in synapseclient/core/utils.py
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
def is_base64_encoded(input_string):
    """Return whether the given input string appears to be base64 encoded.

    The check decodes the input and encodes it again; the input counts as base64
    only if that round trip reproduces it exactly.

    Arguments:
        input_string: A str or bytes value to test. None and empty values are
                      never considered encoded.

    Returns:
        True if the round trip succeeds and matches, otherwise False. Any error
        raised while converting or decoding also yields False.
    """
    if not input_string:
        # None, empty string are not considered encoded
        return False

    try:
        if isinstance(input_string, bytes):
            raw = input_string
        else:
            raw = str.encode(input_string)
        # see if we can decode it and then reencode it back to the input
        round_trip = base64.b64encode(base64.b64decode(raw))
    except Exception:
        return False
    return round_trip == raw

run_and_attach_otel_context(callable_function, current_context)

This is a generic function that will run a callable function and attach the passed in OpenTelemetry context to the thread or context that the function is running on.

This is a hack to get around AsyncIO run_in_executor not propagating the context to the code it's executing. When we are directly calling async functions after SYNPY-1411 we will be able to remove this function.

Adding this to a run_in_executor call

Note the 2 lambdas that are required:

import asyncio
from opentelemetry import context
from synapseclient import Synapse

loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
    None,
    lambda: run_and_attach_otel_context(
        lambda: Synapse.get_client(synapse_client=synapse_client).delete(
            obj="syn123",
        ),
        current_context,
    ),
)
Source code in synapseclient/core/utils.py
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
def run_and_attach_otel_context(
    callable_function: Callable[..., R], current_context: Context
) -> R:
    """
    This is a generic function that will run a callable function and attach the passed in
    OpenTelemetry context to the thread or context that the function is running on.

    The context is attached only for the duration of the call and detached again
    afterwards, including when the callable raises, so it does not remain attached
    to whatever runs next on the same thread.

    This is a hack to get around AsyncIO `run_in_executor` not propagating the context
    to the code it's executing. When we are directly calling async functions after
    SYNPY-1411 we will be able to remove this function.

    Example: Adding this to a `run_in_executor` call
        Note the 2 lambdas that are required:

            import asyncio
            from opentelemetry import context
            from synapseclient import Synapse

            loop = asyncio.get_event_loop()
            current_context = context.get_current()
            await loop.run_in_executor(
                None,
                lambda: run_and_attach_otel_context(
                    lambda: Synapse.get_client(synapse_client=synapse_client).delete(
                        obj="syn123",
                    ),
                    current_context,
                ),
            )

    Arguments:
        callable_function: A zero-argument callable to run.
        current_context: The OpenTelemetry context to attach while it runs.

    Returns:
        Whatever ``callable_function`` returns.
    """
    # attach() hands back a token that detach() needs to restore the prior context.
    token = context.attach(current_context)
    try:
        return callable_function()
    finally:
        context.detach(token)

delete_none_keys(incoming_object)

Clean up the incoming object by removing any keys with None values.

Source code in synapseclient/core/utils.py
1393
1394
1395
1396
1397
1398
def delete_none_keys(incoming_object: typing.Dict) -> None:
    """Remove, in place, every key of the incoming object whose value is None.

    Arguments:
        incoming_object: The dictionary to clean up. A None or empty object is
                         left alone.
    """
    if not incoming_object:
        return

    # Collect first, delete afterwards, so the dict is not resized while iterating.
    none_keys = [key for key, value in incoming_object.items() if value is None]
    for key in none_keys:
        del incoming_object[key]

merge_dataclass_entities(source, destination)

Utility function to merge two dataclass entities together. This is used when we are upserting an entity from the Synapse service with the requested changes.

PARAMETER DESCRIPTION
source

The source entity to merge from.

TYPE: Union[Project, Folder, File]

destination

The destination entity to merge into.

TYPE: Union[Project, Folder]

RETURNS DESCRIPTION
Union[Project, Folder]

The destination entity with the merged values.

Source code in synapseclient/core/utils.py
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
def merge_dataclass_entities(
    source: typing.Union["Project", "Folder", "File"],
    destination: typing.Union["Project", "Folder"],
) -> typing.Union["Project", "Folder"]:
    """
    Utility function to merge two dataclass entities together. This is used when we are
    upserting an entity from the Synapse service with the requested changes.

    Merge rules, applied per field of ``source``:

    - A field whose source value is itself a dataclass is copied from the source
      as a whole.
    - A field that is missing on the destination, or is None there, takes the
      source value.
    - ``annotations`` are merged key by key; on a key conflict the destination's
      value wins.
    - Any other field keeps the destination's value.

    Arguments:
        source: The source entity to merge from.
        destination: The destination entity to merge into. It is modified in place.

    Returns:
        The destination entity with the merged values.
    """
    # Convert dataclasses to dictionaries
    destination_dict = asdict(destination)
    source_dict = asdict(source)
    modified_items = {}

    # Update destination_dict with source_dict, keeping destination's values in case of conflicts
    for key, value in source_dict.items():
        source_value = getattr(source, key)
        if is_dataclass(source_value):
            # asdict() flattened the nested dataclass into a dict, so copy the real
            # attribute. This is done whether or not the destination already has
            # the attribute: the previous fallback for a missing attribute read
            # getattr(destination, key) and therefore always raised AttributeError.
            setattr(destination, key, source_value)
        elif key not in destination_dict or destination_dict[key] is None:
            modified_items[key] = value
        elif key == "annotations":
            # Destination entries are unpacked last so they override the source's.
            modified_items[key] = {
                **(value or {}),
                **destination_dict[key],
            }

    # Update destination's fields with the merged dictionary
    for key, value in modified_items.items():
        setattr(destination, key, value)

    return destination

Async Utils

synapseclient.core.async_utils

This utility class is to hold any utilities that are needed for async operations.

Classes

ClassOrInstance

Helper class to allow a method to be called as a class method or instance method.

Source code in synapseclient/core/async_utils.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class ClassOrInstance:
    """Descriptor that lets one function be called on either a class or an instance.

    When accessed through an instance, the wrapped function receives that instance
    as its first argument; when accessed through the class, it receives the class.
    """

    def __init__(self, fn):
        # The function that receives the instance or class as first argument.
        self.fn = fn

    def __get__(self, obj, cls):
        # obj is None when the attribute is looked up on the class itself.
        receiver = cls if obj is None else obj

        def dispatch(*args, **kwds):
            return self.fn(receiver, *args, **kwds)

        # Make the returned callable look like the wrapped function.
        functools.update_wrapper(dispatch, self.fn)
        return dispatch

Functions

otel_trace_method(method_to_trace_name=None)

Decorator to trace a method with OpenTelemetry in an async environment. This function is specifically written to be used on a method within a class.

This will pass the class instance as the first argument to the method. This allows you to modify the name of the trace to include information about the class instance.

Decorating a method within a class that will be traced with OpenTelemetry.

Setting the trace name:

@otel_trace_method(method_to_trace_name=lambda self, **kwargs: f"Project_Store: {self.name}")
async def store(self):
PARAMETER DESCRIPTION
method_to_trace_name

A callable that takes the class instance as the first argument and returns a string to be used as the trace name. If this is not provided, the trace name will be set to the method name.

TYPE: Union[Callable[..., str], None] DEFAULT: None

RETURNS DESCRIPTION

A callable decorator that will trace the method with OpenTelemetry.

Source code in synapseclient/core/async_utils.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def otel_trace_method(method_to_trace_name: Union[Callable[..., str], None] = None):
    """
    Decorator to trace a method with OpenTelemetry in an async environment. This function
    is specifically written to be used on a method within a class.

    This will pass the class instance as the first argument to the method. This allows
    you to modify the name of the trace to include information about the class instance.

    A new span is only started when the current span is recording; otherwise the
    method is simply awaited.

    Example: Decorating a method within a class that will be traced with OpenTelemetry.
        Setting the trace name:

            @otel_trace_method(method_to_trace_name=lambda self, **kwargs: f"Project_Store: {self.name}")
            async def store(self):

    Arguments:
        method_to_trace_name: A callable that takes the class instance as the first argument
            and returns a string to be used as the trace name. If this is not provided,
            the trace name will be set to the method name.

    Returns:
        A callable decorator that will trace the method with OpenTelemetry.
    """

    def decorator(func):
        """Function decorator."""

        # functools.wraps keeps the decorated method's name, docstring and
        # signature metadata instead of exposing the wrapper's.
        @functools.wraps(func)
        async def otel_trace_method_wrapper(self, *arg, **kwargs):
            """Wrapper for the function to be traced."""
            trace_name = (
                method_to_trace_name(self, *arg, **kwargs)
                if method_to_trace_name
                else None
            )
            current_span = trace.get_current_span()
            if current_span.is_recording():
                # NOTE(review): "Synaspse" looks like a typo of "Synapse"; it is
                # left as is because changing it would rename emitted spans.
                with tracer.start_as_current_span(
                    trace_name or f"Synaspse::{func.__name__}"
                ):
                    return await func(self, *arg, **kwargs)
            else:
                return await func(self, *arg, **kwargs)

        return otel_trace_method_wrapper

    return decorator

wrap_async_to_sync(coroutine, syn)

Wrap an async function to be called in a sync context.

Source code in synapseclient/core/async_utils.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def wrap_async_to_sync(coroutine: Coroutine[Any, Any, Any], syn: "Synapse") -> Any:
    """Run a coroutine to completion from synchronous code and return its result.

    If an event loop is already running in this thread, `nest_asyncio` is applied
    to it and the coroutine is run on that loop. Otherwise it is run with
    `asyncio.run`.

    Arguments:
        coroutine: The coroutine to run.
        syn: Client whose logger records any exception before it is re-raised.

    Returns:
        The result of the coroutine.
    """
    try:
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # get_running_loop() raises when no loop is running in this thread.
            running_loop = None

        if not running_loop:
            return asyncio.run(coroutine)

        nest_asyncio.apply(loop=running_loop)
        return running_loop.run_until_complete(coroutine)

    except Exception as ex:
        syn.logger.exception(
            f"Error occurred while running {coroutine} in a sync context."
        )
        raise ex

async_to_sync(cls)

Convert all name_of_thing_async methods to name_of_thing methods

(see http://stackoverflow.com/questions/18048341/add-methods-to-a-class-generated-from-other-methods for help understanding)

Source code in synapseclient/core/async_utils.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def async_to_sync(cls):
    """
    Convert all name_of_thing_async methods to name_of_thing methods

    For every attribute of the class whose name contains "async", a synchronous
    counterpart is added under the same name with "_async" removed, unless the
    class already defines an attribute with that name. The counterpart runs the
    async method to completion and returns its result.

    (see
    http://stackoverflow.com/questions/18048341/add-methods-to-a-class-generated-from-other-methods
    for help understanding)

    Arguments:
        cls: The class to decorate.

    Returns:
        The same class, with the synchronous methods added.
    """

    def create_method(async_method_name: str):
        """Creates a replacement method for the async method."""

        @ClassOrInstance
        def newmethod(self, *args, **kwargs):
            """The new method that will replace the non-async method."""

            async def wrapper(*args, **kwargs):
                """Wrapper for the function to be called in an async context."""
                return await getattr(self, async_method_name)(*args, **kwargs)

            loop = None
            try:
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No event loop is running in this thread.
                    pass

                if loop:
                    # Allow re-entering the loop that is already running.
                    nest_asyncio.apply(loop=loop)
                    return loop.run_until_complete(wrapper(*args, **kwargs))
                else:
                    return asyncio.run(wrapper(*args, **kwargs))

            except Exception as ex:
                # Imported here rather than at module level.
                from synapseclient import Synapse

                # kwargs is a dict, so the caller's client must be read with
                # .get(); getattr() on a dict never finds the key and always
                # produced None, silently ignoring an explicitly passed client.
                synapse_client = Synapse.get_client(
                    kwargs.get("synapse_client", None)
                )
                synapse_client.logger.exception(
                    f"Error occurred while running {async_method_name} on {self.__class__}."
                )
                raise ex

        return newmethod

    methods = cls.__dict__.keys()

    # Collect first and assign afterwards so the class dict is not modified while
    # its keys are being iterated.
    methods_to_update = []
    for k in methods:
        if "async" in k and (new_method_name := k.replace("_async", "")) not in methods:
            new_method = create_method(k)

            new_method.fn.__name__ = new_method_name
            new_method.__name__ = new_method_name

            functools.update_wrapper(new_method, new_method.fn)
            methods_to_update.append(
                {
                    "new_method_name": new_method_name,
                    "new_method": new_method,
                }
            )
    for method_to_update in methods_to_update:
        setattr(
            cls, method_to_update["new_method_name"], method_to_update["new_method"]
        )

    return cls

Versions

synapseclient.core.version_check

Version Functions

Check for latest version and recommend upgrade:

synapseclient.check_for_updates()

Print release notes for installed version of client:

synapseclient.release_notes()

Functions

version_check(current_version=None, version_url=_VERSION_URL, check_for_point_releases=False)

Gets the latest version information from version_url and check against the current version. Recommends upgrade, if a newer version exists.

PARAMETER DESCRIPTION
current_version

The current version of the entity

DEFAULT: None

version_url

The URL of the entity version

DEFAULT: _VERSION_URL

check_for_point_releases

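Bool. If True, versions are compared to 3 levels (so point releases count); otherwise they are compared to 2 levels.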

DEFAULT: False

RETURNS DESCRIPTION

True if current version is the latest release (or higher) version, otherwise False.

Source code in synapseclient/core/version_check.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def version_check(
    current_version=None, version_url=_VERSION_URL, check_for_point_releases=False
):
    """
    Fetch the version information published at version_url and compare it with the
    current version, recommending an upgrade on stderr if a newer version exists.

    Arguments:
        current_version: The version to check. Defaults to synapseclient.__version__.
        version_url: The URL from which the version information is fetched.
        check_for_point_releases: If True, versions are compared to 3 levels
                                  instead of 2.

    Returns:
        True if current version is the latest release (or higher) version, otherwise False.
        False is also returned when the check itself fails with an exception.

    Raises:
        SystemExit: If the current version is blacklisted.
    """

    try:
        current_version = current_version or synapseclient.__version__

        version_info = _get_version_info(version_url)

        current_base_version = _strip_dev_suffix(current_version)

        # Check blacklist
        blacklisted = (
            current_base_version in version_info["blacklist"]
            or current_version in version_info["blacklist"]
        )
        if blacklisted:
            # SystemExit is not an Exception subclass, so the handler below does
            # not swallow it.
            raise SystemExit(
                "\nPLEASE UPGRADE YOUR CLIENT\n\nUpgrading your SynapseClient is"
                " required. Please upgrade your client by typing:\n    pip install"
                " --upgrade synapseclient\n\n"
            )

        if "message" in version_info:
            sys.stderr.write(version_info["message"] + "\n")

        levels = 2
        if check_for_point_releases:
            levels = 3

        # Compare with latest version
        installed = _version_tuple(current_version, levels=levels)
        latest = _version_tuple(version_info["latestVersion"], levels=levels)
        if installed < latest:
            upgrade_notice = (
                "\nUPGRADE AVAILABLE\n\nA more recent version of the Synapse Client"
                " (%s) is available. Your version (%s) can be upgraded by typing:\n   "
                " pip install --upgrade synapseclient\n\n"
                % (
                    version_info["latestVersion"],
                    current_version,
                )
            )
            sys.stderr.write(upgrade_notice)
            if "releaseNotes" in version_info:
                sys.stderr.write(
                    "Python Synapse Client version %s release notes\n\n"
                    % version_info["latestVersion"]
                )
                sys.stderr.write(version_info["releaseNotes"] + "\n\n")
            return False

    except Exception as e:
        # Don't prevent the client from running if something goes wrong
        sys.stderr.write("Exception in version check: %s\n" % (str(e),))
        return False

    return True

check_for_updates()

Check for the existence of newer versions of the client, reporting both current release version and development version.

For help installing development versions of the client, see the README.md.

Source code in synapseclient/core/version_check.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def check_for_updates():
    """
    Report the running client version next to the latest released version.

    The version summary is written to stderr. If the latest release is newer
    than the running client (compared on three version levels), an upgrade
    notice is printed to stdout; otherwise an "up to date" confirmation is
    written to stderr.

    For help installing development versions of the client,
    see the [README.md](https://github.com/Sage-Bionetworks/synapsePythonClient#installation).
    """
    running_version = synapseclient.__version__

    sys.stderr.write("Python Synapse Client\n")
    sys.stderr.write("currently running version:  %s\n" % running_version)

    # Version metadata for the latest published release.
    latest_version = _get_version_info(_VERSION_URL)["latestVersion"]
    sys.stderr.write("latest release version:     %s\n" % latest_version)

    running_key = _version_tuple(running_version, levels=3)
    latest_key = _version_tuple(latest_version, levels=3)
    outdated = running_key < latest_key

    if not outdated:
        sys.stderr.write("\nYour Synapse client is up to date!\n")
        return

    # NOTE: the upgrade notice goes to stdout (print), unlike the lines above.
    print(
        "\nUPGRADE AVAILABLE\n\nA more recent version of the Synapse Client (%s) is"
        " available. Your version (%s) can be upgraded by typing:\n    pip install"
        " --upgrade synapseclient\n\n"
        % (latest_version, running_version)
    )

release_notes(version_url=None)

Print release notes for the installed version of the client or latest release or development version if version_url is supplied.

version_url defaults to None, meaning release notes for the installed version. Alternatives are:
  • synapseclient.version_check._VERSION_URL
  • synapseclient.version_check._DEV_VERSION_URL
Source code in synapseclient/core/version_check.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def release_notes(version_url=None):
    """
    Write the release notes header, and the notes themselves when available, to stderr.

    Arguments:
        version_url: Passed straight to `_get_version_info`. Defaults to None,
                     documented as meaning release notes for the installed
                     version. Alternatives are:
                        - synapseclient.version_check._VERSION_URL
                        - synapseclient.version_check._DEV_VERSION_URL
    """
    info = _get_version_info(version_url)
    stream = sys.stderr

    header = "Python Synapse Client version %s release notes\n\n" % info["latestVersion"]
    stream.write(header)

    # The notes are optional in the version metadata; only the header is
    # emitted when they are absent.
    if "releaseNotes" not in info:
        return
    stream.write(info["releaseNotes"] + "\n")

STS Transfer

synapseclient.core.sts_transfer

Classes

StsTokenStore

Cache STS tokens in memory for observed entity ids. An optimization for long-lived Synapse objects that interact with the same Synapse storage locations over and over again, so that they don't have to make a remote call to fetch a new token for every entity, which (e.g. for small files) can amount to non-trivial overhead.

Source code in synapseclient/core/sts_transfer.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class StsTokenStore:
    """
    In-memory store of STS tokens, kept per permission and keyed by entity id.

    Reusing a cached token avoids a remote call for every entity that shares a
    storage location, which matters for long lived Synapse objects that move
    many small files.
    """

    # Each token is < 1k, but the Python process may run for a very long time
    # (e.g. in a programmatic environment), so the number of tokens held in
    # memory is capped to keep this optimization from turning into a leak.
    DEFAULT_TOKEN_CACHE_SIZE = 5000

    def __init__(self, max_token_cache_size=DEFAULT_TOKEN_CACHE_SIZE):
        # One bounded cache per supported STS permission.
        caches = {}
        for sts_permission in STS_PERMISSIONS:
            caches[sts_permission] = _TokenCache(max_token_cache_size)
        self._tokens = caches
        self._lock = threading.Lock()

    def get_token(
        self, syn, entity_id, permission, min_remaining_life: datetime.timedelta
    ):
        """
        Return a token for the entity, reusing the cached one when it has enough life left.

        Arguments:
            syn: Object used to fetch a new token (its `restGET` is called).
            entity_id: The entity whose token is wanted; also the cache key.
            permission: Selects which per-permission cache to use.
            min_remaining_life: Minimum time a cached token must still be valid
                                for in order to be reused.

        Returns:
            The cached token, or a newly fetched one (which is then cached).

        Raises:
            ValueError: If there is no cache for the given permission.
        """
        with self._lock:
            now = datetime.datetime.utcnow()

            cache = self._tokens.get(permission)
            if cache is None:
                raise ValueError(f"Invalid STS permission {permission}")

            cached = cache.get(entity_id)
            if cached:
                remaining = iso_to_datetime(cached["expiration"]) - now
                if not (remaining < min_remaining_life):
                    return cached

            # Nothing cached, or too little life left on the cached token:
            # fetch a replacement and remember it.
            fresh = self._fetch_token(syn, entity_id, permission)
            cache[entity_id] = fresh
            return fresh

    @staticmethod
    def _fetch_token(syn, entity_id, permission):
        """Request a token for the entity and permission through `syn.restGET`."""
        uri = f"/entity/{entity_id}/sts?permission={permission}"
        return syn.restGET(uri)

Functions

get_sts_credentials(syn, entity_id, permission, *, output_format='json', min_remaining_life=None)

See Synapse.get_sts_storage_token

Source code in synapseclient/core/sts_transfer.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def get_sts_credentials(
    syn, entity_id, permission, *, output_format="json", min_remaining_life=None
):
    """
    Obtain STS credentials for an entity in the requested output format.

    See Synapse.get_sts_storage_token

    Arguments:
        syn: Object whose `_sts_token_store` supplies the token.
        entity_id: The entity to get credentials for.
        permission: The STS permission requested.
        output_format: "json" returns the token as received, "boto" returns a
                       dict of `aws_`-prefixed snake case keys, "shell" picks
                       bash/powershell/cmd from the environment; any other
                       value is looked up in the export templates.
        min_remaining_life: Minimum remaining token life; a falsy value falls
                            back to the module default.

    Raises:
        ValueError: If no export template exists for the output format.
    """
    required_life = min_remaining_life or DEFAULT_MIN_LIFE
    value = syn._sts_token_store.get_token(syn, entity_id, permission, required_life)

    if output_format == "json":
        # pass through what server sent
        return value

    if output_format == "boto":
        # The Synapse STS API returns camel cased keys; boto wants them snake
        # cased with an "aws_" prefix. Any other key (e.g. expiration) is dropped.
        boto_keys = ("accessKeyId", "secretAccessKey", "sessionToken")
        return {f"aws_{snake_case(key)}": value[key] for key in boto_keys}

    if output_format == "shell":
        # Detect what suits the system; assume bourne compatible output
        # everywhere except a native (non-bash) Windows shell.
        native_windows = platform.system() == "Windows" and "bash" not in os.environ.get(
            "SHELL", ""
        )
        if not native_windows:
            output_format = "bash"
        elif len(os.getenv("PSModulePath", "").split(os.pathsep)) >= 3:
            # https://stackoverflow.com/a/55598796
            output_format = "powershell"
        else:
            output_format = "cmd"

    template_string = EXPORT_TEMPLATE_STRINGS.get(output_format)
    if template_string:
        return _format_export_template_string(syn, entity_id, value, template_string)

    raise ValueError(f"Unrecognized output_format {output_format}")

with_boto_sts_credentials(fn, syn, entity_id, permission)

A wrapper around a function that will get sts credentials and try to use them on the given function which should take a dictionary with the aws_access_key_id, aws_secret_access_key, and aws_session_token as keys. If the given function returns a boto error that looks like the token has expired it will retry once after fetching fresh credentials.

The purpose is to be able to use potentially cached credentials in long running tasks while reducing worry that they will expire in the middle of running and cause an unrecoverable error. The alternative of fetching a fresh STS token for every request might be okay for a few large files but would greatly slow down transferring many small files.

Source code in synapseclient/core/sts_transfer.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def with_boto_sts_credentials(fn, syn, entity_id, permission):
    """
    Call `fn` with boto-format STS credentials, retrying once if the token looks expired.

    `fn` receives a dictionary with the `aws_access_key_id`, `aws_secret_access_key`
    and `aws_session_token` keys. If it raises a boto3 error whose text contains
    "ExpiredToken", credentials are requested again and `fn` is called a second
    time; any error from that second call propagates.

    This lets long running tasks use potentially cached credentials without
    risking an unrecoverable failure when they expire mid-run. Fetching a fresh
    STS token for every request might be acceptable for a few large files but
    would greatly slow down transferring many small files.

    Arguments:
        fn: Callable taking the credentials dictionary.
        syn: Passed through to `get_sts_credentials`.
        entity_id: The entity the credentials are for.
        permission: The STS permission requested.

    Returns:
        Whatever `fn` returns.
    """
    first_credentials = get_sts_credentials(
        syn, entity_id, permission, output_format="boto"
    )
    try:
        return fn(first_credentials)
    except boto3.exceptions.Boto3Error as ex:
        # Only an expired-token failure earns the single retry below.
        if "ExpiredToken" not in str(ex):
            raise

    retry_credentials = get_sts_credentials(
        syn, entity_id, permission, output_format="boto"
    )
    return fn(retry_credentials)

is_boto_sts_transfer_enabled(syn)

Check if the boto/STS transfers are enabled in the Synapse configuration. If enabled then synapseclient will attempt to automatically use boto to upload and download from supported storage locations that are sts enabled.

PARAMETER DESCRIPTION
syn

A Synapse object

RETURNS DESCRIPTION

True if STS is enabled, False otherwise

Source code in synapseclient/core/sts_transfer.py
220
221
222
223
224
225
226
227
228
229
230
231
232
def is_boto_sts_transfer_enabled(syn):
    """
    Report whether boto/STS transfers are enabled in the Synapse configuration.

    When enabled, synapseclient will attempt to automatically use boto to upload
    and download from supported storage locations that are sts enabled.

    Arguments:
        syn: A [Synapse][synapseclient.Synapse] object

    Returns:
        True if STS is enabled, False otherwise
    """
    # Both conditions must hold: a truthy module-level `boto3` and the
    # client's own `use_boto_sts_transfers` setting.
    if not boto3:
        return False
    return bool(syn.use_boto_sts_transfers)

is_storage_location_sts_enabled(syn, entity_id, location)

Returns whether the given storage location is enabled for STS.

PARAMETER DESCRIPTION
syn

A Synapse object

entity_id

The ID of synapse entity whose storage location we want to check for sts access

location

A storage location ID or a dictionary representing the location UploadDestination

RETURNS DESCRIPTION

True if STS is enabled for the location, False otherwise

Source code in synapseclient/core/sts_transfer.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def is_storage_location_sts_enabled(syn, entity_id, location):
    """
    Tell whether a storage location has STS enabled.

    Arguments:
        syn:       A [Synapse][synapseclient.Synapse] object
        entity_id: The ID of synapse entity whose storage location we want to check for sts access
        location:  A storage location ID or a dictionary representing the location UploadDestination

    Returns:
        True if STS is enabled for the location, False otherwise
    """
    if not location:
        return False

    if not isinstance(location, collections.abc.Mapping):
        # Not a mapping, so treat it as a storage location id and look up its
        # upload destination.
        location = syn.restGET(
            f"/entity/{entity_id}/uploadDestination/{location}",
            endpoint=syn.fileHandleEndpoint,
        )

    # At this point `location` is an upload destination mapping.
    return location.get("stsEnabled", False)

is_storage_location_sts_enabled_async(syn, entity_id, location) async

Returns whether the given storage location is enabled for STS.

PARAMETER DESCRIPTION
syn

A Synapse object

TYPE: Synapse

entity_id

The ID of synapse entity whose storage location we want to check for sts access

TYPE: str

location

A storage location ID or a dictionary representing the location UploadDestination

TYPE: str

RETURNS DESCRIPTION
bool

True if STS is enabled for the location, False otherwise

Source code in synapseclient/core/sts_transfer.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def is_storage_location_sts_enabled_async(
    syn: "Synapse", entity_id: str, location: str
) -> bool:
    """
    Tell whether a storage location has STS enabled (async variant).

    Arguments:
        syn:       A [Synapse][synapseclient.Synapse] object
        entity_id: The ID of synapse entity whose storage location we want to check for sts access
        location:  A storage location ID or a dictionary representing the location UploadDestination

    Returns:
        True if STS is enabled for the location, False otherwise
    """
    if not location:
        return False

    if not isinstance(location, collections.abc.Mapping):
        # Lazy import to avoid circular imports
        from synapseclient.api.entity_services import get_upload_destination_location

        # Not a mapping, so treat it as a storage location id and look up its
        # upload destination.
        location = await get_upload_destination_location(
            entity_id=entity_id, location=location, synapse_client=syn
        )

    # At this point `location` is an upload destination mapping.
    return location.get("stsEnabled", False)