aind_data_transfer_service.models package

Submodules

aind_data_transfer_service.models.core module

Core models for using V2 of aind-data-transfer-service

class aind_data_transfer_service.models.core.SubmitJobRequestV2(*, dag_id: Literal['transform_and_upload_v2'] = 'transform_and_upload_v2', user_email: EmailStr | None = None, email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] = {'fail'}, upload_jobs: List[UploadJobConfigsV2])

Bases: BaseSettings

Main request that will be sent to the backend. Bundles jobs into a list and allows a user to add an email address to receive notifications.

check_duplicate_upload_jobs(info: ValidationInfo)

Validate that there are no duplicate upload jobs. If a list of current jobs is provided in a context manager, jobs are also checked against the list.

dag_id: Literal['transform_and_upload_v2']
email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']]
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'use_enum_values': True, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

propagate_email_settings()

Propagate email settings from global to individual jobs

upload_jobs: List[UploadJobConfigsV2]
user_email: EmailStr | None
class aind_data_transfer_service.models.core.Task(*, skip_task: bool = False, image: str | None = None, image_version: str | None = None, image_resources: Dict[str, Any] | None = None, job_settings: Dict[str, Any] | None = None, command_script: str | None = None)

Bases: BaseModel

Configuration for a task run during a data transfer upload job.

command_script: str | None
image: str | None
image_resources: Dict[str, Any] | None
image_version: str | None
job_settings: Dict[str, Any] | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

skip_task: bool
classmethod validate_json_serializable(v: Dict[str, Any] | None, info: ValidationInfo) → Dict[str, Any] | None

Validate that fields are json serializable.

class aind_data_transfer_service.models.core.UploadJobConfigsV2(*, job_type: str, user_email: EmailStr | None = None, email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] | None = None, s3_bucket: Literal['private', 'open', 'default'] = 'default', project_name: str, platform: _Behavior | _Confocal | _Ecephys | _Exaspim | _Fip | _Hcr | _Hsfp | _Isi | _Merfish | _Mri | _Mesospim | _Motor_Observatory | _Multiplane_Ophys | _Slap2 | _Single_Plane_Ophys | _Smartspim | None = None, modalities: List[_Barseq | _Behavior | _Behavior_Videos | _Brightfield | _Confocal | _Emg | _Em | _Ecephys | _Fib | _Fmost | _Icephys | _Isi | _Mri | _Mapseq | _Merfish | _Pophys | _Slap2 | _Spim | _Stpt | _Scrnaseq], subject_id: str, acq_datetime: datetime, tasks: Dict[str, Task | Dict[str, Task]])

Bases: BaseSettings

Configuration for a data transfer upload job

acq_datetime: datetime
email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] | None
job_type: str
modalities: List[_Barseq | _Behavior | _Behavior_Videos | _Brightfield | _Confocal | _Emg | _Em | _Ecephys | _Fib | _Fmost | _Icephys | _Isi | _Mri | _Mapseq | _Merfish | _Pophys | _Slap2 | _Spim | _Stpt | _Scrnaseq]
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'use_enum_values': True, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

platform: _Behavior | _Confocal | _Ecephys | _Exaspim | _Fip | _Hcr | _Hsfp | _Isi | _Merfish | _Mri | _Mesospim | _Motor_Observatory | _Multiplane_Ophys | _Slap2 | _Single_Plane_Ophys | _Smartspim | None
project_name: str
s3_bucket: Literal['private', 'open', 'default']
property s3_prefix: str

Construct s3_prefix from configs.

subject_id: str
tasks: Dict[str, Task | Dict[str, Task]]
user_email: EmailStr | None
classmethod validate_platform(v)

For backwards compatibility, allow a user to input an aind-data-schema-model platform and then convert it.

classmethod validate_tasks(v: Dict[str, Task | Dict[str, Task]]) → Dict[str, Task | Dict[str, Task]]

Validate that modality-specific tasks are keyed by valid modality abbreviations.

