Skip to content

Curator

synapseclient.extensions.curator

Synapse Curator Extensions

This module provides library functions for metadata curation tasks in Synapse.

Functions

create_file_based_metadata_task

create_file_based_metadata_task(folder_id: str, curation_task_name: str, instructions: str, attach_wiki: bool = True, entity_view_name: str = 'JSON Schema view', schema_uri: Optional[str] = None, enable_derived_annotations: bool = False, *, synapse_client: Optional[Synapse] = None) -> Tuple[str, str]

Create an EntityView (file view) and a CurationTask for file-based metadata curation of a folder, optionally binding a JSON schema to the folder first.

Creating a file-based metadata curation task with schema binding

In this example, we create an EntityView and CurationTask for file-based metadata curation. If a schema_uri is provided, it will be bound to the folder.

import synapseclient
from synapseclient.extensions.curator import create_file_based_metadata_task

syn = synapseclient.Synapse()
syn.login()

entity_view_id, task_id = create_file_based_metadata_task(
    synapse_client=syn,
    folder_id="syn12345678",
    curation_task_name="BiospecimenMetadataTemplate",
    instructions="Please curate this metadata according to the schema requirements",
    attach_wiki=True,
    entity_view_name="Biospecimen Metadata View",
    schema_uri="sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"
)
PARAMETER DESCRIPTION
folder_id

The Synapse Folder ID to create the file view for.

TYPE: str

curation_task_name

Name for the CurationTask (used as data_type field). Must be unique within the project, otherwise if it matches an existing CurationTask, that task will be updated with new data.

TYPE: str

instructions

Instructions for the curation task.

TYPE: str

attach_wiki

Whether or not to attach a Synapse Wiki (default: True).

TYPE: bool DEFAULT: True

entity_view_name

Name for the created entity view (default: "JSON Schema view").

TYPE: str DEFAULT: 'JSON Schema view'

schema_uri

