Synapse Utils

synapseutils

Overview

The synapseutils package provides higher-level beta functions as well as utilities for interacting with Synapse. The behavior of these functions is subject to change.

Functions

synapseutils.sync

Functions

syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFiles=None, followLink=False, manifest='all', downloadFile=True)

Synchronizes a File entity, or a Folder entity, meaning all the files in a folder (including subfolders) from Synapse, and adds a readme manifest with file metadata.

There are a few conversions around annotations to call out here.

Conversion of objects from the REST API to Python native objects

The first annotation conversion is to take the annotations from the REST API and convert them into Python native objects. For example, the REST API will return a milliseconds-since-epoch timestamp for a datetime annotation; however, we want to convert that into a Python datetime object. These conversions take place in the annotations module.
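
As a minimal sketch of this first conversion, the snippet below turns a milliseconds-since-epoch value into a timezone-aware Python datetime using only the standard library. The helper name `epoch_millis_to_datetime` is hypothetical and is not the function used by the annotations module.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper for illustration; not part of synapseclient.
def epoch_millis_to_datetime(millis: int) -> datetime:
    """Convert milliseconds since the Unix epoch into a UTC datetime."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Adding a timedelta avoids floating point rounding of the milliseconds.
    return epoch + timedelta(milliseconds=millis)


converted = epoch_millis_to_datetime(1_700_000_000_000)
print(converted.isoformat())
```

Returning a timezone-aware value keeps the result unambiguous when it is later written out or compared.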

Conversion of Python native objects into strings

The second annotation conversion occurs when we are writing to the manifest TSV file. In this case we need to convert the Python native objects into strings that can be written to the manifest file. We also need to handle the case where the annotation value is a list of objects, which is converted into a single cell of data with a comma , delimiter wrapped in brackets [].
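
The list handling described above can be sketched as follows. This is an illustration only: `annotation_to_cell` is a hypothetical helper, and the real manifest writer may format or escape individual values differently than plain `str()`.

```python
# Hypothetical helper for illustration; not the library's manifest writer.
def annotation_to_cell(value) -> str:
    """Render an annotation value as the text of a single TSV cell."""
    if isinstance(value, (list, tuple)):
        # Multiple values share one cell: comma-delimited, wrapped in brackets.
        return "[" + ",".join(str(item) for item in value) + "]"
    return str(value)


print(annotation_to_cell(["a", "b", "c"]))
print(annotation_to_cell(42))
```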

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

entity

A Synapse ID or a Synapse Entity object of type File, Folder, or Project.

path

An optional path where the file hierarchy will be reproduced. If not specified the files will by default be placed in the synapseCache.

DEFAULT: None

ifcollision

Determines how to handle file collisions. May be "overwrite.local", "keep.local", or "keep.both".

DEFAULT: 'overwrite.local'

followLink

Determines whether the link returns the target Entity.

DEFAULT: False

manifest

Determines whether a manifest file is created automatically. Accepted values are all, root, and suppress.

DEFAULT: 'all'

downloadFile

Determines whether the files are downloaded.

DEFAULT: True

RETURNS DESCRIPTION

List of entities (files, tables, links)

When entity is a Project or Folder, this function will crawl all subfolders of the project/folder specified by entity and download all files that have not already been downloaded. When entity is a File, the function will download the latest version of the file unless a version is denoted in the Synapse ID with .version notation (e.g. syn123.1). If there are newer files in Synapse (or a local file has been edited outside of the cache) since the last download, then the local file will be replaced by the new file unless "ifcollision" is changed.
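
To make the .version notation concrete, here is a small sketch that splits an ID such as syn123.1 into its base ID and version number. The helper `split_synapse_id` and its regular expression are assumptions made for illustration; they are not how the client parses IDs internally.

```python
import re
from typing import Optional, Tuple

# Assumed pattern for this sketch: "syn" + digits, optionally ".<version>".
_SYN_ID = re.compile(r"^(syn\d+)(?:\.(\d+))?$")


# Hypothetical helper for illustration; not part of synapseclient.
def split_synapse_id(syn_id: str) -> Tuple[str, Optional[int]]:
    """Split 'syn123.1' into ('syn123', 1); the version is None when absent."""
    match = _SYN_ID.match(syn_id.strip())
    if match is None:
        raise ValueError("Not a recognized Synapse ID: %r" % syn_id)
    base, version = match.groups()
    return base, (int(version) if version is not None else None)


print(split_synapse_id("syn123.1"))
print(split_synapse_id("syn123"))
```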

If the files are being downloaded to a specific location outside of the Synapse cache, a file (SYNAPSE_METADATA_MANIFEST.tsv) will also be added in the path that contains the metadata (annotations, storage location, and provenance) of all downloaded files.

See also:

  • synapseutils.sync.syncToSynapse

Using this function

Download and print the paths of all downloaded files:

import synapseclient
import synapseutils
syn = synapseclient.login()
entities = synapseutils.syncFromSynapse(syn, "syn1234")
for f in entities:
    print(f.path)
Source code in synapseutils/sync.py
@tracer.start_as_current_span("sync::syncFromSynapse")
def syncFromSynapse(
    syn,
    entity,
    path=None,
    ifcollision="overwrite.local",
    allFiles=None,
    followLink=False,
    manifest="all",
    downloadFile=True,
):
    """Synchronizes a File entity, or a Folder entity, meaning all the files in a folder
    (including subfolders) from Synapse, and adds a readme manifest with file metadata.

    There are a few conversions around annotations to call out here.

    ## Conversion of objects from the REST API to Python native objects

    The first annotation conversion is to take the annotations from the REST API and
    convert them into Python native objects. For example the REST API will return a
    milliseconds since epoch timestamp for a datetime annotation, however, we want to
    convert that into a Python datetime object. These conversions take place in the
    [annotations module][synapseclient.annotations].


    ## Conversion of Python native objects into strings

    The second annotation conversion occurs when we are writing to the manifest TSV file.
    In this case we need to convert the Python native objects into strings that can be
    written to the manifest file. In addition we also need to handle the case where the
    annotation value is a list of objects. In this case we are converting the list
    into a single cell of data with a comma `,` delimiter wrapped in brackets `[]`.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: A Synapse ID, a Synapse Entity object of type file, folder or
                project.
        path: An optional path where the file hierarchy will be reproduced. If not
              specified the files will by default be placed in the synapseCache.
        ifcollision: Determines how to handle file collisions. May be
                     "overwrite.local", "keep.local", or "keep.both".
        followLink: Determines whether the link returns the target Entity.
        manifest: Determines whether creating manifest file automatically. The
                  optional values here (`all`, `root`, `suppress`).
        downloadFile: Determines whether downloading the files.

    Returns:
        List of entities ([files][synapseclient.File],
            [tables][synapseclient.Table], [links][synapseclient.Link])


    When entity is a Project or Folder, this function will crawl all subfolders
    of the project/folder specified by `entity` and download all files that have
    not already been downloaded. When entity is a File the function will download the
    latest version of the file unless version is denoted in the synid with .version
    notation (e.g. syn123.1). If there are newer files in Synapse (or a local file
    has been edited outside of the cache) since the last download then the local file
    will be replaced by the new file unless "ifcollision" is changed.

    If the files are being downloaded to a specific location outside of the Synapse
    cache a file (SYNAPSE_METADATA_MANIFEST.tsv) will also be added in the path that
    contains the metadata (annotations, storage location and provenance of all
    downloaded files).

    See also:

    - [synapseutils.sync.syncToSynapse][]

    Example: Using this function
        Download and print the paths of all downloaded files:

            entities = syncFromSynapse(syn, "syn1234")
            for f in entities:
                print(f.path)
    """

    if manifest not in ("all", "root", "suppress"):
        raise ValueError(
            'Value of manifest option should be one of the ("all", "root", "suppress")'
        )

    # we'll have the following threads:
    # 1. the entrant thread to this function walks the folder hierarchy and
    #    schedules files for download,
    #    and then waits for all the file downloads to complete
    # 2. each file download will run in a separate thread in an Executor
    # 3. downloads that support S3 multipart concurrent downloads will be scheduled
    #    by the thread in #2 and have
    #    their parts downloaded in additional threads in the same Executor
    # To support multipart downloads in #3 using the same Executor as the download
    # thread #2, we need at least 2 threads always, if those aren't available then
    # we'll run single threaded to avoid a deadlock.
    with _sync_executor(syn) as executor:
        sync_from_synapse = _SyncDownloader(syn, executor)
        files = sync_from_synapse.sync(
            entity, path, ifcollision, followLink, downloadFile, manifest
        )

    # the allFiles parameter used to be passed in as part of the recursive
    # implementation of this function with the public signature invoking itself. now
    # that this isn't a recursive any longer we don't need allFiles as a parameter
    # (especially on the public signature) but it is retained for now for backwards
    # compatibility with external invokers.
    if allFiles is not None:
        allFiles.extend(files)
        files = allFiles

    return files

syncToSynapse(syn, manifestFile, dryRun=False, sendMessages=True, retries=MAX_RETRIES)

Synchronizes files specified in the manifest file to Synapse.

Given a file describing all of the uploads, this uploads the content to Synapse and optionally notifies you via Synapse messaging (email) at specific intervals, on errors, and on completion.

Read more about the manifest file format

There are a few conversions around annotations to call out here.

Conversion of annotations from the TSV file to Python native objects

The first annotation conversion is from the TSV file into a Python native object. For example, Pandas will read a TSV file and convert the string "True" into a boolean True; however, Pandas will NOT convert our comma-delimited, bracket-wrapped list of annotations into their Python native objects. This means that we need to do that conversion here after splitting them apart.
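
The splitting step can be sketched as below. This is a simplified illustration: `cell_to_annotation` is a hypothetical helper, it leaves every item as a string, and it does not handle commas inside quoted values, which the real implementation may treat differently.

```python
# Hypothetical helper for illustration; not the library's parser.
def cell_to_annotation(cell: str):
    """Turn a bracket-wrapped, comma-delimited cell into a list of strings.

    Cells that are not wrapped in brackets are returned unchanged (stripped).
    """
    text = cell.strip()
    if text.startswith("[") and text.endswith("]"):
        inner = text[1:-1]
        if not inner.strip():
            return []
        return [item.strip() for item in inner.split(",")]
    return text


print(cell_to_annotation("[a, b,c]"))
print(cell_to_annotation("plain value"))
```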

Conversion of Python native objects for the REST API

The second annotation conversion occurs when we are taking the Python native objects and converting them into a string that can be sent to the REST API. For example the datetime objects which may have timezone information are converted to milliseconds since epoch.
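
A minimal sketch of the datetime direction of this conversion, using only the standard library. The helper `datetime_to_epoch_millis` is hypothetical, and treating naive datetimes as UTC is an assumption of this sketch rather than documented client behavior.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper for illustration; not part of synapseclient.
def datetime_to_epoch_millis(value: datetime) -> int:
    """Convert a datetime into integer milliseconds since the Unix epoch."""
    if value.tzinfo is None:
        # Assumption for this sketch: naive datetimes are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Dividing two timedeltas with // yields an int, so no float rounding.
    return (value - epoch) // timedelta(milliseconds=1)


plus_one_hour = timezone(timedelta(hours=1))
print(datetime_to_epoch_millis(datetime(1970, 1, 1, 1, 0, tzinfo=plus_one_hour)))
```

Subtracting two timezone-aware values accounts for the offset, so the same instant gives the same number regardless of the timezone it was expressed in.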

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

manifestFile

A tsv file with file locations and metadata to be pushed to Synapse.

dryRun

Performs validation without uploading if set to True.

DEFAULT: False

sendMessages

Sends out messages on completion if set to True.

DEFAULT: True

RETURNS DESCRIPTION
None

None

Source code in synapseutils/sync.py
@tracer.start_as_current_span("sync::syncToSynapse")
def syncToSynapse(
    syn, manifestFile, dryRun=False, sendMessages=True, retries=MAX_RETRIES
) -> None:
    """Synchronizes files specified in the manifest file to Synapse.

    Given a file describing all of the uploads, this uploads the content to Synapse and
    optionally notifies you via Synapse messaging (email) at specific intervals, on
    errors and on completion.

    [Read more about the manifest file format](../../explanations/manifest_tsv/)

    There are a few conversions around annotations to call out here.

    ## Conversion of annotations from the TSV file to Python native objects

    The first annotation conversion is from the TSV file into a Python native object. For
    example Pandas will read a TSV file and convert the string "True" into a boolean True,
    however, Pandas will NOT convert our comma delimited and bracket wrapped list of
    annotations into their Python native objects. This means that we need to do that
    conversion here after splitting them apart.

    ## Conversion of Python native objects for the REST API

    The second annotation conversion occurs when we are taking the Python native objects
    and converting them into a string that can be sent to the REST API. For example
    the datetime objects which may have timezone information are converted to milliseconds
    since epoch.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        manifestFile: A tsv file with file locations and metadata to be pushed to Synapse.
        dryRun: Performs validation without uploading if set to True.
        sendMessages: Sends out messages on completion if set to True.

    Returns:
        None
    """
    df = readManifestFile(syn, manifestFile)
    # have to check all size of single file
    sizes = [
        os.stat(os.path.expandvars(os.path.expanduser(f))).st_size
        for f in df.path
        if not is_url(f)
    ]
    # Write output on what is getting pushed and estimated times - send out message.
    sys.stdout.write("=" * 50 + "\n")
    sys.stdout.write(
        "We are about to upload %i files with a total size of %s.\n "
        % (len(df), utils.humanizeBytes(sum(sizes)))
    )
    sys.stdout.write("=" * 50 + "\n")

    if dryRun:
        return

    sys.stdout.write("Starting upload...\n")
    if sendMessages:
        notify_decorator = notifyMe(syn, "Upload of %s" % manifestFile, retries=retries)
        upload = notify_decorator(_manifest_upload)
        upload(syn, df)
    else:
        _manifest_upload(syn, df)

generateManifest(syn, allFiles, filename, provenance_cache=None)

Generates a manifest file based on a list of entity objects.

Read more about the manifest file format

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

allFiles

A list of File Entity objects on Synapse (can't be Synapse IDs)

filename

file where manifest will be written

provenance_cache

an optional dict of known provenance dicts keyed by entity ids

DEFAULT: None

RETURNS DESCRIPTION
None

None

Source code in synapseutils/sync.py
def generateManifest(syn, allFiles, filename, provenance_cache=None) -> None:
    """Generates a manifest file based on a list of entity objects.

    [Read more about the manifest file format](../../explanations/manifest_tsv/)

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        allFiles: A list of File Entity objects on Synapse (can't be Synapse IDs)
        filename: file where manifest will be written
        provenance_cache: an optional dict of known provenance dicts keyed by entity
                          ids

    Returns:
        None
    """
    keys, data = _extract_file_entity_metadata(
        syn, allFiles, provenance_cache=provenance_cache
    )
    _write_manifest_data(filename, keys, data)

generate_sync_manifest(syn, directory_path, parent_id, manifest_path)

Generate manifest for syncToSynapse from a local directory.

Read more about the manifest file format

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

directory_path

Path to local directory to be pushed to Synapse.

parent_id

Synapse ID of the parent folder/project on Synapse.

manifest_path

Path to the manifest file to be generated.

RETURNS DESCRIPTION
None

None

Source code in synapseutils/sync.py
@tracer.start_as_current_span("sync::generate_sync_manifest")
def generate_sync_manifest(syn, directory_path, parent_id, manifest_path) -> None:
    """Generate manifest for [syncToSynapse][synapseutils.sync.syncToSynapse] from a local directory.

    [Read more about the manifest file format](../../explanations/manifest_tsv/)

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        directory_path: Path to local directory to be pushed to Synapse.
        parent_id: Synapse ID of the parent folder/project on Synapse.
        manifest_path: Path to the manifest file to be generated.

    Returns:
        None
    """
    manifest_cols = ["path", "parent"]
    manifest_rows = _walk_directory_tree(syn, directory_path, parent_id)
    _write_manifest_data(manifest_path, manifest_cols, manifest_rows)

readManifestFile(syn, manifestFile)

Verifies a file manifest and returns a reordered dataframe ready for upload.

Read more about the manifest file format

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

manifestFile

A tsv file with file locations and metadata to be pushed to Synapse.

RETURNS DESCRIPTION

A pandas dataframe if the manifest is validated.
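
Two of the validation steps (required columns and unique paths) can be sketched without pandas as shown below. This is not the library's implementation: `validate_manifest_rows` is a hypothetical helper, and `ASSUMED_REQUIRED_FIELDS` is an assumption, since the actual contents of `REQUIRED_FIELDS` are not shown on this page.

```python
import csv
import io
from typing import Dict, List

# Assumption for this sketch; the real REQUIRED_FIELDS is defined in sync.py.
ASSUMED_REQUIRED_FIELDS = ("path", "parent")


# Hypothetical helper for illustration; readManifestFile uses pandas instead.
def validate_manifest_rows(tsv_text: str) -> List[Dict[str, str]]:
    """Check required columns and unique paths, then return the rows."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    columns = reader.fieldnames or []
    for field in ASSUMED_REQUIRED_FIELDS:
        if field not in columns:
            raise ValueError("Manifest must contain a column of %s" % field)
    rows = list(reader)
    paths = [row["path"] for row in rows]
    # Every row must point at a distinct file to upload.
    if len(paths) != len(set(paths)):
        raise ValueError("All rows in manifest must contain a unique file to upload")
    return rows


manifest_text = "path\tparent\n/data/a.txt\tsyn123\n/data/b.txt\tsyn123\n"
print(len(validate_manifest_rows(manifest_text)))
```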

Source code in synapseutils/sync.py
@tracer.start_as_current_span("sync::readManifestFile")
def readManifestFile(syn, manifestFile):
    """Verifies a file manifest and returns a reordered dataframe ready for upload.

    [Read more about the manifest file format](../../explanations/manifest_tsv/)

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        manifestFile: A tsv file with file locations and metadata to be pushed to Synapse.

    Returns:
        A pandas dataframe if the manifest is validated.
    """
    table.test_import_pandas()
    import pandas as pd

    if manifestFile is sys.stdin:
        sys.stdout.write("Validation and upload of: <stdin>\n")
    else:
        sys.stdout.write("Validation and upload of: %s\n" % manifestFile)
    # Read manifest file into pandas dataframe
    df = pd.read_csv(manifestFile, sep="\t")
    if "synapseStore" not in df:
        df = df.assign(synapseStore=None)
    df.loc[
        df["path"].apply(is_url), "synapseStore"
    ] = False  # override synapseStore values to False when path is a url
    df.loc[
        df["synapseStore"].isnull(), "synapseStore"
    ] = True  # remaining unset values default to True
    df.synapseStore = df.synapseStore.astype(bool)
    df = df.fillna("")

    sys.stdout.write("Validating columns of manifest...")
    for field in REQUIRED_FIELDS:
        sys.stdout.write(".")
        if field not in df.columns:
            sys.stdout.write("\n")
            raise ValueError("Manifest must contain a column of %s" % field)
    sys.stdout.write("OK\n")

    sys.stdout.write("Validating that all paths exist...")
    df.path = df.path.apply(_check_path_and_normalize)

    sys.stdout.write("OK\n")

    sys.stdout.write("Validating that all files are unique...")
    # Both the path and the combination of entity name and parent must be unique
    if len(df.path) != len(set(df.path)):
        raise ValueError("All rows in manifest must contain a unique file to upload")
    sys.stdout.write("OK\n")

    # Check each size of uploaded file
    sys.stdout.write("Validating that all the files are not empty...")
    _check_size_each_file(df)
    sys.stdout.write("OK\n")

    # check the name of each file should be store on Synapse
    name_column = "name"
    # Create entity name column from basename
    if name_column not in df.columns:
        filenames = [os.path.basename(path) for path in df["path"]]
        df["name"] = filenames

    sys.stdout.write("Validating file names... \n")
    _check_file_name(df)
    sys.stdout.write("OK\n")

    sys.stdout.write("Validating provenance...")
    df = _sortAndFixProvenance(syn, df)
    sys.stdout.write("OK\n")

    sys.stdout.write("Validating that parents exist and are containers...")
    parents = set(df.parent)
    for synId in parents:
        try:
            container = syn.get(synId, downloadFile=False)
        except SynapseHTTPError:
            sys.stdout.write(
                "\n%s in the parent column is not a valid Synapse Id\n" % synId
            )
            raise
        if not is_container(container):
            sys.stdout.write(
                "\n%s in the parent column is not a Folder or Project\n" % synId
            )
            raise SynapseHTTPError
    sys.stdout.write("OK\n")
    return df

synapseutils.copy_functions

Functions

copy(syn, entity, destinationId, skipCopyWikiPage=False, skipCopyAnnotations=False, **kwargs)

  • This function assists users in copying entities (Tables, Links, Files, Folders, Projects) and recursively copies everything in directories.
  • A mapping of the old entities to the new entities is created, the wikis of each entity are also copied over, and links to Synapse IDs are updated.
PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

entity

A synapse entity ID

TYPE: str

destinationId

Synapse ID of a folder/project that the copied entity is being copied to

TYPE: str

skipCopyWikiPage

Skip copying the wiki pages.

TYPE: bool DEFAULT: False

skipCopyAnnotations

Skips copying the annotations.

TYPE: bool DEFAULT: False

version

(File copy only) The version of the file to copy. Defaults to None.

updateExisting

(File copy only) When the destination has an entity that has the same name, users can choose to update that entity. It must be the same entity type. Defaults to False.

setProvenance

(File copy only) Has three values to set the provenance of the copied entity:

  • traceback: Sets to the source entity
  • existing: Sets to source entity's original provenance (if it exists)
  • None: No provenance is set

excludeTypes

(Folder/Project copy only) Accepts a list of entity types (file, table, link) which determines which entity types to not copy. Defaults to an empty list.

RETURNS DESCRIPTION
Dict[str, str]

A mapping between the original and copied entity: {'syn1234':'syn33455'}
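
The returned mapping is an ordinary dict keyed by the original Synapse IDs, so it can be used to rewrite IDs in free text. The sketch below illustrates the idea behind updating links to Synapse IDs; `remap_synapse_ids` is a hypothetical helper and is not how the wiki copy is implemented.

```python
import re
from typing import Dict


# Hypothetical helper for illustration; not part of synapseutils.
def remap_synapse_ids(text: str, mapping: Dict[str, str]) -> str:
    """Replace every Synapse ID in text that has an entry in mapping."""

    def replace(match: "re.Match[str]") -> str:
        # IDs without a mapping entry are left untouched.
        return mapping.get(match.group(0), match.group(0))

    # Matching the whole ID prevents syn1234 from rewriting part of syn12345.
    return re.sub(r"\bsyn\d+\b", replace, text)


mapping = {"syn1234": "syn33455"}
print(remap_synapse_ids("See syn1234 and syn12345.", mapping))
```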

Using this function

Sample copy:

import synapseutils
import synapseclient
syn = synapseclient.login()
synapseutils.copy(syn, ...)

Copying Files:

synapseutils.copy(syn, "syn12345", "syn45678", updateExisting=False, setProvenance="traceback", version=None)

Copying Folders/Projects:

# This will copy everything in the project into the destinationId except files and tables.
synapseutils.copy(syn, "syn123450", "syn345678", excludeTypes=["file", "table"])
Source code in synapseutils/copy_functions.py
def copy(
    syn: synapseclient.Synapse,
    entity: str,
    destinationId: str,
    skipCopyWikiPage: bool = False,
    skipCopyAnnotations: bool = False,
    **kwargs,
) -> typing.Dict[str, str]:
    """
    - This function will assist users in copying entities
        (
        [Tables][synapseclient.table.Table],
        [Links][synapseclient.entity.Link],
        [Files][synapseclient.entity.File],
        [Folders][synapseclient.entity.Folder],
        [Projects][synapseclient.entity.Project]
        ),
      and will recursively copy everything in directories.
    - A Mapping of the old entities to the new entities will be created and all the wikis of each entity
      will also be copied over and links to synapse Ids will be updated.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: A synapse entity ID
        destinationId: Synapse ID of a folder/project that the copied entity is being copied to
        skipCopyWikiPage: Skip copying the wiki pages.
        skipCopyAnnotations: Skips copying the annotations.
        version: (File copy only) Can specify version of a file. Default to None
        updateExisting: (File copy only) When the destination has an entity that has the same name,
                        users can choose to update that entity. It must be the same entity type
                        Default to False
        setProvenance: (File copy only) Has three values to set the provenance of the copied entity:
                        traceback: Sets to the source entity
                        existing: Sets to source entity's original provenance (if it exists)
                        None: No provenance is set
        excludeTypes: (Folder/Project copy only) Accepts a list of entity types (file, table, link)
                        which determines which entity types to not copy. Defaults to an empty list.

    Returns:
        A mapping between the original and copied entity: {'syn1234':'syn33455'}

    Example: Using this function
        Sample copy:

            import synapseutils
            import synapseclient
            syn = synapseclient.login()
            synapseutils.copy(syn, ...)

        Copying Files:

            synapseutils.copy(syn, "syn12345", "syn45678", updateExisting=False, setProvenance = "traceback",version=None)

        Copying Folders/Projects:

            # This will copy everything in the project into the destinationId except files and tables.
            synapseutils.copy(syn, "syn123450","syn345678",excludeTypes=["file","table"])
    """
    updateLinks = kwargs.get("updateLinks", True)
    updateSynIds = kwargs.get("updateSynIds", True)
    entitySubPageId = kwargs.get("entitySubPageId", None)
    destinationSubPageId = kwargs.get("destinationSubPageId", None)

    mapping = _copyRecursive(
        syn, entity, destinationId, skipCopyAnnotations=skipCopyAnnotations, **kwargs
    )
    if not skipCopyWikiPage:
        for oldEnt in mapping:
            copyWiki(
                syn,
                oldEnt,
                mapping[oldEnt],
                entitySubPageId=entitySubPageId,
                destinationSubPageId=destinationSubPageId,
                updateLinks=updateLinks,
                updateSynIds=updateSynIds,
                entityMap=mapping,
            )
    return mapping

changeFileMetaData(syn, entity, downloadAs=None, contentType=None, forceVersion=True, name=None)

Changes File Entity metadata, such as the download-as name.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

entity

Synapse entity Id or object.

TYPE: Union[str, Entity]

downloadAs

Specify filename to change the filename of a filehandle.

TYPE: str DEFAULT: None

contentType

Specify content type to change the content type of a filehandle.

TYPE: str DEFAULT: None

forceVersion

Indicates whether the method should increment the version of the object even if nothing has changed. Defaults to True.

TYPE: bool DEFAULT: True

name

Specify filename to change the filename of the file.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Entity

Synapse Entity

Using this function

Can be used to change the filename, the filename when the file is downloaded, or the file content-type without downloading:

import os
file_entity = syn.get(synid)
print(os.path.basename(file_entity.path))  # prints, e.g., "my_file.txt"
file_entity = synapseutils.changeFileMetaData(syn=syn, entity=file_entity, downloadAs="my_new_downloadAs_name_file.txt", name="my_new_name_file.txt")
print(os.path.basename(file_entity.path))  # prints, "my_new_downloadAs_name_file.txt"
print(file_entity.name)  # prints, "my_new_name_file.txt"
Source code in synapseutils/copy_functions.py
def changeFileMetaData(
    syn: synapseclient.Synapse,
    entity: typing.Union[str, Entity],
    downloadAs: str = None,
    contentType: str = None,
    forceVersion: bool = True,
    name: str = None,
) -> Entity:
    """
    Change File Entity metadata like the download as name.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: Synapse entity Id or object.
        downloadAs: Specify filename to change the filename of a filehandle.
        contentType: Specify content type to change the content type of a filehandle.
        forceVersion: Indicates whether the method should increment the version of
                        the object even if nothing has changed. Defaults to True.
        name: Specify filename to change the filename of the file.

    Returns:
        Synapse Entity

    Example: Using this function
        Can be used to change the filename, the filename when the file is downloaded, or the file content-type without downloading:

            file_entity = syn.get(synid)
            print(os.path.basename(file_entity.path))  ## prints, e.g., "my_file.txt"
            file_entity = synapseutils.changeFileMetaData(syn=syn, entity=file_entity, downloadAs="my_new_downloadAs_name_file.txt", name="my_new_name_file.txt")
            print(os.path.basename(file_entity.path))  ## prints, "my_new_downloadAs_name_file.txt"
            print(file_entity.name) ## prints, "my_new_name_file.txt"
    """
    ent = syn.get(entity, downloadFile=False)
    fileResult = syn._getFileHandleDownload(ent.dataFileHandleId, ent.id)
    ent.contentType = ent.contentType if contentType is None else contentType
    downloadAs = (
        fileResult["fileHandle"]["fileName"] if downloadAs is None else downloadAs
    )
    copiedFileHandle = copyFileHandles(
        syn,
        [ent.dataFileHandleId],
        [ent.concreteType.split(".")[-1]],
        [ent.id],
        [contentType],
        [downloadAs],
    )
    copyResult = copiedFileHandle[0]
    if copyResult.get("failureCode") is not None:
        raise ValueError(
            "%s dataFileHandleId: %s"
            % (copyResult["failureCode"], copyResult["originalFileHandleId"])
        )
    ent.dataFileHandleId = copyResult["newFileHandle"]["id"]
    ent.name = ent.name if name is None else name
    ent = syn.store(ent, forceVersion=forceVersion)
    return ent

copyFileHandles(syn, fileHandles, associateObjectTypes, associateObjectIds, newContentTypes=None, newFileNames=None)

Given a list of fileHandle Ids or Objects, copies the fileHandles.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

fileHandles

List of fileHandle Ids or Objects

TYPE: List[Union[File, Entity]]

associateObjectTypes

List of associated object types: FileEntity, TableEntity, WikiAttachment, UserProfileAttachment, MessageAttachment, TeamAttachment, SubmissionAttachment, VerificationSubmission (Must be the same length as fileHandles)

TYPE: List[str]

associateObjectIds

List of associated object Ids: If copying a file, the objectId is the synapse id, and if copying a wiki attachment, the object id is the wiki subpage id. (Must be the same length as fileHandles)

TYPE: List[str]

newContentTypes

List of content types. Set each item to a new content type for each file handle, or leave the item as None to keep the original content type. Default None, which keeps all original content types.

TYPE: List[str] DEFAULT: None

newFileNames

List of filenames. Set each item to a new filename for each file handle, or leave the item as None to keep the original name. Default None, which keeps all original file names.

TYPE: List[str] DEFAULT: None

RETURNS DESCRIPTION

List of batch filehandle copy results, can include failureCodes: UNAUTHORIZED and NOT_FOUND

RAISES DESCRIPTION
ValueError

If the input argument lists are not all the same length.
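
The length requirement can be sketched as a small pre-flight check. This is an assumption-laden illustration: `check_same_length` is a hypothetical helper, its error message is invented, and skipping optional lists that are left as None is inferred from the parameter descriptions above rather than taken from the implementation.

```python
from typing import Optional, Sequence


# Hypothetical helper for illustration; not part of synapseutils.
def check_same_length(
    file_handles: Sequence,
    associate_object_types: Sequence[str],
    associate_object_ids: Sequence[str],
    new_content_types: Optional[Sequence] = None,
    new_file_names: Optional[Sequence] = None,
) -> None:
    """Raise ValueError unless every provided list matches file_handles."""
    expected = len(file_handles)
    provided = {
        "associateObjectTypes": associate_object_types,
        "associateObjectIds": associate_object_ids,
        "newContentTypes": new_content_types,
        "newFileNames": new_file_names,
    }
    for name, values in provided.items():
        # Optional lists left as None are skipped in this sketch.
        if values is not None and len(values) != expected:
            raise ValueError(
                "%s has %d items but fileHandles has %d"
                % (name, len(values), expected)
            )


check_same_length(["101"], ["FileEntity"], ["syn1"], new_file_names=[None])
```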

Source code in synapseutils/copy_functions.py
def copyFileHandles(
    syn: synapseclient.Synapse,
    fileHandles: typing.List[typing.Union[File, Entity]],
    associateObjectTypes: typing.List[str],
    associateObjectIds: typing.List[str],
    newContentTypes: typing.List[str] = None,
    newFileNames: typing.List[str] = None,
):
    """
    Given a list of fileHandle Ids or Objects, copy the fileHandles

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        fileHandles: List of fileHandle Ids or Objects
        associateObjectTypes: List of associated object types: FileEntity, TableEntity,
                                WikiAttachment, UserProfileAttachment, MessageAttachment,
                                TeamAttachment, SubmissionAttachment, VerificationSubmission
                                (Must be the same length as fileHandles)
        associateObjectIds: List of associated object Ids: If copying a file,
                            the objectId is the synapse id, and if copying a wiki attachment,
                            the object id is the wiki subpage id.
                            (Must be the same length as fileHandles)
        newContentTypes: List of content types. Set each item to a new content type for each file
                            handle, or leave the item as None to keep the original content type.
                            Default None, which keeps all original content types.
        newFileNames: List of filenames. Set each item to a new filename for each file handle,
                        or leave the item as None to keep the original name. Default None,
                        which keeps all original file names.

    Returns:
        List of batch filehandle copy results, can include failureCodes: UNAUTHORIZED and NOT_FOUND

    Raises:
        ValueError: If the input lists are not all the same length
    """

    # Check if length of all inputs are equal
    if not (
        len(fileHandles) == len(associateObjectTypes) == len(associateObjectIds)
        and (newContentTypes is None or len(newContentTypes) == len(associateObjectIds))
        and (newFileNames is None or len(newFileNames) == len(associateObjectIds))
    ):
        raise ValueError("Length of all input arguments must be the same")

    # If no optional params passed, assign to empty list
    if newContentTypes is None:
        newContentTypes = []
    if newFileNames is None:
        newFileNames = []

    # Remove this line if we change API to only take fileHandleIds and not Objects
    file_handle_ids = [synapseclient.core.utils.id_of(handle) for handle in fileHandles]

    # division logic for POST call here
    master_copy_results_list = []  # list which holds all results from POST call
    for (
        batch_file_handles_ids,
        batch_assoc_obj_types,
        batch_assoc_obj_ids,
        batch_con_type,
        batch_file_name,
    ) in _batch_iterator_generator(
        [
            file_handle_ids,
            associateObjectTypes,
            associateObjectIds,
            newContentTypes,
            newFileNames,
        ],
        MAX_FILE_HANDLE_PER_COPY_REQUEST,
    ):
        batch_copy_results = _copy_file_handles_batch(
            syn,
            batch_file_handles_ids,
            batch_assoc_obj_types,
            batch_assoc_obj_ids,
            batch_con_type,
            batch_file_name,
        )
        master_copy_results_list.extend(batch_copy_results)

    return master_copy_results_list

copyWiki(syn, entity, destinationId, entitySubPageId=None, destinationSubPageId=None, updateLinks=True, updateSynIds=True, entityMap=None)

Copies wikis and updates internal links

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

entity

A synapse ID of an entity whose wiki you want to copy

destinationId

Synapse ID of the folder/project to copy the wiki to

updateLinks

Update all the internal links. (e.g. syn1234/wiki/34345 becomes syn3345/wiki/49508)

DEFAULT: True

updateSynIds

Update all the Synapse IDs referenced in the wikis (e.g. syn1234 becomes syn2345). Defaults to True, but has an effect only when an entityMap is provided.

DEFAULT: True

entityMap

An entity map {'oldSynId': 'newSynId'} used to update the Synapse IDs referenced in the wiki.

DEFAULT: None

entitySubPageId

Optional subPageId; if given, that page and all of its subwikis are copied. Defaults to None, which copies the entire wiki. The subPageId can be found in the wiki URL: https://www.synapse.org/#!Synapse:syn123/wiki/1234 In this case, 1234 is the subPageId.

DEFAULT: None

destinationSubPageId

Can specify destination subPageId to copy wikis to.

DEFAULT: None

RETURNS DESCRIPTION

A list of Objects with three fields: id, title and parentId.
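
Since the subPageId is the trailing number of a wiki URL, it can be pulled out with a small helper before calling `copyWiki`. The sketch below is illustrative only: `sub_page_id_from_url` is a hypothetical function, not part of synapseutils. The `str(int(...))` step mirrors the normalization `copyWiki` applies to `entitySubPageId`.

```python
import re
from typing import Optional


def sub_page_id_from_url(url: str) -> Optional[str]:
    """Return the wiki subPageId at the end of a Synapse wiki URL, or None."""
    match = re.search(r"/wiki/(\d+)/?$", url)
    if match is None:
        return None
    # Normalize through int and back to str, as copyWiki does with its input.
    return str(int(match.group(1)))


page_id = sub_page_id_from_url("https://www.synapse.org/#!Synapse:syn123/wiki/1234")
```

The result could then be passed as `entitySubPageId`; a URL without a `/wiki/<number>` suffix yields `None`, which corresponds to copying the entire wiki.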

Source code in synapseutils/copy_functions.py
def copyWiki(
    syn,
    entity,
    destinationId,
    entitySubPageId=None,
    destinationSubPageId=None,
    updateLinks=True,
    updateSynIds=True,
    entityMap=None,
):
    """
    Copies wikis and updates internal links

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: A synapse ID of an entity whose wiki you want to copy
        destinationId: Synapse ID of the folder/project to copy the wiki to
        updateLinks: Update all the internal links.
                     (e.g. syn1234/wiki/34345 becomes syn3345/wiki/49508)
        updateSynIds: Update all the Synapse IDs referenced in the wikis.
                        (e.g. syn1234 becomes syn2345)
                        Defaults to True, but has an effect only when an entityMap is provided
        entityMap: An entity map {'oldSynId': 'newSynId'} used to update the Synapse IDs
                    referenced in the wiki.
        entitySubPageId: Optional subPageId; if given, that page and all of its subwikis
                            are copied. Defaults to None, which copies the entire wiki.
                            The subPageId can be found in the wiki URL:
                            https://www.synapse.org/#!Synapse:syn123/wiki/1234
                            In this case, 1234 is the subPageId.
        destinationSubPageId: Can specify destination subPageId to copy wikis to.

    Returns:
        A list of Objects with three fields: id, title and parentId.
    """

    # Validate input parameters
    if entitySubPageId:
        entitySubPageId = str(int(entitySubPageId))
    if destinationSubPageId:
        destinationSubPageId = str(int(destinationSubPageId))

    oldOwn = syn.get(entity, downloadFile=False)
    # getWikiHeaders fails when there is no wiki

    try:
        oldWikiHeaders = syn.getWikiHeaders(oldOwn)
    except SynapseHTTPError as e:
        if e.response.status_code == 404:
            return []
        else:
            raise e

    newOwn = syn.get(destinationId, downloadFile=False)
    wikiIdMap = dict()
    newWikis = dict()
    # If entitySubPageId is given but not destinationSubPageId, set the pageId to "" (will get the root page)
    # A entitySubPage could be copied to a project without any wiki pages, this has to be checked
    newWikiPage = None
    if destinationSubPageId:
        try:
            newWikiPage = syn.getWiki(newOwn, destinationSubPageId)
        except SynapseHTTPError as e:
            if e.response.status_code == 404:
                pass
            else:
                raise e
    if entitySubPageId:
        oldWikiHeaders = _getSubWikiHeaders(oldWikiHeaders, entitySubPageId)

    if not oldWikiHeaders:
        return []

    for wikiHeader in oldWikiHeaders:
        wiki = syn.getWiki(oldOwn, wikiHeader["id"])
        syn.logger.info("Got wiki %s" % wikiHeader["id"])
        if not wiki.get("attachmentFileHandleIds"):
            new_file_handles = []
        else:
            results = [
                syn._getFileHandleDownload(
                    filehandleId, wiki.id, objectType="WikiAttachment"
                )
                for filehandleId in wiki["attachmentFileHandleIds"]
            ]
            # Get rid of the previews
            nopreviews = [
                attach["fileHandle"]
                for attach in results
                if not attach["fileHandle"]["isPreview"]
            ]
            contentTypes = [attach["contentType"] for attach in nopreviews]
            fileNames = [attach["fileName"] for attach in nopreviews]
            copiedFileHandles = copyFileHandles(
                syn,
                nopreviews,
                ["WikiAttachment"] * len(nopreviews),
                [wiki.id] * len(nopreviews),
                contentTypes,
                fileNames,
            )
            # Check if failurecodes exist
            for filehandle in copiedFileHandles:
                if filehandle.get("failureCode") is not None:
                    raise ValueError(
                        "%s dataFileHandleId: %s"
                        % (
                            filehandle["failureCode"],
                            filehandle["originalFileHandleId"],
                        )
                    )
            new_file_handles = [
                filehandle["newFileHandle"]["id"] for filehandle in copiedFileHandles
            ]
        # for some reason some wikis don't have titles?
        if hasattr(wikiHeader, "parentId"):
            newWikiPage = Wiki(
                owner=newOwn,
                title=wiki.get("title", ""),
                markdown=wiki.markdown,
                fileHandles=new_file_handles,
                parentWikiId=wikiIdMap[wiki.parentWikiId],
            )
            newWikiPage = syn.store(newWikiPage)
        else:
            if destinationSubPageId is not None and newWikiPage is not None:
                newWikiPage["attachmentFileHandleIds"] = new_file_handles
                newWikiPage["markdown"] = wiki["markdown"]
                newWikiPage["title"] = wiki.get("title", "")
                # Need to add logic to update titles here
                newWikiPage = syn.store(newWikiPage)
            else:
                newWikiPage = Wiki(
                    owner=newOwn,
                    title=wiki.get("title", ""),
                    markdown=wiki.markdown,
                    fileHandles=new_file_handles,
                    parentWikiId=destinationSubPageId,
                )
                newWikiPage = syn.store(newWikiPage)
        newWikis[newWikiPage["id"]] = newWikiPage
        wikiIdMap[wiki["id"]] = newWikiPage["id"]

    if updateLinks:
        syn.logger.info("Updating internal links:\n")
        newWikis = _updateInternalLinks(newWikis, wikiIdMap, entity, destinationId)
        syn.logger.info("Done updating internal links.\n")

    if updateSynIds and entityMap is not None:
        syn.logger.info("Updating Synapse references:\n")
        newWikis = _updateSynIds(newWikis, wikiIdMap, entityMap)
        syn.logger.info("Done updating Synapse IDs.\n")

    syn.logger.info("Storing new Wikis\n")
    for oldWikiId in wikiIdMap.keys():
        newWikiId = wikiIdMap[oldWikiId]
        newWikis[newWikiId] = syn.store(newWikis[newWikiId])
        syn.logger.info("\tStored: %s\n" % newWikiId)
    return syn.getWikiHeaders(newOwn)
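
The `entityMap` step can be pictured as a text substitution over each wiki's markdown. The sketch below is an assumption about the general idea, not the body of `_updateSynIds`, which is not shown on this page; `remap_syn_ids` is a hypothetical name.

```python
import re
from typing import Dict


def remap_syn_ids(markdown: str, entity_map: Dict[str, str]) -> str:
    """Replace each whole Synapse ID found in entity_map with its mapped value."""

    def substitute(match):
        syn_id = match.group(0)
        # IDs without an entry in the map are left unchanged.
        return entity_map.get(syn_id, syn_id)

    # Word boundaries match whole IDs, so syn123 is not rewritten inside syn1234.
    return re.sub(r"\bsyn\d+\b", substitute, markdown)


entity_map = {"syn123": "syn999"}
updated = remap_syn_ids("See syn123 and syn1234.", entity_map)
```

Matching whole IDs matters here: a naive `str.replace("syn123", ...)` would also corrupt `syn1234`.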

synapseutils.walk_functions

Functions

walk(syn, synId, includeTypes=['folder', 'file', 'table', 'link', 'entityview', 'dockerrepo', 'submissionview', 'dataset', 'materializedview'])

Traverse through the hierarchy of files and folders stored under the synId. Has the same behavior as os.walk()

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

synId

A synapse ID of a folder or project

TYPE: str

includeTypes

Must be a list of entity types (e.g. ["file", "table"]). The "folder" type is always included so the hierarchy can be traversed.

TYPE: List[str] DEFAULT: ['folder', 'file', 'table', 'link', 'entityview', 'dockerrepo', 'submissionview', 'dataset', 'materializedview']

Print Project & Files in slash delimited format

Traverse a project and print out each Folder and File

import synapseclient
import synapseutils
syn = synapseclient.login()

for directory_path, directory_names, file_name in synapseutils.walk(
    syn=syn, synId="syn1234", includeTypes=["file"]
):
    for directory_name in directory_names:
        print(
            f"Directory ({directory_name[1]}): {directory_path[0]}/{directory_name[0]}"
        )

    for file in file_name:
        print(f"File ({file[1]}): {directory_path[0]}/{file[0]}")

The output will look like this assuming only 1 folder and 1 file in the directory:

Directory (syn12345678): My Project Name/my_directory_name
File (syn23456789): My Project Name/my_directory_name/fileA.txt
Using this function

Traversing through a project and printing out the directory path, folders, and files

walkedPath = synapseutils.walk(syn, "syn1234", ["file"])  # Exclude tables and views

for dirpath, dirname, filename in walkedPath:
    print(dirpath)
    print(dirname)  # All the folders in the directory path
    print(filename)  # All the files in the directory path
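
Judging from the indexing in the first example, each yielded item appears to have the shape `((path, synId), [(folder_name, synId), ...], [(file_name, synId), ...])`. Under that assumption, the sketch below flattens walk-style output into full file paths. It runs on hand-written stand-in data and never contacts Synapse; `list_file_paths` and `fake_walk` are hypothetical names.

```python
from typing import Iterable, List, Tuple

Entry = Tuple[str, str]  # (name or path, Synapse ID)
WalkItem = Tuple[Entry, List[Entry], List[Entry]]


def list_file_paths(walked: Iterable[WalkItem]) -> List[Tuple[str, str]]:
    """Flatten walk-style tuples into (full path, Synapse ID) pairs for files."""
    paths = []
    for (dir_path, _dir_id), _dirs, files in walked:
        for file_name, file_id in files:
            paths.append((f"{dir_path}/{file_name}", file_id))
    return paths


# Stand-in data shaped like the documented output; no Synapse connection is made.
fake_walk = [
    (("My Project Name", "syn1234"), [("my_directory_name", "syn12345678")], []),
    (
        ("My Project Name/my_directory_name", "syn12345678"),
        [],
        [("fileA.txt", "syn23456789")],
    ),
]
file_paths = list_file_paths(fake_walk)
```

With a real login, `synapseutils.walk(syn, "syn1234")` could be passed in place of `fake_walk`, provided the tuple shape assumed here holds.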

This is a high level sequence diagram of the walk function:

sequenceDiagram
    autonumber
    participant walk

    opt Not start_entity
        walk->>client: Call `.get()` method
        client-->>walk: Metadata about the root start_entity
    end

    alt Root is not a container
        note over walk: Return early
    else newpath is none
        note over walk: Get directory path from name of entity and synapse ID
    else
        note over walk: Use path passed in from recursive call
    end

    loop Get children for container
        walk->>client: Call `.getChildren()` method
        client-->>walk: return immediate children
        note over walk: Aggregation of all children into dirs and non-dirs list
    end

    loop For each directory
        walk->>walk: Recursively call walk
    end
Source code in synapseutils/walk_functions.py
def walk(
    syn: synapseclient.Synapse,
    synId: str,
    includeTypes: typing.List[str] = [
        "folder",
        "file",
        "table",
        "link",
        "entityview",
        "dockerrepo",
        "submissionview",
        "dataset",
        "materializedview",
    ],
):
    """
    Traverse through the hierarchy of files and folders stored under the synId.
    Has the same behavior as os.walk()

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        synId: A synapse ID of a folder or project
        includeTypes: Must be a list of entity types (e.g. ["file", "table"])
                        The "folder" type is always included so the hierarchy can be traversed

    Example: Print Project & Files in slash delimited format
        Traverse a project and print out each Folder and File

            import synapseclient
            import synapseutils
            syn = synapseclient.login()

            for directory_path, directory_names, file_name in synapseutils.walk(
                syn=syn, synId="syn1234", includeTypes=["file"]
            ):
                for directory_name in directory_names:
                    print(
                        f"Directory ({directory_name[1]}): {directory_path[0]}/{directory_name[0]}"
                    )

                for file in file_name:
                    print(f"File ({file[1]}): {directory_path[0]}/{file[0]}")

        The output will look like this assuming only 1 folder and 1 file in the directory:

            Directory (syn12345678): My Project Name/my_directory_name
            File (syn23456789): My Project Name/my_directory_name/fileA.txt

    Example: Using this function
        Traversing through a project and printing out the directory path, folders, and files

            walkedPath = synapseutils.walk(syn, "syn1234", ["file"])  # Exclude tables and views

            for dirpath, dirname, filename in walkedPath:
                print(dirpath)
                print(dirname) #All the folders in the directory path
                print(filename) #All the files in the directory path

    This is a high level sequence diagram of the walk function:

    ```mermaid
    sequenceDiagram
        autonumber
        participant walk

        opt Not start_entity
            walk->>client: Call `.get()` method
            client-->>walk: Metadata about the root start_entity
        end

        alt Root is not a container
            note over walk: Return early
        else newpath is none
            note over walk: Get directory path from name of entity and synapse ID
        else
            note over walk: Use path passed in from recursive call
        end

        loop Get children for container
            walk->>client: Call `.getChildren()` method
            client-->>walk: return immediate children
            note over walk: Aggregation of all children into dirs and non-dirs list
        end

        loop For each directory
            walk->>walk: Recursively call walk
        end
    ```
    """
    # Ensure that "folder" is included so the hierarchy can be traversed
    if "folder" not in includeTypes:
        includeTypes.append("folder")
    return _help_walk(syn=syn, syn_id=synId, include_types=includeTypes)
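
Because the function appends "folder" to the list it receives, a list passed in by the caller is modified in place. The sketch below reproduces only that step with a stand-in (`walk_like` is hypothetical and does no traversal) and shows that passing a copy leaves the caller's list untouched.

```python
from typing import List


def walk_like(include_types: List[str]) -> List[str]:
    """Stand-in that mimics only the 'append folder' step shown above."""
    if "folder" not in include_types:
        include_types.append("folder")
    return include_types


wanted = ["file"]
walk_like(wanted)  # the caller's list now also contains "folder"
mutated = list(wanted)

wanted_again = ["file"]
walk_like(list(wanted_again))  # passing a copy leaves the original untouched
```

This only matters when the same list is reused afterwards, for example to filter results by the types originally requested.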

synapseutils.monitor

Functions

notifyMe(syn, messageSubject='', retries=0)

Function decorator that notifies you via email whenever a function completes running or there is a failure.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

messageSubject

A string with subject line for sent out messages.

DEFAULT: ''

retries

Number of retries to attempt on failure

DEFAULT: 0

Using this function

As a decorator:

# to decorate a function that you define
from synapseutils import notifyMe
import synapseclient
syn = synapseclient.login()

@notifyMe(syn, 'Long running function', retries=2)
def my_function(x):
    doing_something()
    return long_runtime_func(x)

my_function(123)

Wrapping a function:

# to wrap a function that already exists
from synapseutils import notifyMe
import synapseclient
syn = synapseclient.login()

notify_decorator = notifyMe(syn, 'Long running query', retries=2)
my_query = notify_decorator(syn.tableQuery)
results = my_query("select id from syn1223")
Source code in synapseutils/monitor.py
def notifyMe(syn, messageSubject="", retries=0):
    """Function decorator that notifies you via email whenever a function completes running or there is a failure.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        messageSubject: A string with subject line for sent out messages.
        retries: Number of retries to attempt on failure

    Example: Using this function
        As a decorator:

            # to decorate a function that you define
            from synapseutils import notifyMe
            import synapseclient
            syn = synapseclient.login()

            @notifyMe(syn, 'Long running function', retries=2)
            def my_function(x):
                doing_something()
                return long_runtime_func(x)

            my_function(123)

        Wrapping a function:

            # to wrap a function that already exists
            from synapseutils import notifyMe
            import synapseclient
            syn = synapseclient.login()

            notify_decorator = notifyMe(syn, 'Long running query', retries=2)
            my_query = notify_decorator(syn.tableQuery)
            results = my_query("select id from syn1223")
    """

    def notify_decorator(func):
        @functools.wraps(func)
        def with_retry_and_messaging(*args, **kwargs):
            attempt = 0
            destination = syn.getUserProfile()["ownerId"]
            while attempt <= retries:
                try:
                    output = func(*args, **kwargs)
                    syn.sendMessage(
                        [destination],
                        messageSubject,
                        messageBody="Call to %s completed successfully!"
                        % func.__name__,
                    )
                    return output
                except Exception as e:
                    sys.stderr.write(traceback.format_exc())
                    syn.sendMessage(
                        [destination],
                        messageSubject,
                        messageBody=(
                            "Encountered a temporary Failure during upload.  "
                            "Will retry %i more times. \n\n Error message was:\n%s\n\n%s"
                            % (retries - attempt, e, traceback.format_exc())
                        ),
                    )
                    attempt += 1

        return with_retry_and_messaging

    return notify_decorator
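
The retry-and-notify control flow can be exercised without a Synapse login by swapping the email call for a callback. In this sketch `notify_with_retries` and `send` are hypothetical stand-ins for `notifyMe` and `syn.sendMessage`. It follows the same loop as the source above, including one behavior that is easy to miss: once every attempt has failed, the wrapper falls out of the loop and returns None instead of re-raising.

```python
import functools
from typing import Callable, List


def notify_with_retries(send: Callable[[str], None], retries: int = 0):
    """Decorator factory: call send() after each success or failure, retrying on error."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt <= retries:
                try:
                    output = func(*args, **kwargs)
                    send(f"Call to {func.__name__} completed successfully!")
                    return output
                except Exception as error:
                    send(f"Failure; will retry {retries - attempt} more times: {error}")
                    attempt += 1
            # Falls through and returns None once every attempt has failed.

        return wrapper

    return decorator


messages: List[str] = []
calls = {"count": 0}


@notify_with_retries(messages.append, retries=2)
def flaky() -> str:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary problem")
    return "done"


result = flaky()
```

Callers that need to distinguish "failed after all retries" from "returned None" would have to check for that case themselves.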

with_progress_bar(func, totalCalls, prefix='', postfix='', isBytes=False)

Wraps a function to add a progress bar based on the number of calls to that function.

PARAMETER DESCRIPTION
func

Function being wrapped with progress Bar

totalCalls

Total number of items/bytes when completed

prefix

String printed before progress bar

DEFAULT: ''

postfix

String printed after progress bar

DEFAULT: ''

isBytes

A boolean indicating whether to convert bytes to kB, MB, GB etc.

DEFAULT: False

RETURNS DESCRIPTION

A wrapped function that contains a progress bar
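
The wrapping pattern can be sketched without the client's progress printer. The version below is a simplified illustration, not the library code: it uses `threading.Lock` and a `report` callback where the real function uses a shared `Value`, a `Lock`, and `printTransferProgress`, and `with_call_counter` is a hypothetical name.

```python
import threading
from typing import Callable, List


def with_call_counter(
    func: Callable, total_calls: int, report: Callable[[int, int], None]
) -> Callable:
    """Wrap func so each call bumps a shared counter and reports progress."""
    completed = 0
    lock = threading.Lock()

    def progress(*args, **kwargs):
        nonlocal completed
        with lock:
            completed += 1
            current = completed  # snapshot taken while the lock is held
        report(current, total_calls)
        return func(*args, **kwargs)

    return progress


seen: List[str] = []
double = with_call_counter(
    lambda x: x * 2, 3, lambda done, total: seen.append(f"{done}/{total}")
)
outputs = [double(n) for n in (1, 2, 3)]
```

Progress is reported before the wrapped function runs, matching the order of operations in the source listing.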

Source code in synapseutils/monitor.py
def with_progress_bar(func, totalCalls, prefix="", postfix="", isBytes=False):
    """Wraps a function to add a progress bar based on the number of calls to that function.

    Arguments:
        func: Function being wrapped with progress Bar
        totalCalls: Total number of items/bytes when completed
        prefix: String printed before progress bar
        postfix: String printed after progress bar
        isBytes: A boolean indicating whether to convert bytes to kB, MB, GB etc.

    Returns:
        A wrapped function that contains a progress bar
    """
    completed = Value("d", 0)
    lock = Lock()

    def progress(*args, **kwargs):
        with lock:
            completed.value += 1
        printTransferProgress(completed.value, totalCalls, prefix, postfix, isBytes)
        return func(*args, **kwargs)

    return progress

synapseutils.migrate_functions

Classes

MigrationResult

A MigrationResult is a proxy object to the underlying sqlite db. It provides a programmatic interface that lets the caller iterate over the migrated file handles without having to connect to the sqlite db or know its schema. It also avoids loading everything into an in-memory data structure, which could be a memory liability when migrating a huge project of hundreds of thousands or millions of entities.

This proxy object is not thread safe, since it accesses an underlying sqlite db.
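
The memory-friendly iteration described here is commonly done by paging on an increasing key and yielding rows one at a time. The sketch below shows that approach on a toy two-column table in an in-memory database. The table layout, `iter_rows`, and the batch size are illustrative assumptions; the real index has more columns and its batch size comes from a helper not shown on this page.

```python
import sqlite3
from typing import Dict, Iterator


def iter_rows(conn: sqlite3.Connection, batch_size: int) -> Iterator[Dict[str, object]]:
    """Yield rows in rowid order, fetching at most batch_size rows per query."""
    last_rowid = -1
    while True:
        rows = conn.execute(
            "select rowid, id, status from migrations "
            "where rowid > ? order by rowid limit ?",
            (last_rowid, batch_size),
        ).fetchall()
        if not rows:
            break  # out of rows
        for rowid, entity_id, status in rows:
            last_rowid = rowid  # remember where the next page starts
            yield {"id": entity_id, "status": status}


conn = sqlite3.connect(":memory:")
conn.execute("create table migrations (id text, status text)")
conn.executemany(
    "insert into migrations values (?, ?)",
    [(f"syn{n}", "MIGRATED") for n in range(1, 6)],
)
migrated_ids = [row["id"] for row in iter_rows(conn, batch_size=2)]
```

Only one page of rows is held in memory at a time, regardless of how many rows the table contains.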

Source code in synapseutils/migrate_functions.py
class MigrationResult:
    """A MigrationResult is a proxy object to the underlying sqlite db.
    It provides a programmatic interface that lets the caller iterate over the
    migrated file handles without having to connect to the sqlite db or know its
    schema. It also avoids loading everything into an in-memory data structure,
    which could be a memory liability when migrating a huge project of hundreds
    of thousands or millions of entities.

    This proxy object is not thread safe, since it accesses an underlying sqlite db.
    """

    def __init__(self, syn, db_path):
        self._syn = syn
        self.db_path = db_path

    def get_counts_by_status(self):
        """
        Returns a dictionary of counts by the migration status of each indexed file/version.
        Keys are as follows:

        - `INDEXED` - the file/version has been indexed and will be migrated on a call to migrate_indexed_files
        - `MIGRATED` - the file/version has been migrated
        - `ALREADY_MIGRATED` - the file/version was already stored at the target storage location and no migration is needed
        - `ERRORED` - an error occurred while indexing or migrating the file/version
        """  # noqa
        import sqlite3

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # for the purposes of these counts, containers (Projects and Folders) do not count.
            # we are counting actual files only
            result = cursor.execute(
                "select status, count(*) from migrations where type in (?, ?) group by status",
                (_MigrationType.FILE.value, _MigrationType.TABLE_ATTACHED_FILE.value),
            )

            counts_by_status = {status.name: 0 for status in _MigrationStatus}
            for row in result:
                status = row[0]
                count = row[1]
                counts_by_status[_MigrationStatus(status).name] = count

            return counts_by_status

    def get_migrations(self):
        """
        A generator yielding each file/version in the migration index.
        A dictionary of the properties of the migration row is yielded as follows

        Yields:
            id: the Synapse id
            type: the concrete type of the entity
            version: the version of the file entity (if applicable)
            row_id: the row of the table attached file (if applicable)
            col_name: the column name of the column the table attached file resides in (if applicable)
            from_storage_location_id: the previous storage location id where the file/version was stored
            from_file_handle_id: the file handle id of the existing file/version
            to_file_handle_id: if migrated, the new file handle id
            status: one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version
            exception: if an error was encountered indexing/migrating the file/version its stack is here
        """
        import sqlite3

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            last_id = None
            column_names = None

            rowid = -1
            while True:
                results = cursor.execute(
                    """
                        select
                            rowid,

                            id,
                            type,
                            version,
                            row_id,
                            col_id,
                            from_storage_location_id,
                            from_file_handle_id,
                            to_file_handle_id,
                            file_size,
                            status,
                            exception
                        from migrations
                        where
                            rowid > ?
                            and type in (?, ?)
                        order by
                            rowid
                        limit ?
                    """,
                    (
                        rowid,
                        _MigrationType.FILE.value,
                        _MigrationType.TABLE_ATTACHED_FILE.value,
                        _get_batch_size(),
                    ),
                )

                row_count = 0
                for row in results:
                    row_count += 1

                    # using the internal sqlite rowid for ordering only
                    rowid = row[0]

                    # exclude the sqlite internal rowid
                    row_dict = _get_row_dict(cursor, row, False)
                    entity_id = row_dict["id"]
                    if entity_id != last_id:
                        # if the next row is dealing with a different entity than the last table
                        # id then we discard any cached column names we looked up
                        column_names = {}

                    row_dict["type"] = (
                        "file"
                        if row_dict["type"] == _MigrationType.FILE.value
                        else "table"
                    )

                    for int_arg in (
                        "version",
                        "row_id",
                        "from_storage_location_id",
                        "from_file_handle_id",
                        "to_file_handle_id",
                    ):
                        int_val = row_dict.get(int_arg)
                        if int_val is not None:
                            row_dict[int_arg] = int(int_val)

                    col_id = row_dict.pop("col_id", None)
                    if col_id is not None:
                        column_name = column_names.get(col_id)

                        # for usability we look up the actual column name from the id,
                        # but that involves a lookup so we cache them for re-use across
                        # rows that deal with the same table entity
                        if column_name is None:
                            column = self._syn.restGET("/column/{}".format(col_id))
                            column_name = column_names[col_id] = column["name"]

                        row_dict["col_name"] = column_name

                    row_dict["status"] = _MigrationStatus(row_dict["status"]).name

                    yield row_dict

                    last_id = entity_id

                if row_count == 0:
                    # out of rows
                    break

    def as_csv(self, path):
        """
        Output a flat csv file of the contents of the Migration index.

        Arguments:
            path: The path to the csv file to be created

        Returns:
            None: But a csv file is created at the given path with the following columns:
            id: the Synapse id
            type: the concrete type of the entity
            version: the version of the file entity (if applicable)
            row_id: the row of the table attached file (if applicable)
            col_name: the column name of the column the table attached file resides in (if applicable)
            from_storage_location_id: the previous storage location id where the file/version was stored
            from_file_handle_id: the file handle id of the existing file/version
            to_file_handle_id: if migrated, the new file handle id
            status: one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version
            exception: if an error was encountered indexing/migrating the file/version its stack is here

        """

        with open(path, "w", newline="") as csv_file:
            csv_writer = csv.writer(csv_file)

            # headers
            csv_writer.writerow(
                [
                    "id",
                    "type",
                    "version",
                    "row_id",
                    "col_name",
                    "from_storage_location_id",
                    "from_file_handle_id",
                    "to_file_handle_id",
                    "status",
                    "exception",
                ]
            )

            for row_dict in self.get_migrations():
                row_data = [
                    row_dict["id"],
                    row_dict["type"],
                    row_dict.get("version"),
                    row_dict.get("row_id"),
                    row_dict.get("col_name"),
                    row_dict.get("from_storage_location_id"),
                    row_dict.get("from_file_handle_id"),
                    row_dict.get("to_file_handle_id"),
                    row_dict["status"],
                    row_dict.get("exception"),
                ]

                csv_writer.writerow(row_data)
Functions
get_counts_by_status()

Returns a dictionary of counts by the migration status of each indexed file/version. Keys are as follows:

  • INDEXED - the file/version has been indexed and will be migrated on a call to migrate_indexed_files
  • MIGRATED - the file/version has been migrated
  • ALREADY_MIGRATED - the file/version was already stored at the target storage location and no migration is needed
  • ERRORED - an error occurred while indexing or migrating the file/version
Source code in synapseutils/migrate_functions.py
def get_counts_by_status(self):
    """
    Returns a dictionary of counts by the migration status of each indexed file/version.
    Keys are as follows:

    - `INDEXED` - the file/version has been indexed and will be migrated on a call to migrate_indexed_files
    - `MIGRATED` - the file/version has been migrated
    - `ALREADY_MIGRATED` - the file/version was already stored at the target storage location and no migration is needed
    - `ERRORED` - an error occurred while indexing or migrating the file/version
    """  # noqa
    import sqlite3

    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.cursor()

        # for the purposes of these counts, containers (Projects and Folders) do not count.
        # we are counting actual files only
        result = cursor.execute(
            "select status, count(*) from migrations where type in (?, ?) group by status",
            (_MigrationType.FILE.value, _MigrationType.TABLE_ATTACHED_FILE.value),
        )

        counts_by_status = {status.name: 0 for status in _MigrationStatus}
        for row in result:
            status = row[0]
            count = row[1]
            counts_by_status[_MigrationStatus(status).name] = count

        return counts_by_status
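The zero-filled counting pattern used above can be tried in isolation. The following is a minimal sketch, not the library's code: the `Status` enum is a hypothetical stand-in for the private `_MigrationStatus` enum, its numeric values are assumed, and the `migrations` table is reduced to two columns.

```python
import enum
import sqlite3


class Status(enum.Enum):
    # Hypothetical stand-in for the library's private status enum.
    # The numeric values are assumptions made for this sketch.
    INDEXED = 1
    MIGRATED = 2
    ALREADY_MIGRATED = 3
    ERRORED = 4


def counts_by_status(conn):
    # Start every status at zero so statuses with no rows still appear.
    counts = {status.name: 0 for status in Status}
    rows = conn.execute("select status, count(*) from migrations group by status")
    for status_value, count in rows:
        counts[Status(status_value).name] = count
    return counts


# An in-memory database with a simplified, assumed schema.
conn = sqlite3.connect(":memory:")
conn.execute("create table migrations (id text, status integer)")
conn.executemany(
    "insert into migrations values (?, ?)",
    [("syn1", 1), ("syn2", 1), ("syn3", 2), ("syn4", 4)],
)
counts = counts_by_status(conn)
print(counts)
```

Because the dictionary is pre-populated from the enum, a caller can read any status key without checking whether that status occurred.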
get_migrations()

A generator yielding each file/version in the migration index. A dictionary of the properties of the migration row is yielded as follows

YIELDS DESCRIPTION
id

the Synapse id

type

the concrete type of the entity

version

the version of the file entity (if applicable)

row_id

the row of the table attached file (if applicable)

col_name

the column name of the column the table attached file resides in (if applicable)

from_storage_location_id

the previous storage location id where the file/version was stored

from_file_handle_id

the file handle id of the existing file/version

to_file_handle_id

if migrated, the new file handle id

status

one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version

exception

if an error was encountered indexing/migrating the file/version its stack is here

Source code in synapseutils/migrate_functions.py
def get_migrations(self):
    """
    A generator yielding each file/version in the migration index.
    A dictionary of the properties of the migration row is yielded as follows

    Yields:
        id: the Synapse id
        type: the concrete type of the entity
        version: the version of the file entity (if applicable)
        row_id: the row of the table attached file (if applicable)
        col_name: the column name of the column the table attached file resides in (if applicable)
        from_storage_location_id: the previous storage location id where the file/version was stored
        from_file_handle_id: the file handle id of the existing file/version
        to_file_handle_id: if migrated, the new file handle id
        status: one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version
        exception: if an error was encountered indexing/migrating the file/version its stack is here
    """
    import sqlite3

    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.cursor()

        last_id = None
        column_names = None

        rowid = -1
        while True:
            results = cursor.execute(
                """
                    select
                        rowid,

                        id,
                        type,
                        version,
                        row_id,
                        col_id,
                        from_storage_location_id,
                        from_file_handle_id,
                        to_file_handle_id,
                        file_size,
                        status,
                        exception
                    from migrations
                    where
                        rowid > ?
                        and type in (?, ?)
                    order by
                        rowid
                    limit ?
                """,
                (
                    rowid,
                    _MigrationType.FILE.value,
                    _MigrationType.TABLE_ATTACHED_FILE.value,
                    _get_batch_size(),
                ),
            )

            row_count = 0
            for row in results:
                row_count += 1

                # using the internal sqlite rowid for ordering only
                rowid = row[0]

                # exclude the sqlite internal rowid
                row_dict = _get_row_dict(cursor, row, False)
                entity_id = row_dict["id"]
                if entity_id != last_id:
                    # if the next row is dealing with a different entity than the last table
                    # id then we discard any cached column names we looked up
                    column_names = {}

                row_dict["type"] = (
                    "file"
                    if row_dict["type"] == _MigrationType.FILE.value
                    else "table"
                )

                for int_arg in (
                    "version",
                    "row_id",
                    "from_storage_location_id",
                    "from_file_handle_id",
                    "to_file_handle_id",
                ):
                    int_val = row_dict.get(int_arg)
                    if int_val is not None:
                        row_dict[int_arg] = int(int_val)

                col_id = row_dict.pop("col_id", None)
                if col_id is not None:
                    column_name = column_names.get(col_id)

                    # for usability we look up the actual column name from the id,
                    # but that involves a lookup so we cache them for re-use across
                    # rows that deal with the same table entity
                    if column_name is None:
                        column = self._syn.restGET("/column/{}".format(col_id))
                        column_name = column_names[col_id] = column["name"]

                    row_dict["col_name"] = column_name

                row_dict["status"] = _MigrationStatus(row_dict["status"]).name

                yield row_dict

                last_id = entity_id

            if row_count == 0:
                # out of rows
                break
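The generator above pages through the index by remembering the last SQLite `rowid` it saw and asking for rows after it. A minimal sketch of that paging technique follows, assuming a one-column `migrations` table and a hypothetical `iter_rows` helper; unlike the source, it calls `fetchall()` on each batch before yielding.

```python
import sqlite3


def iter_rows(conn, batch_size=2):
    """Yield (rowid, id) pairs in rowid order, one query per batch."""
    last_rowid = -1
    while True:
        batch = conn.execute(
            "select rowid, id from migrations where rowid > ? order by rowid limit ?",
            (last_rowid, batch_size),
        ).fetchall()
        if not batch:
            # An empty batch means every row has been seen.
            break
        for rowid, entity_id in batch:
            # Remember the position so the next query resumes after it.
            last_rowid = rowid
            yield rowid, entity_id


# Simplified, assumed schema for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("create table migrations (id text)")
conn.executemany("insert into migrations values (?)", [(f"syn{i}",) for i in range(5)])
ids = [entity_id for _, entity_id in iter_rows(conn)]
print(ids)
```

Filtering on `rowid > ?` rather than using an offset keeps each query cheap regardless of how far into the table the iteration has progressed.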
as_csv(path)

Output a flat csv file of the contents of the Migration index.

PARAMETER DESCRIPTION
path

The path to the csv file to be created

RETURNS DESCRIPTION
None

But a csv file is created at the given path with the following columns:

id

the Synapse id

type

the concrete type of the entity

version

the version of the file entity (if applicable)

row_id

the row of the table attached file (if applicable)

col_name

the column name of the column the table attached file resides in (if applicable)

from_storage_location_id

the previous storage location id where the file/version was stored

from_file_handle_id

the file handle id of the existing file/version

to_file_handle_id

if migrated, the new file handle id

status

one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version

exception

if an error was encountered indexing/migrating the file/version its stack is here

Source code in synapseutils/migrate_functions.py
def as_csv(self, path):
    """
    Output a flat csv file of the contents of the Migration index.

    Arguments:
        path: The path to the csv file to be created

    Returns:
        None: But a csv file is created at the given path with the following columns:
        id: the Synapse id
        type: the concrete type of the entity
        version: the version of the file entity (if applicable)
        row_id: the row of the table attached file (if applicable)
        col_name: the column name of the column the table attached file resides in (if applicable)
        from_storage_location_id: the previous storage location id where the file/version was stored
        from_file_handle_id: the file handle id of the existing file/version
        to_file_handle_id: if migrated, the new file handle id
        status: one of INDEXED, MIGRATED, ALREADY_MIGRATED, ERRORED indicating the status of the file/version
        exception: if an error was encountered indexing/migrating the file/version its stack is here

    """

    with open(path, "w", newline="") as csv_file:
        csv_writer = csv.writer(csv_file)

        # headers
        csv_writer.writerow(
            [
                "id",
                "type",
                "version",
                "row_id",
                "col_name",
                "from_storage_location_id",
                "from_file_handle_id",
                "to_file_handle_id",
                "status",
                "exception",
            ]
        )

        for row_dict in self.get_migrations():
            row_data = [
                row_dict["id"],
                row_dict["type"],
                row_dict.get("version"),
                row_dict.get("row_id"),
                row_dict.get("col_name"),
                row_dict.get("from_storage_location_id"),
                row_dict.get("from_file_handle_id"),
                row_dict.get("to_file_handle_id"),
                row_dict["status"],
                row_dict.get("exception"),
            ]

            csv_writer.writerow(row_data)
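Columns marked "if applicable" are read with `dict.get`, so a row that lacks them produces an empty CSV field rather than an error. The sketch below illustrates this with a reduced column list and made-up rows; `write_rows` and `COLUMNS` are hypothetical names, not part of synapseutils.

```python
import csv
import io

# A shortened column list for illustration; the real export has more columns.
COLUMNS = ["id", "type", "version", "row_id", "col_name", "status"]


def write_rows(rows, out):
    writer = csv.writer(out)
    writer.writerow(COLUMNS)
    for row in rows:
        # dict.get returns None for keys that do not apply to this row,
        # and csv.writer renders None as an empty field.
        writer.writerow([row.get(column) for column in COLUMNS])


# Made-up example rows: one file entity version and one table attached file.
rows = [
    {"id": "syn1", "type": "file", "version": 3, "status": "MIGRATED"},
    {"id": "syn2", "type": "table", "row_id": 7, "col_name": "attachment", "status": "INDEXED"},
]
buffer = io.StringIO(newline="")
write_rows(rows, buffer)
lines = buffer.getvalue().splitlines()
print(lines)
```

When writing to a real file, opening it with `newline=""` as the source does lets the `csv` module control line endings itself.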

Functions

index_files_for_migration(syn, entity, dest_storage_location_id, db_path, source_storage_location_ids=None, file_version_strategy='new', include_table_files=False, continue_on_error=False)

Index the given entity for migration to a new storage location. This is the first step in migrating an entity to a new storage location using synapseutils.

This function will create a sqlite database at the given db_path that can be subsequently passed to the migrate_indexed_files function for actual migration. This function itself does not modify the given entity in any way.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

entity

A Synapse entity whose files should be migrated. Can be a Project, Folder, File entity, or Table entity. If it is a container (a Project or Folder) its contents will be recursively indexed.

dest_storage_location_id

The id of the new storage location to be migrated to.

TYPE: str

db_path

A path on disk where a sqlite db can be created to store the contents of the created index.

TYPE: str

source_storage_location_ids

An optional iterable of storage location ids that will be migrated. If provided, files outside of the listed storage locations will not be indexed for migration. If not provided, then all files not already in the destination storage location will be indexed for migration.

TYPE: Iterable[str] DEFAULT: None

file_version_strategy

One of "new" (default), "all", "latest", "skip" as follows:

  • new: will create a new version of file entities in the new storage location, leaving existing versions unchanged
  • all: all existing versions will be migrated in place to the new storage location
  • latest: the latest version will be migrated in place to the new storage location
  • skip: skip migrating file entities. Use this, for example, to migrate table attached files in a container while leaving the file entities unchanged

DEFAULT: 'new'

include_table_files

Whether to migrate files attached to tables. If False (default) then e.g. only file entities in the container will be migrated and tables will be untouched.

DEFAULT: False

continue_on_error

Whether any errors encountered while indexing an entity (access etc) will be raised or instead just recorded in the index while allowing the index creation to continue. Default is False (any errors are raised).

DEFAULT: False

RETURNS DESCRIPTION

A MigrationResult object that can be used to inspect the contents of the index or output the index to a CSV for manual inspection.

Source code in synapseutils/migrate_functions.py
def index_files_for_migration(
    syn: synapseclient.Synapse,
    entity,
    dest_storage_location_id: str,
    db_path: str,
    source_storage_location_ids: typing.Iterable[str] = None,
    file_version_strategy="new",
    include_table_files=False,
    continue_on_error=False,
):
    """
    Index the given entity for migration to a new storage location. This is the first step in migrating an entity
    to a new storage location using synapseutils.

    This function will create a sqlite database at the given db_path that can be subsequently passed
    to the migrate_indexed_files function for actual migration. This function itself does not modify the given entity
    in any way.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: A Synapse entity whose files should be migrated. Can be a Project, Folder,
                File entity, or Table entity. If it is a container (a Project or Folder)
                its contents will be recursively indexed.
        dest_storage_location_id: The id of the new storage location to be migrated to.
        db_path: A path on disk where a sqlite db can be created to store the contents of the
                    created index.
        source_storage_location_ids: An optional iterable of storage location ids that
                                        will be migrated. If provided, files outside of
                                        one of the listed storage locations will not be
                                        indexed for migration. If not provided, then all
                                        files not already in the destination storage
                                        location will be indexed for migration.
        file_version_strategy: One of "new" (default), "all", "latest", "skip" as follows:

            - `new`: will create a new version of file entities in the new storage location, leaving existing versions unchanged
            - `all`: all existing versions will be migrated in place to the new storage location
            - `latest`: the latest version will be migrated in place to the new storage location
            - `skip`: skip migrating file entities. Use this, for example, to migrate table attached files in a container while leaving the file entities unchanged
        include_table_files: Whether to migrate files attached to tables. If False (default) then e.g. only
                                file entities in the container will be migrated and tables will be untouched.
        continue_on_error: Whether any errors encountered while indexing an entity (access etc) will be raised
                                or instead just recorded in the index while allowing the index creation
                                to continue. Default is False (any errors are raised).

    Returns:
        A MigrationResult object that can be used to inspect the contents of the index or output the index to a CSV for manual inspection.
    """  # noqa
    root_id = utils.id_of(entity)

    # accept an Iterable, but easier to work internally if we can assume a list of strings
    source_storage_location_ids = [str(s) for s in source_storage_location_ids or []]

    file_version_strategies = {"new", "all", "latest", "skip"}
    if file_version_strategy not in file_version_strategies:
        raise ValueError(
            "Invalid file_version_strategy: {}, must be one of {}".format(
                file_version_strategy, file_version_strategies
            )
        )

    if file_version_strategy == "skip" and not include_table_files:
        raise ValueError(
            "Skipping both file entities and table attached files, nothing to migrate"
        )

    _verify_storage_location_ownership(syn, dest_storage_location_id)

    test_import_sqlite3()
    import sqlite3

    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        _ensure_schema(cursor)

        _verify_index_settings(
            cursor,
            db_path,
            root_id,
            dest_storage_location_id,
            source_storage_location_ids,
            file_version_strategy,
            include_table_files,
        )
        conn.commit()

        entity = syn.get(root_id, downloadFile=False)
        try:
            _index_entity(
                conn,
                cursor,
                syn,
                entity,
                None,
                dest_storage_location_id,
                source_storage_location_ids,
                file_version_strategy,
                include_table_files,
                continue_on_error,
            )

        except _IndexingError as indexing_ex:
            logging.exception(
                "Aborted due to failure to index entity %s of type %s. Use the continue_on_error option to skip "
                "over entities due to individual failures.",
                indexing_ex.entity_id,
                indexing_ex.concrete_type,
            )

            raise indexing_ex.__cause__

    return MigrationResult(syn, db_path)
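Two argument combinations are rejected before any indexing starts: an unknown `file_version_strategy`, and `"skip"` combined with `include_table_files=False`, which would leave nothing to migrate. The checks can be sketched on their own as follows; `check_index_options` and `is_valid` are hypothetical helpers written for this illustration and do not exist in synapseutils.

```python
VALID_STRATEGIES = {"new", "all", "latest", "skip"}


def check_index_options(file_version_strategy="new", include_table_files=False):
    """Hypothetical helper that mirrors the argument checks in the source above."""
    if file_version_strategy not in VALID_STRATEGIES:
        raise ValueError(
            "Invalid file_version_strategy: {}, must be one of {}".format(
                file_version_strategy, sorted(VALID_STRATEGIES)
            )
        )
    if file_version_strategy == "skip" and not include_table_files:
        # Neither file entities nor table attached files would be indexed.
        raise ValueError("Nothing to migrate with these options")


def is_valid(**options):
    # Convenience wrapper that turns the exception into a boolean.
    try:
        check_index_options(**options)
        return True
    except ValueError:
        return False


print(is_valid(file_version_strategy="skip", include_table_files=True))
print(is_valid(file_version_strategy="skip"))
```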

migrate_indexed_files(syn, db_path, create_table_snapshots=True, continue_on_error=False, force=False)

Migrate files previously indexed in a sqlite database at the given db_path using the separate index_files_for_migration function. The files listed in the index will be migrated according to the configuration of that index.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

TYPE: Synapse

db_path

A path on disk where a sqlite db was created using the index_files_for_migration function.

TYPE: str

create_table_snapshots

When updating the files in any table, whether a snapshot of the table is first created.

DEFAULT: True

continue_on_error

Whether any errors encountered while migrating will be raised or instead just recorded in the sqlite database while allowing the migration to continue. Default is False (any errors are raised).

DEFAULT: False

force

If running in an interactive shell, migration requires an interactive confirmation. This can be bypassed by using the force=True option.

DEFAULT: False

RETURNS DESCRIPTION
Union[MigrationResult, None]

A MigrationResult object that can be used to inspect the results of the migration, or None if the migration was not confirmed and was therefore aborted.
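The two functions are meant to be used in sequence: index first, review the index, then migrate. The sketch below shows one way to arrange that, assuming both functions are importable from the top-level `synapseutils` package; `run_migration` and its `csv_path` argument are hypothetical names, and the option values are only examples.

```python
def run_migration(syn, entity, dest_storage_location_id, db_path, csv_path):
    """Index, review, then migrate. A sketch; all argument values are the caller's."""
    # Imported lazily so the sketch can be loaded without the package installed.
    import synapseutils

    # Step 1: build the index. This call does not modify the entity.
    index_result = synapseutils.index_files_for_migration(
        syn,
        entity,
        dest_storage_location_id,
        db_path,
        file_version_strategy="new",
        include_table_files=False,
        continue_on_error=True,
    )

    # Step 2: review what was indexed before committing to the migration.
    print(index_result.get_counts_by_status())
    index_result.as_csv(csv_path)

    # Step 3: migrate. force=True bypasses the interactive confirmation,
    # so only pass it once the index has been reviewed.
    return synapseutils.migrate_indexed_files(
        syn,
        db_path,
        create_table_snapshots=True,
        continue_on_error=True,
        force=True,
    )
```

A caller would pass a logged-in client, for example `run_migration(syn, "syn123", "12345", "index.db", "index.csv")`, where the ids and paths are placeholders.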

Source code in synapseutils/migrate_functions.py
def migrate_indexed_files(
    syn: synapseclient.Synapse,
    db_path: str,
    create_table_snapshots=True,
    continue_on_error=False,
    force=False,
) -> typing.Union[MigrationResult, None]:
    """
    Migrate files previously indexed in a sqlite database at the given db_path using the separate
    index_files_for_migration function. The files listed in the index will be migrated according to the
    configuration of that index.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        db_path: A path on disk where a sqlite db was created using the index_files_for_migration function.
        create_table_snapshots: When updating the files in any table, whether a snapshot of the table is
                                first created.
        continue_on_error: Whether any errors encountered while migrating will be raised
                            or instead just recorded in the sqlite database while allowing the migration
                            to continue. Default is False (any errors are raised).
        force: If running in an interactive shell, migration requires an interactive confirmation.
                This can be bypassed by using the force=True option.

    Returns:
        A MigrationResult object that can be used to inspect the results of the migration.
    """
    executor, max_concurrent_file_copies = _get_executor(syn)

    test_import_sqlite3()
    import sqlite3

    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()

        _ensure_schema(cursor)
        settings = _retrieve_index_settings(cursor)
        if settings is None:
            # no settings were available at the index given
            raise ValueError(
                "Unable to retrieve existing index settings from '{}'. "
                "Either this path does not represent a previously created migration index file "
                "or the file is corrupt.".format(db_path)
            )

        dest_storage_location_id = settings["dest_storage_location_id"]
        if not _confirm_migration(cursor, force, dest_storage_location_id):
            logging.info("Migration aborted.")
            return

        key = _MigrationKey(id="", type=None, row_id=-1, col_id=-1, version=-1)

        futures = set()

        # we keep track of the file handles that are currently being migrated
        # so that if we encounter multiple entities associated with the same
        # file handle we can copy the file handle once and update all the entities
        # with the single copied file handle
        pending_file_handle_ids = set()
        completed_file_handle_ids = set()

        # we keep track of the entity keys (syn id + version) so that we know
        # if we encounter the same one twice. normally we wouldn't but when we backtrack
        # to update any entities skipped because of a shared file handle we might
        # query for the same key as is already being operated on.
        pending_keys = set()

        batch_size = _get_batch_size()
        while True:
            # we query for additional file or table associated file handles to migrate in batches
            # ordering by synapse id. there can be multiple file handles associated with a particular
            # synapse id (i.e. multiple file entity versions or multiple table attached files per table),
            # so the ordering and where clause need to account for that.
            # we also include in the query any unmigrated files that were skipped previously through
            # the query loop that share a file handle with a file handle id that is now finished.
            version = key.version if key.version is not None else -1
            row_id = key.row_id if key.row_id is not None else -1
            col_id = key.col_id if key.col_id is not None else -1

            query_kwargs = {
                "indexed_status": _MigrationStatus.INDEXED.value,
                "id": key.id,
                "file_type": _MigrationType.FILE.value,
                "table_type": _MigrationType.TABLE_ATTACHED_FILE.value,
                "version": version,
                "row_id": row_id,
                "col_id": col_id,
                # ensure that we aren't ever adding more items to the shared executor than allowed
                "limit": min(batch_size, max_concurrent_file_copies - len(futures)),
            }

            # we can't use both named and positional literals in a query, so we use named
            # literals and then inline a string for the values for our file handle ids
            # since these are a dynamic list of values
            pending_file_handle_in = "('" + "','".join(pending_file_handle_ids) + "')"
            completed_file_handle_in = (
                "('" + "','".join(completed_file_handle_ids) + "')"
            )

            results = cursor.execute(
                f"""
                    select
                        id,
                        type,
                        version,
                        row_id,
                        col_id,
                        from_file_handle_id,
                        file_size
                    from migrations
                    where
                        status = :indexed_status
                        and (
                                (
                                    ((id > :id and type in (:file_type, :table_type))
                                    or (id = :id and type = :file_type and version is not null and version > :version)
                                    or (id = :id and type = :table_type and (row_id > :row_id or (row_id = :row_id and col_id > :col_id))))
                                    and from_file_handle_id not in {pending_file_handle_in}
                                ) or
                                (
                                    id <= :id
                                    and from_file_handle_id in {completed_file_handle_in}
                                )
                        )
                    order by
                        id,
                        type,
                        row_id,
                        col_id,
                        version
                    limit :limit
                """,  # noqa
                query_kwargs,
            )

            row_count = 0
            for row in results:
                row_count += 1

                row_dict = _get_row_dict(cursor, row, True)
                key_dict = {
                    k: v
                    for k, v in row_dict.items()
                    if k in ("id", "type", "version", "row_id", "col_id")
                }

                last_key = key
                key = _MigrationKey(**key_dict)
                from_file_handle_id = row_dict["from_file_handle_id"]

                if (
                    key in pending_keys
                    or from_file_handle_id in pending_file_handle_ids
                ):
                    # if this record is already being migrated or it shares a file handle
                    # with a record that is being migrated then skip this.
                    # if the record shares a file handle it will be picked up later
                    # when its file handle is completed.
                    continue

                file_size = row_dict["file_size"]

                pending_keys.add(key)
                to_file_handle_id = _check_file_handle_exists(
                    conn.cursor(), from_file_handle_id
                )
                if not to_file_handle_id:
                    pending_file_handle_ids.add(from_file_handle_id)

                if key.type == _MigrationType.FILE.value:
                    if key.version is None:
                        migration_fn = _create_new_file_version

                    else:
                        migration_fn = _migrate_file_version

                elif key.type == _MigrationType.TABLE_ATTACHED_FILE.value:
                    if last_key.id != key.id and create_table_snapshots:
                        syn.create_snapshot_version(key.id)

                    migration_fn = _migrate_table_attached_file

                else:
                    raise ValueError(
                        "Unexpected type {} with id {}".format(key.type, key.id)
                    )

                def migration_task(
                    syn,
                    key,
                    from_file_handle_id,
                    to_file_handle_id,
                    file_size,
                    storage_location_id,
                ):
                    # a closure to wrap the actual function call so that we can add some local variables
                    # to the return tuple which will be consumed when the future is processed
                    with shared_executor(executor):
                        try:
                            # instrument the shared executor in this thread so that we won't
                            # create a new executor to perform the multipart copy
                            to_file_handle_id = migration_fn(
                                syn,
                                key,
                                from_file_handle_id,
                                to_file_handle_id,
                                file_size,
                                storage_location_id,
                            )
                            return key, from_file_handle_id, to_file_handle_id
                        except Exception as ex:
                            raise _MigrationError(
                                key, from_file_handle_id, to_file_handle_id
                            ) from ex

                future = executor.submit(
                    migration_task,
                    syn,
                    key,
                    from_file_handle_id,
                    to_file_handle_id,
                    file_size,
                    dest_storage_location_id,
                )
                futures.add(future)

            if row_count == 0 and not pending_file_handle_ids:
                # we've run out of migratable sqlite rows, we have nothing else
                # to submit, so we break out and wait for all remaining
                # tasks to conclude.
                break

            if len(futures) >= max_concurrent_file_copies or row_count < batch_size:
                # if we have no concurrency left to process any additional entities
                # or if we're near the end of the migration and have a small
                # remainder batch then we wait for one of the processing migrations
                # to finish. a small batch doesn't mean this is the last batch since
                # a completed file handle here could be associated with another
                # entity that we deferred before because it shared the same file handle id
                futures, completed_file_handle_ids = _wait_futures(
                    conn,
                    cursor,
                    futures,
                    pending_keys,
                    concurrent.futures.FIRST_COMPLETED,
                    continue_on_error,
                )

                pending_file_handle_ids -= completed_file_handle_ids

        if futures:
            # wait for all remaining migrations to conclude before returning
            _wait_futures(
                conn,
                cursor,
                futures,
                pending_keys,
                concurrent.futures.ALL_COMPLETED,
                continue_on_error,
            )

    return MigrationResult(syn, db_path)
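
The submission loop above caps the number of in-flight copies by waiting for the first completed future whenever the limit is reached, then waits for everything that remains before returning. A minimal standalone sketch of that pattern follows, using only the standard library. The name `run_bounded` and its parameters are hypothetical and are not part of synapseutils; the real code also tracks pending keys and file handle ids, which this sketch omits.

```python
import concurrent.futures


def run_bounded(tasks, max_concurrent):
    """Run zero-argument callables with at most max_concurrent in flight.

    Hypothetical helper for illustration; results come back in completion order.
    """
    results = []
    futures = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        for task in tasks:
            futures.add(executor.submit(task))
            if len(futures) >= max_concurrent:
                # No capacity left: block until at least one task finishes,
                # and keep only the unfinished futures for the next round.
                done, futures = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                )
                results.extend(f.result() for f in done)

        # Nothing left to submit: wait for all remaining tasks to conclude.
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.ALL_COMPLETED
        )
        results.extend(f.result() for f in done)
    return results


# Bind n as a default argument so each callable keeps its own value.
squares = run_bounded([lambda n=n: n * n for n in range(10)], max_concurrent=3)
print(sorted(squares))
```

Waiting on `FIRST_COMPLETED` rather than `ALL_COMPLETED` inside the loop keeps the pool busy: a new task can be submitted as soon as a single slot frees up.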

synapseutils.describe_functions

Functions

describe(syn, entity)

Gets a synapse entity and returns summary statistics about it.

PARAMETER DESCRIPTION
syn

A Synapse object with user's login, e.g. syn = synapseclient.login()

entity

Synapse ID of the entity to be described

TYPE: str

Using this function

Describing columns of a table

import synapseclient
import synapseutils
syn = synapseclient.login()
statistics = synapseutils.describe(syn, entity="syn123")
print(statistics)
{
    "column1": {
        "dtype": "object",
        "mode": "FOOBAR"
    },
    "column2": {
        "dtype": "int64",
        "mode": 1,
        "min": 1,
        "max": 2,
        "mean": 1.4
    },
    "column3": {
        "dtype": "bool",
        "mode": false,
        "min": false,
        "max": true,
        "mean": 0.5
    }
}
RETURNS DESCRIPTION
Union[dict, None]

A dict if the dataset is valid; None if not.
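
Because the return type is `Union[dict, None]`, callers may want to check for `None` before iterating over the per-column summaries. The sketch below shows one way to do that on a dict shaped like the example output. The helper name `format_summary` is hypothetical, and the sample dict is illustrative data, not output from a real Synapse entity.

```python
from typing import Optional


def format_summary(stats: Optional[dict]) -> str:
    """Render per-column statistics as one line per column (hypothetical helper)."""
    if stats is None:
        # describe() is documented to return None when the dataset is not valid.
        return "no statistics available"
    lines = []
    for column, summary in stats.items():
        details = ", ".join(f"{name}={value}" for name, value in summary.items())
        lines.append(f"{column}: {details}")
    return "\n".join(lines)


# Illustrative data mirroring the shape of the documented example output.
example_stats = {
    "column1": {"dtype": "object", "mode": "FOOBAR"},
    "column2": {"dtype": "int64", "mode": 1, "min": 1, "max": 2, "mean": 1.4},
}
print(format_summary(example_stats))
print(format_summary(None))
```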

Source code in synapseutils/describe_functions.py
def describe(syn, entity: str) -> typing.Union[dict, None]:
    """
    Gets a synapse entity and returns summary statistics about it.

    Arguments:
        syn: A Synapse object with user's login, e.g. syn = synapseclient.login()
        entity: Synapse ID of the entity to be described

    Example: Using this function
        Describing columns of a table

            import synapseclient
            import synapseutils
            syn = synapseclient.login()
            statistics = synapseutils.describe(syn, entity="syn123")
            print(statistics)
            {
                "column1": {
                    "dtype": "object",
                    "mode": "FOOBAR"
                },
                "column2": {
                    "dtype": "int64",
                    "mode": 1,
                    "min": 1,
                    "max": 2,
                    "mean": 1.4
                },
                "column3": {
                    "dtype": "bool",
                    "mode": false,
                    "min": false,
                    "max": true,
                    "mean": 0.5
                }
            }

    Returns:
        A dict if the dataset is valid; None if not.
    """
    df = _open_entity_as_df(syn=syn, entity=entity)

    if df is None:
        return None

    stats = _describe_wrapper(df)
    syn.logger.info(json.dumps(stats, indent=2, default=str))
    return stats
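
The logging call above passes `default=str` to `json.dumps`, so any value the `json` module cannot encode natively is converted with `str()` instead of raising an error. The sketch below illustrates the effect with a `datetime` value. Whether `describe` ever produces such a value is an assumption made here for illustration; the sample dict is hypothetical.

```python
import datetime
import json

# Hypothetical summary containing a value json cannot encode on its own.
stats = {
    "created": {
        "dtype": "datetime64[ns]",
        "min": datetime.datetime(2020, 1, 1),
    }
}

try:
    json.dumps(stats)
    encoded_without_default = True
except TypeError:
    # datetime objects are not JSON serializable without a fallback.
    encoded_without_default = False

# With default=str the datetime is rendered through str() and the dump succeeds.
text = json.dumps(stats, indent=2, default=str)
print(text)
```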