classmethod validate_with_context(v: str, info: ValidationInfo) → str

Validate certain fields. If a list of accepted values is provided in a context manager, then it will validate against the list. Otherwise, it won’t raise any validation error.

Parameters:
  • v (str) – Value input into the field.

  • info (ValidationInfo)

Return type:

str

aind_data_transfer_service.models.core.validation_context(context: Dict[str, Any] | None) → None

Follows the guide at: https://docs.pydantic.dev/latest/concepts/validators/#validation-context

Parameters:
  • context (Union[Dict[str, Any], None])

Return type:

None

aind_data_transfer_service.models.internal module

Module for internal data models used in application

class aind_data_transfer_service.models.internal.AirflowDagRun(*, conf: dict | None, dag_id: str | None, dag_run_id: str | None, data_interval_end: AwareDatetime | None, data_interval_start: AwareDatetime | None, end_date: AwareDatetime | None, execution_date: AwareDatetime | None, external_trigger: bool | None, last_scheduling_decision: AwareDatetime | None, logical_date: AwareDatetime | None, note: str | None, run_type: str | None, start_date: AwareDatetime | None, state: str | None)

Bases: BaseModel

Data model for dag_run entry when requesting info from airflow

conf: dict | None
dag_id: str | None
dag_run_id: str | None
data_interval_end: AwareDatetime | None
data_interval_start: AwareDatetime | None
end_date: AwareDatetime | None
execution_date: AwareDatetime | None
external_trigger: bool | None
last_scheduling_decision: AwareDatetime | None
logical_date: AwareDatetime | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

note: str | None
run_type: str | None
start_date: AwareDatetime | None
state: str | None
class aind_data_transfer_service.models.internal.AirflowDagRunsRequestParameters(*, dag_ids: list[str] = ['transform_and_upload', 'transform_and_upload_v2'], page_limit: int = 100, page_offset: int = 0, states: list[str] | None = [], execution_date_gte: str | None = '2026-05-20T22:57:07.350133+00:00', execution_date_lte: str | None = None, order_by: str = '-execution_date')

Bases: BaseModel

Model for parameters when requesting info from dag_runs endpoint

dag_ids: list[str]
execution_date_gte: str | None
execution_date_lte: str | None
classmethod from_query_params(query_params: QueryParams)

Maps the query parameters to the model

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

order_by: str
page_limit: int
page_offset: int
states: list[str] | None
classmethod validate_min_execution_date(execution_date_gte: str) → str

Validate that the earliest submit date filter is within 2 weeks.

class aind_data_transfer_service.models.internal.AirflowDagRunsResponse(*, dag_runs: List[AirflowDagRun], total_entries: int)

Bases: BaseModel

Data model for response when requesting info from dag_runs endpoint

dag_runs: List[AirflowDagRun]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

total_entries: int
class aind_data_transfer_service.models.internal.AirflowTaskInstance(*, dag_id: str | None, dag_run_id: str | None, duration: int | float | None, end_date: AwareDatetime | None, execution_date: AwareDatetime | None, executor_config: str | None, hostname: str | None, map_index: int | None, max_tries: int | None, note: str | None, operator: str | None, pid: int | None, pool: str | None, pool_slots: int | None, priority_weight: int | None, queue: str | None, queued_when: AwareDatetime | None, rendered_fields: dict | None, sla_miss: dict | None, start_date: AwareDatetime | None, state: str | None, task_id: str | None, trigger: dict | None, triggerer_job: dict | None, try_number: int | None, unixname: str | None)

Bases: BaseModel

Data model for task_instance entry when requesting info from airflow

dag_id: str | None
dag_run_id: str | None
duration: int | float | None
end_date: AwareDatetime | None
execution_date: AwareDatetime | None
executor_config: str | None
hostname: str | None
map_index: int | None
max_tries: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