Optional JSON schema URI to bind to the folder. If provided, the schema will be bound to the folder before creating the entity view. (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1')

TYPE: Optional[str] DEFAULT: None

enable_derived_annotations

If true, enable derived annotations. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Tuple[str, str]

A tuple containing, in order: the Synapse ID of the entity view created, and the task ID of the curation task created.

RAISES DESCRIPTION
ValueError

If required parameters are missing.

SynapseError

If there are issues with Synapse operations.

Source code in synapseclient/extensions/curator/file_based_metadata_task.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def create_file_based_metadata_task(
    folder_id: str,
    curation_task_name: str,
    instructions: str,
    attach_wiki: bool = True,
    entity_view_name: str = "JSON Schema view",
    schema_uri: Optional[str] = None,
    enable_derived_annotations: bool = False,
    *,
    synapse_client: Optional[Synapse] = None,
) -> Tuple[str, str]:
    """
    Set up file-based metadata curation for a folder.

    The steps run in this order: optionally bind `schema_uri` to the folder,
    create a JSON-schema entity view over the folder, optionally attach a wiki
    for that view, confirm that a schema can be retrieved from the folder, walk
    up the folder's ancestors to find the owning project, and finally store a
    CurationTask that points at the folder and the new entity view.

    Example: Creating a file-based metadata curation task with schema binding
        In this example, we create an EntityView and CurationTask for file-based
        metadata curation. If a schema_uri is provided, it will be bound to the folder.

        ```python
        import synapseclient
        from synapseclient.extensions.curator import create_file_based_metadata_task

        syn = synapseclient.Synapse()
        syn.login()

        entity_view_id, task_id = create_file_based_metadata_task(
            synapse_client=syn,
            folder_id="syn12345678",
            curation_task_name="BiospecimenMetadataTemplate",
            instructions="Please curate this metadata according to the schema requirements",
            attach_wiki=True,
            entity_view_name="Biospecimen Metadata View",
            schema_uri="sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"
        )
        ```

    Arguments:
        folder_id: The Synapse Folder ID to create the file view for.
        curation_task_name: Name for the CurationTask (used as data_type field).
            Must be unique within the project, otherwise if it matches an existing
            CurationTask, that task will be updated with new data.
        instructions: Instructions for the curation task.
        attach_wiki: Whether or not to attach a Synapse Wiki (default: True).
        entity_view_name: Name for the created entity view (default: "JSON Schema view").
        schema_uri: Optional JSON schema URI to bind to the folder. If provided,
            the schema will be bound to the folder before creating the entity view.
            (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1')
        enable_derived_annotations: If true, enable derived annotations when the
            schema is bound. Defaults to False. Only used when `schema_uri` is given.
        synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

    Returns:
        A tuple containing:
          - The Synapse ID of the entity view created
          - The task ID of the curation task created

    Raises:
        ValueError: If `folder_id`, `curation_task_name` or `instructions` is empty.
        Exception: Any error raised by one of the Synapse operations is logged
            and then re-raised unchanged.
    """
    # Reject empty required arguments up front, in signature order.
    for arg_name, arg_value in (
        ("folder_id", folder_id),
        ("curation_task_name", curation_task_name),
        ("instructions", instructions),
    ):
        if not arg_value:
            raise ValueError(f"{arg_name} is required")

    synapse_client = Synapse.get_client(synapse_client=synapse_client)
    log = synapse_client.logger

    # Step 1 (optional): bind the schema to the folder.
    if schema_uri:
        log.info(f"Attempting to bind schema {schema_uri} to folder {folder_id}.")
        try:
            target_folder = Folder(folder_id).get(synapse_client=synapse_client)
            target_folder.bind_schema(
                json_schema_uri=schema_uri,
                enable_derived_annotations=enable_derived_annotations,
                synapse_client=synapse_client,
            )
            log.info(f"Successfully bound schema {schema_uri} to folder {folder_id}.")
        except Exception as err:
            log.exception(f"Error binding schema {schema_uri} to folder {folder_id}")
            raise err

    # Step 2: create the entity view over the folder.
    log.info("Attempting to create entity view.")
    try:
        entity_view_id = create_json_schema_entity_view(
            syn=synapse_client,
            synapse_entity_id=folder_id,
            entity_view_name=entity_view_name,
        )
    except Exception as err:
        log.exception("Error creating entity view")
        raise err
    else:
        log.info("Created entity view.")

    # Step 3 (optional): attach a wiki for the new view to the folder.
    if attach_wiki:
        log.info("Attempting to attach wiki.")
        try:
            create_or_update_wiki_with_entity_view(
                syn=synapse_client, entity_view_id=entity_view_id, owner_id=folder_id
            )
        except Exception as err:
            log.exception("Error creating wiki")
            raise err
        else:
            log.info("Wiki attached.")

    # Step 4: make sure a schema can be fetched from the folder. Only the
    # success or failure of the call matters; its result is not used.
    log.info("Attempting to get the attached schema.")
    try:
        get(folder_id, synapse_client=synapse_client).get_schema(
            synapse_client=synapse_client
        )
    except Exception as err:
        log.exception("Error getting the attached schema.")
        raise err
    else:
        log.info("Schema retrieval successful")

    # Step 5: climb the ancestor chain until the owning project is reached.
    log.info("Attempting to get the Synapse ID of the provided folders project.")
    try:
        start_folder = Folder(folder_id).get(synapse_client=synapse_client)
        ancestor = synapse_client.get(start_folder.parent_id)
        while ancestor.concreteType != "org.sagebionetworks.repo.model.Project":
            ancestor = synapse_client.get(ancestor.parentId)
        project = ancestor
    except Exception as err:
        log.exception("Error getting the Synapse ID of the provided folders project")
        raise err
    else:
        log.info("Got the Synapse ID of the provided folders project.")

    # Step 6: store the CurationTask; curation_task_name is its data_type.
    log.info("Attempting to create the CurationTask.")
    try:
        task = CurationTask(
            data_type=curation_task_name,
            project_id=project.id,
            instructions=instructions,
            task_properties=FileBasedMetadataTaskProperties(
                upload_folder_id=folder_id,
                file_view_id=entity_view_id,
            ),
        ).store(synapse_client=synapse_client)
    except Exception as err:
        log.exception("Error creating the CurationTask.")
        raise err
    else:
        log.info("Created the CurationTask.")

    return entity_view_id, task.task_id

create_record_based_metadata_task

create_record_based_metadata_task(project_id: str, folder_id: str, record_set_name: str, record_set_description: str, curation_task_name: str, upsert_keys: List[str], instructions: str, schema_uri: str, bind_schema_to_record_set: bool = True, enable_derived_annotations: bool = False, *, synapse_client: Optional[Synapse] = None) -> Tuple[RecordSet, CurationTask, Grid]

Generate and upload CSV templates as a RecordSet for record-based metadata, create a CurationTask, and also create a Grid to bootstrap the ValidationStatistics.

A number of schema URIs that are already registered to Synapse can be found at: https://www.synapse.org/Synapse:syn69735275/tables/

If you have yet to create and register your JSON schema in Synapse, please refer to the tutorial at https://python-docs.synapse.org/en/stable/tutorials/python/json_schema/.

Creating a record-based metadata curation task with a schema URI

In this example, we create a RecordSet and CurationTask for biospecimen metadata curation using a schema URI. By default this will also bind the schema to the RecordSet, however the bind_schema_to_record_set parameter can be set to False to skip that step.

import synapseclient
from synapseclient.extensions.curator import create_record_based_metadata_task

syn = synapseclient.Synapse()
syn.login()

record_set, task, grid = create_record_based_metadata_task(
    synapse_client=syn,
    project_id="syn12345678",
    folder_id="syn87654321",
    record_set_name="BiospecimenMetadata_RecordSet",
    record_set_description="RecordSet for biospecimen metadata curation",
    curation_task_name="BiospecimenMetadataTemplate",
    upsert_keys=["specimenID"],
    instructions="Please curate this metadata according to the schema requirements",
    schema_uri="schema-org-schema.name.schema-v1.0.0"
)
PARAMETER DESCRIPTION
project_id

The Synapse ID of the project where the folder exists.

TYPE: str

folder_id

The Synapse ID of the folder to upload to.

TYPE: str

record_set_name

Name for the RecordSet.

TYPE: str

record_set_description

Description for the RecordSet.

TYPE: str

curation_task_name

Name for the CurationTask (used as data_type field). Must be unique within the project, otherwise if it matches an existing CurationTask, that task will be updated with new data.

TYPE: str

upsert_keys

List of column names to use as upsert keys.

TYPE: List[str]

instructions

Instructions for the curation task.

TYPE: str

schema_uri

JSON schema URI for the RecordSet schema. (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1', 'sage.schemas.v2571-ad.Analysis.schema-0.0.0')

TYPE: str

bind_schema_to_record_set

Whether to bind the given schema to the RecordSet (default: True).

TYPE: bool DEFAULT: True

enable_derived_annotations

If true, enable derived annotations. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Tuple[RecordSet, CurationTask, Grid]

Tuple containing the created RecordSet, CurationTask, and Grid objects

RAISES DESCRIPTION
ValueError

If required parameters are missing or if schema_uri is not provided.

SynapseError

If there are issues with Synapse operations.

Source code in synapseclient/extensions/curator/record_based_metadata_task.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def create_record_based_metadata_task(
    project_id: str,
    folder_id: str,
    record_set_name: str,
    record_set_description: str,
    curation_task_name: str,
    upsert_keys: List[str],
    instructions: str,
    schema_uri: str,
    bind_schema_to_record_set: bool = True,
    enable_derived_annotations: bool = False,
    *,
    synapse_client: Optional[Synapse] = None,
) -> Tuple[RecordSet, CurationTask, Grid]:
    """
    Generate and upload CSV templates as a RecordSet for record-based metadata,
    create a CurationTask, and also create a Grid to bootstrap the ValidationStatistics.

    The steps run in this order: extract a template from the schema, write it to
    a temporary CSV file, store a RecordSet built from that file, optionally bind
    the schema to the RecordSet, store a CurationTask that references the
    RecordSet, then create a Grid for the RecordSet and export it back to the
    RecordSet.

    A number of schema URIs that are already registered to Synapse can be found at:

    - <https://www.synapse.org/Synapse:syn69735275/tables/>


    If you have yet to create and register your JSON schema in Synapse, please refer to
    the tutorial at <https://python-docs.synapse.org/en/stable/tutorials/python/json_schema/>.


    Example: Creating a record-based metadata curation task with a schema URI
        In this example, we create a RecordSet and CurationTask for biospecimen metadata
        curation using a schema URI. By default this will also bind the schema to the
        RecordSet, however the `bind_schema_to_record_set` parameter can be set to
        False to skip that step.


        ```python
        import synapseclient
        from synapseclient.extensions.curator import create_record_based_metadata_task

        syn = synapseclient.Synapse()
        syn.login()

        record_set, task, grid = create_record_based_metadata_task(
            synapse_client=syn,
            project_id="syn12345678",
            folder_id="syn87654321",
            record_set_name="BiospecimenMetadata_RecordSet",
            record_set_description="RecordSet for biospecimen metadata curation",
            curation_task_name="BiospecimenMetadataTemplate",
            upsert_keys=["specimenID"],
            instructions="Please curate this metadata according to the schema requirements",
            schema_uri="schema-org-schema.name.schema-v1.0.0"
        )
        ```

    Arguments:
        project_id: The Synapse ID of the project where the folder exists.
        folder_id: The Synapse ID of the folder to upload to.
        record_set_name: Name for the RecordSet.
        record_set_description: Description for the RecordSet.
        curation_task_name: Name for the CurationTask (used as data_type field).
            Must be unique within the project, otherwise if it matches an existing
            CurationTask, that task will be updated with new data.
        upsert_keys: List of column names to use as upsert keys. Must be non-empty.
        instructions: Instructions for the curation task.
        schema_uri: JSON schema URI for the RecordSet schema.
            (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1', 'sage.schemas.v2571-ad.Analysis.schema-0.0.0')
        bind_schema_to_record_set: Whether to bind the given schema to the RecordSet
            (default: True).
        enable_derived_annotations: If true, enable derived annotations when the
            schema is bound. Defaults to False.
        synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

    Returns:
        Tuple containing the created RecordSet, CurationTask, and Grid objects

    Raises:
        ValueError: If any required parameter is empty (including `schema_uri`
            and an empty `upsert_keys` list).
        Exception: Any error raised while writing the template or by one of the
            Synapse operations is logged and then re-raised unchanged.
    """
    # Validate required parameters
    if not project_id:
        raise ValueError("project_id is required")
    if not folder_id:
        raise ValueError("folder_id is required")
    if not record_set_name:
        raise ValueError("record_set_name is required")
    if not record_set_description:
        raise ValueError("record_set_description is required")
    if not curation_task_name:
        raise ValueError("curation_task_name is required")
    if not upsert_keys:
        raise ValueError("upsert_keys is required and must be a non-empty list")
    if not instructions:
        raise ValueError("instructions is required")
    if not schema_uri:
        raise ValueError("schema_uri is required")

    synapse_client = Synapse.get_client(synapse_client=synapse_client)

    template_df = extract_schema_properties_from_web(
        syn=synapse_client, schema_uri=schema_uri
    )
    synapse_client.logger.info(
        f"Extracted schema properties and created template: {template_df.columns.tolist()}"
    )

    # Reserve a uniquely named .csv path, then close the handle straight away.
    # Leaving it open leaks a file descriptor for the life of the process and,
    # on Windows, prevents the file from being reopened by name below.
    # delete=False keeps the file on disk after close().
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    tmp.close()
    template_path = tmp.name

    try:
        with open(template_path, "w", encoding="utf-8") as f:
            template_df.to_csv(f, index=False)
    except Exception:
        synapse_client.logger.exception("Error writing template to temporary CSV file")
        raise

    # NOTE(review): the temporary file is intentionally not removed here because
    # the stored RecordSet is constructed with it as `path`; confirm whether it
    # is safe to delete after store() before adding cleanup.
    try:
        record_set_with_data = RecordSet(
            name=record_set_name,
            parent_id=folder_id,
            description=record_set_description,
            path=template_path,
            upsert_keys=upsert_keys,
        ).store(synapse_client=synapse_client)
        record_set_id = record_set_with_data.id
        synapse_client.logger.info(
            f"Created RecordSet with ID: {record_set_id} in folder {folder_id}"
        )

        if bind_schema_to_record_set:
            record_set_with_data.bind_schema(
                json_schema_uri=schema_uri,
                enable_derived_annotations=enable_derived_annotations,
                synapse_client=synapse_client,
            )
            synapse_client.logger.info(
                f"Bound schema {schema_uri} to RecordSet ID: {record_set_id}"
            )
    except Exception:
        synapse_client.logger.exception("Error creating RecordSet in Synapse")
        raise

    try:
        curation_task = CurationTask(
            data_type=curation_task_name,
            project_id=project_id,
            instructions=instructions,
            task_properties=RecordBasedMetadataTaskProperties(
                record_set_id=record_set_id,
            ),
        ).store(synapse_client=synapse_client)
        synapse_client.logger.info(
            f"Created CurationTask ({curation_task.task_id}) with name {curation_task_name}"
        )
    except Exception as e:
        synapse_client.logger.error(f"Error creating CurationTask in Synapse: {e}")
        raise

    try:
        curation_grid: Grid = Grid(
            record_set_id=record_set_id,
        )
        curation_grid.create(synapse_client=synapse_client)
        # The returned Grid is whatever export_to_record_set() hands back.
        curation_grid = curation_grid.export_to_record_set(
            synapse_client=synapse_client
        )
        synapse_client.logger.info(
            f"Created Grid view for RecordSet ID: {record_set_id} for curation task {curation_task_name}"
        )
    except Exception:
        synapse_client.logger.exception("Error creating Grid view in Synapse")
        raise

    return record_set_with_data, curation_task, curation_grid

generate_jsonld

generate_jsonld(schema: Any, data_model_labels: DisplayLabelType, output_jsonld: Optional[str], *, synapse_client: Optional[Synapse] = None) -> dict

Convert a CSV data model specification to JSON-LD format with validation and error checking.

This function parses your CSV data model (containing attributes, validation rules, dependencies, and valid values), converts it to a graph-based JSON-LD representation, validates the structure for common errors, and saves the result. The generated JSON-LD file serves as input for generate_jsonschema() and other data model operations.

Data Model Requirements:

Your CSV should include columns defining:

  • Attribute names: Property/attribute identifiers
  • Display names: Human-readable labels (optional but recommended)
  • Descriptions: Documentation for each attribute
  • Valid values: Allowed enum values for attributes (comma-separated)
  • Validation rules: Rules like list, regex, inRange, required, etc.
  • Dependencies: Relationships between attributes using dependsOn
  • Required status: Whether attributes are mandatory

Validation Checks Performed:

  • Ensures all required fields (like displayName) are present
  • Detects cycles in attribute dependencies (which would create invalid schemas)
  • Checks for blacklisted characters in display names that Synapse doesn't allow
  • Validates that attribute names don't conflict with reserved system names
  • Verifies the graph structure is a valid directed acyclic graph (DAG)
PARAMETER DESCRIPTION
schema

Path to your data model CSV file. This file should contain your complete data model specification with all attributes, validation rules, and relationships.

TYPE: Any

data_model_labels

Label format for the JSON-LD output:

  • "class_label" (default, recommended): Uses standard attribute names as labels
  • "display_label": Uses display names as labels if they contain no blacklisted characters (parentheses, periods, spaces, hyphens), otherwise falls back to class labels. Use cautiously as this can affect downstream compatibility.

TYPE: DisplayLabelType

output_jsonld

Path where the JSON-LD file will be saved. If None, saves alongside the input CSV with a .jsonld extension (e.g., model.csv → model.jsonld).

TYPE: Optional[str]

synapse_client

Optional Synapse client instance for logging. If None, creates a new client instance. Use Synapse.get_client() or pass an authenticated client.

TYPE: Optional[Synapse] DEFAULT: None

Output:

The function logs validation errors and warnings to help you fix data model issues before generating JSON schemas. Errors indicate critical problems that must be fixed, while warnings suggest improvements but won't block schema generation.

RETURNS DESCRIPTION
dict

The generated data model as a dictionary in JSON-LD format. The same data is also saved to the file path specified in output_jsonld.

Using this function to generate JSONLD Schema files:

Basic usage with default output path:

from synapseclient import Synapse
from synapseclient.extensions.curator import generate_jsonld

syn = Synapse()
syn.login()

jsonld_model = generate_jsonld(
    schema="path/to/my_data_model.csv",
    data_model_labels="class_label",
    output_jsonld=None,  # Saves to my_data_model.jsonld
    synapse_client=syn
)

Specify custom output path:

jsonld_model = generate_jsonld(
    schema="models/patient_model.csv",
    data_model_labels="class_label",
    output_jsonld="~/output/patient_model_v1.jsonld",
    synapse_client=syn
)

Use display labels:

jsonld_model = generate_jsonld(
    schema="my_model.csv",
    data_model_labels="display_label",
    output_jsonld="my_model.jsonld",
    synapse_client=syn
)

Source code in synapseclient/extensions/curator/schema_generation.py
5452
5453
5454
5455
5456
5457
5458
5459
5460
5461
5462
5463
5464
5465
5466
5467
5468
5469
5470
5471
5472
5473
5474
5475
5476
5477
5478
5479
5480
5481
5482
5483
5484
5485
5486
5487
5488
5489
5490
5491
5492
5493
5494
5495
5496
5497
5498
5499
5500
5501
5502
5503
5504
5505
5506
5507
5508
5509
5510
5511
5512
5513
5514
5515
5516
5517
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
5530
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560
5561
5562
5563
5564
5565
5566
5567
5568
5569
5570
5571
5572
5573
5574
5575
5576
5577
5578
5579
5580
5581
5582
5583
5584
5585
5586
5587
5588
5589
5590
5591
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
5604
5605
5606
5607
5608
5609
5610
5611
5612
5613
5614
5615
5616
5617
5618
5619
5620
5621
5622
5623
5624
def generate_jsonld(
    schema: Any,
    data_model_labels: DisplayLabelType,
    output_jsonld: Optional[str],
    *,
    synapse_client: Optional[Synapse] = None,
) -> dict:
    """
    Convert a CSV data model specification to JSON-LD format with validation and error checking.

    This function parses your CSV data model (containing attributes, validation rules,
    dependencies, and valid values), converts it to a graph-based JSON-LD representation,
    validates the structure for common errors, and saves the result. The generated JSON-LD
    file serves as input for `generate_jsonschema()` and other data model operations.

    **Data Model Requirements:**

    Your CSV should include columns defining:

    - **Attribute names**: Property/attribute identifiers
    - **Display names**: Human-readable labels (optional but recommended)
    - **Descriptions**: Documentation for each attribute
    - **Valid values**: Allowed enum values for attributes (comma-separated)
    - **Validation rules**: Rules like `list`, `regex`, `inRange`, `required`, etc.
    - **Dependencies**: Relationships between attributes using `dependsOn`
    - **Required status**: Whether attributes are mandatory

    **Validation Checks Performed:**

    - Ensures all required fields (like `displayName`) are present
    - Detects cycles in attribute dependencies (which would create invalid schemas)
    - Checks for blacklisted characters in display names that Synapse doesn't allow
    - Validates that attribute names don't conflict with reserved system names
    - Verifies the graph structure is a valid directed acyclic graph (DAG)

    Arguments:
        schema: Path to your data model CSV file. This file should contain your complete
            data model specification with all attributes, validation rules, and relationships.
        data_model_labels: Label format for the JSON-LD output (required; there is
            no default value):

            - `"class_label"` (recommended): Uses standard attribute names as labels
            - `"display_label"`: Uses display names as labels if they contain no blacklisted
              characters (parentheses, periods, spaces, hyphens), otherwise falls back to
              class labels. Use cautiously as this can affect downstream compatibility.
        output_jsonld: Path where the JSON-LD file will be saved. If None, saves alongside
            the input CSV with a `.jsonld` extension (e.g., `model.csv` → `model.jsonld`).
            If the input path already ends in `.jsonld`, that same path is used.
        synapse_client: Optional Synapse client instance, used here for logging. If None,
            the client is resolved via `Synapse.get_client()`.

    **Output:**

    The function logs validation errors and warnings to help you fix data model issues
    before generating JSON schemas. Validation errors and warnings are only logged;
    they do not stop the conversion. A failure to write the output file is also
    logged rather than raised, so the model is still returned in that case.

    Returns:
        The generated data model as a dictionary in JSON-LD format. The same data is
            also saved to the file path specified in `output_jsonld`.


    Example: Using this function to generate JSONLD Schema files:
        Basic usage with default output path:

        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import generate_jsonld

        syn = Synapse()
        syn.login()

        jsonld_model = generate_jsonld(
            schema="path/to/my_data_model.csv",
            data_model_labels="class_label",
            output_jsonld=None,  # Saves to my_data_model.jsonld
            synapse_client=syn
        )
        ```

        Specify custom output path:

        ```python
        jsonld_model = generate_jsonld(
            schema="models/patient_model.csv",
            data_model_labels="class_label",
            output_jsonld="~/output/patient_model_v1.jsonld",
            synapse_client=syn
        )
        ```

        Use display labels:
        ```python
        jsonld_model = generate_jsonld(
            schema="my_model.csv",
            data_model_labels="display_label",
            output_jsonld="my_model.jsonld",
            synapse_client=syn
        )
        ```
    """
    syn = Synapse.get_client(synapse_client=synapse_client)

    # Instantiate Parser
    data_model_parser = DataModelParser(path_to_data_model=schema, logger=syn.logger)

    # Parse Model
    syn.logger.info("Parsing data model.")
    parsed_data_model = data_model_parser.parse_model()

    # Convert parsed model to graph
    # Instantiate DataModelGraph
    data_model_grapher = DataModelGraph(
        parsed_data_model, data_model_labels, syn.logger
    )

    # Generate graphschema
    syn.logger.info("Generating data model graph.")
    graph_data_model = data_model_grapher.graph

    # Validate generated data model.
    syn.logger.info("Validating the data model internally.")
    data_model_validator = DataModelValidator(graph=graph_data_model, logger=syn.logger)
    data_model_errors, data_model_warnings = data_model_validator.run_checks()

    # If there are errors log them. Each entry is either a single message or a
    # list of messages; entries of any other type are ignored.
    if data_model_errors:
        for err in data_model_errors:
            if isinstance(err, str):
                syn.logger.error(err)
            elif isinstance(err, list):
                for error in err:
                    syn.logger.error(error)

    # If there are warnings log them (same entry shapes as the errors above).
    if data_model_warnings:
        for war in data_model_warnings:
            if isinstance(war, str):
                syn.logger.warning(war)
            elif isinstance(war, list):
                for warning in war:
                    syn.logger.warning(warning)

    syn.logger.info("Converting data model to JSON-LD")
    jsonld_data_model = convert_graph_to_jsonld(
        graph=graph_data_model, logger=syn.logger
    )

    # output JSON-LD file alongside CSV file by default, get path.
    if output_jsonld is None:
        # Coerce to str so path-like inputs work with the string operations below.
        schema_path = str(schema)
        # Test the suffix, not a substring: a CSV path that merely contains
        # ".jsonld" (e.g. a directory called "models.jsonld") must not be
        # treated as a JSON-LD input, or the output would overwrite the CSV.
        if schema_path.endswith(".jsonld"):
            output_jsonld = schema_path
        else:
            csv_no_ext = re.sub("[.]csv$", "", schema_path)
            output_jsonld = csv_no_ext + ".jsonld"

        syn.logger.info(
            "By default, the JSON-LD output will be stored alongside the first "
            f"input CSV or JSON-LD file. In this case, it will appear here: '{output_jsonld}'. "
            "You can use the `--output_jsonld` argument to specify another file path."
        )

    # saving updated schema.org schema
    try:
        export_schema(
            schema=jsonld_data_model, file_path=output_jsonld, logger=syn.logger
        )
    except Exception:
        # Best effort: a failed write is logged and the in-memory model is
        # still returned to the caller.
        syn.logger.exception(
            (
                f"The Data Model could not be created by using '{output_jsonld}' location. "
                "Please check your file path again"
            )
        )
    return jsonld_data_model

generate_jsonschema

generate_jsonschema(data_model_source: str, output_directory: str, data_type: Optional[list[str]], data_model_labels: DisplayLabelType, synapse_client: Synapse) -> tuple[list[dict[str, Any]], list[str]]

Generate JSON Schema validation files from a data model with validation rules.

This function creates JSON Schema files that enforce validation rules defined in your CSV data model. The generated schemas can validate manifests for required fields, data types, valid values (enums), ranges, regex patterns, conditional dependencies, and more.

Validation Rules Supported:

  • Type validation: Enforces string, number, integer, or boolean types
  • Valid values: Creates enum constraints from valid values in the data model
  • Required fields: Marks attributes as required (can be component-specific)
  • Range validation: Translates inRange rules to min/max constraints
  • Pattern matching: Converts regex rules to JSON Schema patterns
  • Format validation: Applies date (ISO date) and url (URI) format constraints
  • Array validation: Handles list rules for array-type properties
  • Conditional dependencies: Creates if/then schemas for dependent attributes

Component-Based Rules: Rules can be applied selectively to specific components using the #Component syntax in your validation rules. This allows different validation behavior per manifest type.

PARAMETER DESCRIPTION
data_model_source

Path to the data model file (CSV or JSONLD) or URL to the raw JSONLD. Can accept:

  • A CSV file with your data model specification (will be parsed automatically)
  • A JSONLD file generated from generate_jsonld() or equivalent
  • A URL pointing to a raw JSONLD data model

TYPE: str

output_directory

Directory path where JSON Schema files will be saved. Each component will generate a separate <Component>_validation_schema.json file.

TYPE: str

data_type

List of specific component names (data types) to generate schemas for. If None, generates schemas for all components in the data model.

TYPE: Optional[list[str]]

data_model_labels

Label format for properties in the generated schema:

  • "class_label" (default): Uses standard attribute names as property keys
  • "display_label": Uses display names if valid (no blacklisted characters), otherwise falls back to class labels. Use with caution as display names may contain spaces or special characters.

TYPE: DisplayLabelType

synapse_client

Synapse client instance for logging. Use Synapse.get_client() or pass an existing authenticated client.

TYPE: Synapse

RETURNS DESCRIPTION
tuple[list[dict[str, Any]], list[str]]

A tuple containing, in order: a list of JSON schema dictionaries, each corresponding to a component, and a list of file paths where the schemas were written.

Using this function to generate JSON Schema files:

Generate schemas from a CSV data model:

from synapseclient import Synapse
from synapseclient.extensions.curator import generate_jsonschema

syn = Synapse()
syn.login()

schemas, file_paths = generate_jsonschema(
    data_model_source="path/to/model.csv",
    output_directory="./schemas",
    data_type=None,  # All components
    data_model_labels="class_label",
    synapse_client=syn
)

Generate schemas from a JSONLD data model:

schemas, file_paths = generate_jsonschema(
    data_model_source="path/to/model.jsonld",
    output_directory="./schemas",
    data_type=None,  # All components
    data_model_labels="class_label",
    synapse_client=syn
)

Generate schema for specific components:

schemas, file_paths = generate_jsonschema(
    data_model_source="https://example.com/model.jsonld",
    output_directory="./validation_schemas",
    data_type=["Patient", "Biospecimen"],
    data_model_labels="class_label",
    synapse_client=syn
)
Source code in synapseclient/extensions/curator/schema_generation.py
5337
5338
5339
5340
5341
5342
5343
5344
5345
5346
5347
5348
5349
5350
5351
5352
5353
5354
5355
5356
5357
5358
5359
5360
5361
5362
5363
5364
5365
5366
5367
5368
5369
5370
5371
5372
5373
5374
5375
5376
5377
5378
5379
5380
5381
5382
5383
5384
5385
5386
5387
5388
5389
5390
5391
5392
5393
5394
5395
5396
5397
5398
5399
5400
5401
5402
5403
5404
5405
5406
5407
5408
5409
5410
5411
5412
5413
5414
5415
5416
5417
5418
5419
5420
5421
5422
5423
5424
5425
5426
5427
5428
5429
5430
5431
5432
5433
5434
5435
5436
5437
5438
5439
5440
5441
5442
5443
5444
5445
5446
5447
5448
5449
def generate_jsonschema(
    data_model_source: str,
    output_directory: str,
    data_type: Optional[list[str]],
    data_model_labels: DisplayLabelType,
    synapse_client: Synapse,
) -> tuple[list[dict[str, Any]], list[str]]:
    """
    Build JSON Schema validation files from a data model and its validation rules.

    The schemas produced here encode the validation rules declared in your CSV data
    model, so that manifests can be checked for required fields, data types, valid
    values (enums), ranges, regex patterns, conditional dependencies, and more.

    **Validation Rules Supported:**

    - **Type validation**: Enforces string, number, integer, or boolean types
    - **Valid values**: Creates enum constraints from valid values in the data model
    - **Required fields**: Marks attributes as required (can be component-specific)
    - **Range validation**: Translates `inRange` rules to min/max constraints
    - **Pattern matching**: Converts `regex` rules to JSON Schema patterns
    - **Format validation**: Applies `date` (ISO date) and `url` (URI) format constraints
    - **Array validation**: Handles `list` rules for array-type properties
    - **Conditional dependencies**: Creates `if/then` schemas for dependent attributes

    **Component-Based Rules:**
    A rule can be restricted to particular components with the `#Component` syntax
    in your validation rules, which lets each manifest type validate differently.

    Arguments:
        data_model_source: Path to the data model file (CSV or JSONLD) or URL to the raw
            JSONLD. Can accept:

            - A CSV file with your data model specification (will be parsed automatically)
            - A JSONLD file generated from `generate_jsonld()` or equivalent
            - A URL pointing to a raw JSONLD data model
        output_directory: Directory path where JSON Schema files will be saved. Each
            component will generate a separate `<Component>_validation_schema.json` file.
        data_type: List of specific component names (data types) to generate schemas for.
            If None, generates schemas for all components in the data model.
        data_model_labels: Label format for properties in the generated schema:

            - `"class_label"`: Uses standard attribute names as property keys
            - `"display_label"`: Uses display names if valid (no blacklisted characters),
              otherwise falls back to class labels. Use with caution as display names
              may contain spaces or special characters.
        synapse_client: Synapse client instance whose logger is used for logging. Use
            `Synapse.get_client()` or pass an existing authenticated client.

    Returns:
        A two-element tuple:

            - A list of JSON schema dictionaries, each corresponding to a component
            - A list of file paths where the schemas were written

    Example: Using this function to generate JSON Schema files:
        Generate schemas from a CSV data model:

        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import generate_jsonschema

        syn = Synapse()
        syn.login()

        schemas, file_paths = generate_jsonschema(
            data_model_source="path/to/model.csv",
            output_directory="./schemas",
            data_type=None,  # All components
            data_model_labels="class_label",
            synapse_client=syn
        )
        ```

        Generate schemas from a JSONLD data model:

        ```python
        schemas, file_paths = generate_jsonschema(
            data_model_source="path/to/model.jsonld",
            output_directory="./schemas",
            data_type=None,  # All components
            data_model_labels="class_label",
            synapse_client=syn
        )
        ```

        Generate schema for specific components:

        ```python
        schemas, file_paths = generate_jsonschema(
            data_model_source="https://example.com/model.jsonld",
            output_directory="./validation_schemas",
            data_type=["Patient", "Biospecimen"],
            data_model_labels="class_label",
            synapse_client=syn
        )
        ```
    """
    # Resolve the client first: only its logger is handed to the director.
    client = Synapse.get_client(synapse_client=synapse_client)

    director = JsonSchemaGeneratorDirector(
        data_model_source=data_model_source,
        output_directory=output_directory,
        components=data_type,
        logger=client.logger,
    )

    # Unpack explicitly so a malformed (non two-element) result fails here.
    generated_schemas, written_paths = director.generate_jsonschema(
        data_model_labels=data_model_labels
    )
    return generated_schemas, written_paths

query_schema_registry

query_schema_registry(synapse_client: Optional[Synapse] = None, schema_registry_table_id: Optional[str] = None, column_config: Optional[SchemaRegistryColumnConfig] = None, return_latest_only: bool = True, **filters) -> Union[str, List[str], None]

Query the schema registry table to find schemas matching the provided filters.

This function searches the Synapse schema registry table for schemas that match the provided filter parameters. Results are sorted by version in descending order (newest first). The function supports any number of filter parameters; each keyword argument name is used directly as a column name in the schema registry table.

PARAMETER DESCRIPTION
synapse_client

Optional authenticated Synapse client instance

TYPE: Optional[Synapse] DEFAULT: None

schema_registry_table_id

Optional Synapse ID of the schema registry table. If None, uses the default table ID.

TYPE: Optional[str] DEFAULT: None

column_config

Optional configuration for custom column names. If None, uses default configuration ('version' and 'uri' columns).

TYPE: Optional[SchemaRegistryColumnConfig] DEFAULT: None

return_latest_only

If True (default), returns only the latest URI as a string. If False, returns all matching URIs as a list of strings.

TYPE: bool DEFAULT: True

**filters

Filter parameters to search for matching schemas. These work as follows:

  Column-Based Filtering:
  - Any column name in the schema registry table can be used as a filter
  - Pass column names directly as keyword arguments
  - Common filters: dcc, datatype, version, uri
  - Any additional columns in your table can be used

  Filter Values:
  - Exact matching: Use plain strings (e.g., dcc="ad")
  - Pattern matching: Use SQL LIKE patterns with wildcards:
    * % = any sequence of characters
  - Examples:
    * dcc="ad" → matches exactly "ad"
    * datatype="%spec%" → matches any datatype containing "spec"

  Filter Logic:
  - Multiple filters are combined with AND (all must match)
  - At least one filter must be provided

DEFAULT: {}

RETURNS DESCRIPTION
Union[str, List[str], None]

If return_latest_only is True: Single URI string of the latest version, or None if not found

Union[str, List[str], None]

If return_latest_only is False: List of URI strings sorted by version (highest version first)

RAISES DESCRIPTION
ValueError

If no filter parameters are provided

Expected Table Structure

The schema registry table should contain columns for:

  • Schema version for sorting (default: 'version')
  • JSON schema URI (default: 'uri')
  • Any filterable columns as configured in column_config

Additional columns may be present and will be included in results.

Comprehensive filter usage demonstrations

This includes several examples of how to use the filtering system.

Basic Filtering (using default filters):

from synapseclient import Synapse
from synapseclient.extensions.curator import query_schema_registry

syn = Synapse()
syn.login()

# 1. Get latest schema URI for a specific DCC and datatype
latest_uri = query_schema_registry(
    synapse_client=syn,
    dcc="ad",  # Exact match for Alzheimer's Disease DCC
    datatype="Analysis"  # Exact datatype match
)
# Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

# 2. Get all versions of matching schemas (not just latest)
all_versions = query_schema_registry(
    synapse_client=syn,
    dcc="mc2",
    datatype="Biospecimen",
    return_latest_only=False
)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0"]

# 3. Filtering on a single column (exact match)
# Find all "Biospecimen" schemas across all DCCs
biospecimen_schemas = query_schema_registry(
    synapse_client=syn,
    datatype="Biospecimen",  # Exact match for Biospecimen
    return_latest_only=False
)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0",
#           "sage.schemas.v2571-veo.Biospecimen.schema-0.3.0",
#           "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]

# 4. Pattern matching for DCC variations
mc2_schemas = query_schema_registry(
    synapse_client=syn,
    dcc="%C2",  # Matches 'mc2' and 'MC2'
    return_latest_only=False
)
# Returns schemas from both 'mc2' and 'MC2' DCCs

# 5. Using additional columns for filtering (if they exist in your table)
specific_schemas = query_schema_registry(
    synapse_client=syn,
    dcc="amp",  # Must be AMP DCC
    org="sage.schemas.v2571",  # Must match organization
    return_latest_only=False
)
# Returns schemas that match BOTH conditions

Direct Column Filtering (simplified approach):

# Any column in the schema registry table can be used for filtering
# Just use the column name directly as a keyword argument

# Basic filters using standard columns
query_schema_registry(dcc="ad", datatype="Analysis")
query_schema_registry(version="0.0.0")
query_schema_registry(uri="sage.schemas.v2571-ad.Analysis.schema-0.0.0")

# Additional columns (if they exist in your table)
query_schema_registry(org="sage.schemas.v2571")
query_schema_registry(name="ad.Analysis.schema")

# Multiple column filters (all must match)
query_schema_registry(
    dcc="mc2",
    datatype="Biospecimen",
    org="MultiConsortiaCoordinatingCenter"
)

Filter Value Examples with Real Data:

# Exact matching
query_schema_registry(dcc="ad")                   # Returns schemas with dcc="ad"
query_schema_registry(datatype="Biospecimen")     # Returns schemas with datatype="Biospecimen"
query_schema_registry(dcc="MC2")                  # Returns schemas with dcc="MC2" (case sensitive)

# Pattern matching with wildcards
query_schema_registry(dcc="%C2")                   # Matches "mc2", "MC2"
query_schema_registry(datatype="%spec%")           # Matches "Biospecimen"

# Examples with expected results:
query_schema_registry(dcc="ad", datatype="Analysis")
# Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

query_schema_registry(datatype="Biospecimen", return_latest_only=False)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0", ...]

# Multiple conditions (all must be true)
query_schema_registry(
    dcc="amp",             # AND
    datatype="Biospecimen", # AND
    org="sage.schemas.v2571"  # AND (if org column exists)
)
# Returns: "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"

Source code in synapseclient/extensions/curator/schema_registry.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def query_schema_registry(
    synapse_client: Optional[Synapse] = None,
    schema_registry_table_id: Optional[str] = None,
    column_config: Optional[SchemaRegistryColumnConfig] = None,
    return_latest_only: bool = True,
    **filters,
) -> Union[str, List[str], None]:
    """
    Query the schema registry table to find schemas matching the provided filters.

    This function searches the Synapse schema registry table for schemas that match
    the provided filter parameters. Results are sorted by the version column in
    descending order (newest first). Any number of filter parameters may be given;
    each keyword argument name is used directly as a column name in the table.

    Arguments:
        synapse_client: Optional authenticated Synapse client instance
        schema_registry_table_id: Optional Synapse ID of the schema registry table.
                                  If None, uses the default table ID.
        column_config: Optional configuration for custom column names.
                      If None, uses default configuration ('version' and 'uri' columns).
        return_latest_only: If True (default), returns only the latest URI as a string.
                           If False, returns all matching URIs as a list of strings.
        **filters: Filter parameters to search for matching schemas. These work as follows:

                  Column-Based Filtering:
                  - Any column name in the schema registry table can be used as a filter
                  - Pass column names directly as keyword arguments
                  - Common filters: dcc, datatype, version, uri
                  - Any additional columns in your table can be used

                  Filter Values:
                  - Exact matching: Use plain strings (e.g., dcc="ad")
                  - Pattern matching: Any string value containing `%` or `_` is
                    compared with SQL LIKE instead of `=`:
                    * % = any sequence of characters
                    * _ = any single character
                  - Single quotes inside a value are escaped (doubled) before the
                    value is placed in the query, so values such as "Alzheimer's"
                    are safe to pass
                  - Examples:
                    * dcc="ad" → matches exactly "ad"
                    * datatype="%spec%" → matches any datatype containing "spec"

                  Filter Logic:
                  - Multiple filters are combined with AND (all must match)
                  - At least one filter must be provided

    Returns:
        If return_latest_only is True: Single URI string of the latest version, or None if not found
        If return_latest_only is False: List of URI strings sorted by version (highest version first),
        or an empty list if nothing matched

    Raises:
        ValueError: If no filter parameters are provided

    Expected Table Structure:
        The schema registry table should contain columns for:

        - Schema version for sorting (default: 'version')
        - JSON schema URI (default: 'uri')
        - Any columns you intend to filter on

        Additional columns may be present; only the URI column is returned.

    Example: Comprehensive filter usage demonstrations
        This includes several examples of how to use the filtering system.

        Basic Filtering (using default filters):
        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import query_schema_registry

        syn = Synapse()
        syn.login()

        # 1. Get latest schema URI for a specific DCC and datatype
        latest_uri = query_schema_registry(
            synapse_client=syn,
            dcc="ad",  # Exact match for Alzheimer's Disease DCC
            datatype="Analysis"  # Exact datatype match
        )
        # Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

        # 2. Get all versions of matching schemas (not just latest)
        all_versions = query_schema_registry(
            synapse_client=syn,
            dcc="mc2",
            datatype="Biospecimen",
            return_latest_only=False
        )
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0"]

        # 3. Filtering on a single column (exact match)
        # Find all "Biospecimen" schemas across all DCCs
        biospecimen_schemas = query_schema_registry(
            synapse_client=syn,
            datatype="Biospecimen",  # Exact match for Biospecimen
            return_latest_only=False
        )
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0",
        #           "sage.schemas.v2571-veo.Biospecimen.schema-0.3.0",
        #           "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]

        # 4. Pattern matching for DCC variations
        mc2_schemas = query_schema_registry(
            synapse_client=syn,
            dcc="%C2",  # Matches 'mc2' and 'MC2'
            return_latest_only=False
        )
        # Returns schemas from both 'mc2' and 'MC2' DCCs

        # 5. Using additional columns for filtering (if they exist in your table)
        specific_schemas = query_schema_registry(
            synapse_client=syn,
            dcc="amp",  # Must be AMP DCC
            org="sage.schemas.v2571",  # Must match organization
            return_latest_only=False
        )
        # Returns schemas that match BOTH conditions
        ```

        Direct Column Filtering (simplified approach):
        ```python
        # Any column in the schema registry table can be used for filtering
        # Just use the column name directly as a keyword argument

        # Basic filters using standard columns
        query_schema_registry(dcc="ad", datatype="Analysis")
        query_schema_registry(version="0.0.0")
        query_schema_registry(uri="sage.schemas.v2571-ad.Analysis.schema-0.0.0")

        # Additional columns (if they exist in your table)
        query_schema_registry(org="sage.schemas.v2571")
        query_schema_registry(name="ad.Analysis.schema")

        # Multiple column filters (all must match)
        query_schema_registry(
            dcc="mc2",
            datatype="Biospecimen",
            org="MultiConsortiaCoordinatingCenter"
        )
        ```

        Filter Value Examples with Real Data:
        ```python
        # Exact matching
        query_schema_registry(dcc="ad")                   # Returns schemas with dcc="ad"
        query_schema_registry(datatype="Biospecimen")     # Returns schemas with datatype="Biospecimen"
        query_schema_registry(dcc="MC2")                  # Returns schemas with dcc="MC2" (case sensitive)

        # Pattern matching with wildcards
        query_schema_registry(dcc="%C2")                   # Matches "mc2", "MC2"
        query_schema_registry(datatype="%spec%")           # Matches "Biospecimen"

        # Examples with expected results:
        query_schema_registry(dcc="ad", datatype="Analysis")
        # Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

        query_schema_registry(datatype="Biospecimen", return_latest_only=False)
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0", ...]

        # Multiple conditions (all must be true)
        query_schema_registry(
            dcc="amp",             # AND
            datatype="Biospecimen", # AND
            org="sage.schemas.v2571"  # AND (if org column exists)
        )
        # Returns: "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"
        ```
    """
    syn = Synapse.get_client(synapse_client=synapse_client)
    logger = syn.logger

    # Use provided table ID or default
    table_id = (
        schema_registry_table_id
        if schema_registry_table_id
        else SCHEMA_REGISTRY_TABLE_ID
    )

    # Use provided column config or default
    if column_config is None:
        column_config = SchemaRegistryColumnConfig()

    # Validate that we have at least one filter
    if not filters:
        raise ValueError("At least one filter parameter must be provided")

    # Build WHERE clause from filters using column names directly
    where_conditions = []
    for column_name, filter_value in filters.items():
        # String values containing SQL wildcards (% or _) are pattern matches;
        # everything else is compared for equality.
        is_pattern = isinstance(filter_value, str) and (
            "%" in filter_value or "_" in filter_value
        )
        comparison = "LIKE" if is_pattern else "="
        # Double any embedded single quote so the value cannot terminate the
        # SQL string literal early (malformed query / injection).
        # NOTE(review): column names are still interpolated verbatim; callers
        # must not pass untrusted keyword names.
        literal = str(filter_value).replace("'", "''")
        where_conditions.append(f"{column_name} {comparison} '{literal}'")

    where_clause = " AND ".join(where_conditions)

    # Construct SQL query using configurable column names
    # Results are sorted by version in descending order (newest first)
    query = f"""
    SELECT * FROM {table_id}
    WHERE {where_clause}
    ORDER BY {column_config.version_column} DESC
    """

    # Create a readable filter summary for logging
    filter_summary = ", ".join([f"{k}='{v}'" for k, v in filters.items()])

    logger.info(f"Querying schema registry with filters: {filter_summary}")
    logger.info(f"Using table: {table_id}")
    logger.info(f"SQL Query: {query}")

    # Query the table and get results as a pandas DataFrame
    table = Table(id=table_id)
    results_df = table.query(query=query, synapse_client=syn)

    if results_df.empty:
        logger.info(f"No schemas found matching filters: {filter_summary}")
        return None if return_latest_only else []

    # Extract URIs from the results and return as a list of strings
    uri_list = results_df[column_config.uri_column].tolist()

    logger.info(f"Found {len(uri_list)} matching schema(s):")
    for i, uri in enumerate(uri_list, 1):
        logger.info(f"  {i}. URI: {uri}")

    # The DataFrame is non-empty here, so the first row is the newest version.
    return uri_list[0] if return_latest_only else uri_list