note: str | None
operator: str | None
pid: int | None
pool: str | None
pool_slots: int | None
priority_weight: int | None
queue: str | None
queued_when: AwareDatetime | None
rendered_fields: dict | None
sla_miss: dict | None
start_date: AwareDatetime | None
state: str | None
task_id: str | None
trigger: dict | None
triggerer_job: dict | None
try_number: int | None
unixname: str | None
class aind_data_transfer_service.models.internal.AirflowTaskInstanceLogsRequestParameters(*, dag_id: str, dag_run_id: str, task_id: str, try_number: int, map_index: int, full_content: bool = True)

Bases: BaseModel

Model for parameters when requesting info from task_instance_logs endpoint

dag_id: str
dag_run_id: str
classmethod from_query_params(query_params: QueryParams)

Maps the query parameters to the model

full_content: bool
map_index: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

task_id: str
try_number: int
class aind_data_transfer_service.models.internal.AirflowTaskInstancesRequestParameters(*, dag_id: str, dag_run_id: str)

Bases: BaseModel

Model for parameters when requesting info from task_instances endpoint

dag_id: str
dag_run_id: str
classmethod from_query_params(query_params: QueryParams)

Maps the query parameters to the model

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aind_data_transfer_service.models.internal.AirflowTaskInstancesResponse(*, task_instances: List[AirflowTaskInstance], total_entries: int)

Bases: BaseModel

Data model for response when requesting info from task_instances endpoint

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

task_instances: List[AirflowTaskInstance]
total_entries: int
class aind_data_transfer_service.models.internal.JobParamInfo(*, name: str | None, last_modified: datetime | None, job_type: str, task_id: str, modality: str | None, version: str | None)

Bases: BaseModel

Model for job parameter info from AWS Parameter Store

classmethod from_aws_describe_parameter(parameter: ParameterMetadataTypeDef, job_type: str, task_id: str, modality: str | None, version: str | None)

Map the parameter to the model

static get_parameter_name(job_type: str, task_id: str, modality: str | None, version: str | None = None) → str

Create the parameter name from job_type and task_id

static get_parameter_prefix(version: str | None = None) → str

Get the prefix for job_type parameters

static get_parameter_regex(version: str | None = None) → str

Create the regex pattern to match the parameter name

job_type: str
last_modified: datetime | None
modality: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str | None
task_id: str
version: str | None
class aind_data_transfer_service.models.internal.JobStatus(*, dag_id: str | None = None, end_time: datetime | None = None, job_id: str | None = None, job_state: str | None = None, name: str | None = None, job_type: str | None = None, comment: str | None = None, start_time: datetime | None = None, submit_time: datetime | None = None)

Bases: BaseModel

Model for what we want to render to the user.

comment: str | None
dag_id: str | None
end_time: datetime | None
classmethod from_airflow_dag_run(airflow_dag_run: AirflowDagRun)

Maps the fields from an AirflowDagRun to this model

property jinja_dict: dict

Map model to a dictionary that jinja can render

job_id: str | None
job_state: str | None
job_type: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str | None
start_time: datetime | None
submit_time: datetime | None
class aind_data_transfer_service.models.internal.JobTasks(*, dag_id: str | None = None, job_id: str | None = None, task_id: str | None = None, try_number: int | None = None, task_state: str | None = None, priority_weight: int | None = None, map_index: int | None = None, submit_time: datetime | None = None, start_time: datetime | None = None, end_time: datetime | None = None, duration: int | float | None = None, comment: str | None = None)

Bases: BaseModel

Model for what is rendered to the user for each task.

comment: str | None
dag_id: str | None
duration: int | float | None
end_time: datetime | None
classmethod from_airflow_task_instance(airflow_task_instance: AirflowTaskInstance)

Maps the fields from an AirflowTaskInstance to this model

job_id: str | None
map_index: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

priority_weight: int | None
start_time: datetime | None
submit_time: datetime | None
task_id: str | None
task_state: str | None
try_number: int | None

Module contents

Package for models