aind_data_transfer_service.models package¶
Submodules¶
aind_data_transfer_service.models.core module¶
Core models for using V2 of aind-data-transfer-service
- class aind_data_transfer_service.models.core.SubmitJobRequestV2(*, dag_id: Literal['transform_and_upload_v2'] = 'transform_and_upload_v2', user_email: EmailStr | None = None, email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] = {'fail'}, upload_jobs: List[UploadJobConfigsV2])¶
Bases: BaseSettings
Main request that will be sent to the backend. Bundles jobs into a list and allows a user to add an email address to receive notifications.
- check_duplicate_upload_jobs(info: ValidationInfo)¶
Validate that there are no duplicate upload jobs. If a list of current jobs is provided in a context manager, jobs are also checked against the list.
- dag_id: Literal['transform_and_upload_v2']¶
- email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']]¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'use_enum_values': True, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- propagate_email_settings()¶
Propagate email settings from global to individual jobs
- upload_jobs: List[UploadJobConfigsV2]¶
- user_email: EmailStr | None¶
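The duplicate check and email propagation described above can be sketched without pydantic. This is a minimal illustration only, not the library's implementation: the helper names (`job_key`, `find_duplicate_jobs`, `propagate_email`), the use of plain dictionaries for jobs, and the rule that a job-level email takes precedence over the request-level one are all assumptions.

```python
import json
from typing import Any, Dict, Iterable, List, Optional


def job_key(job: Dict[str, Any]) -> str:
    """Canonical JSON form of a job, so equal configs compare equal."""
    return json.dumps(job, sort_keys=True, default=str)


def find_duplicate_jobs(
    upload_jobs: List[Dict[str, Any]],
    current_jobs: Optional[Iterable[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Return jobs repeated in the request or already in current_jobs."""
    # Seed the seen-set with jobs supplied from outside (assumed to be
    # what the validation context provides in the real service).
    seen = {job_key(job) for job in (current_jobs or [])}
    duplicates = []
    for job in upload_jobs:
        key = job_key(job)
        if key in seen:
            duplicates.append(job)
        seen.add(key)
    return duplicates


def propagate_email(
    user_email: Optional[str], upload_jobs: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Copy the request-level email onto jobs that do not set their own."""
    # Assumption: a job-level email is kept if present.
    return [
        {**job, "user_email": job.get("user_email") or user_email}
        for job in upload_jobs
    ]
```

Serializing with `sort_keys=True` makes the comparison independent of key order, which is one reasonable way to define "duplicate" for nested configurations.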
- class aind_data_transfer_service.models.core.Task(*, skip_task: bool = False, image: str | None = None, image_version: str | None = None, image_resources: Dict[str, Any] | None = None, job_settings: Dict[str, Any] | None = None, command_script: str | None = None)¶
Bases: BaseModel
Configuration for a task run during a data transfer upload job.
- command_script: str | None¶
- image: str | None¶
- image_resources: Dict[str, Any] | None¶
- image_version: str | None¶
- job_settings: Dict[str, Any] | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- skip_task: bool¶
- classmethod validate_json_serializable(v: Dict[str, Any] | None, info: ValidationInfo) Dict[str, Any] | None¶
Validate that fields are json serializable.
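One common way to test JSON serializability is to attempt `json.dumps` and convert any failure into a validation error. The sketch below shows that idea in isolation; the function name `check_json_serializable` and the error message are hypothetical, and the library's validator may behave differently.

```python
import json
from typing import Any, Dict, Optional


def check_json_serializable(
    value: Optional[Dict[str, Any]], field_name: str
) -> Optional[Dict[str, Any]]:
    """Return value unchanged if json.dumps accepts it, else raise."""
    if value is None:
        return value
    try:
        # TypeError: unsupported type (e.g. a set).
        # ValueError: circular reference.
        json.dumps(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"{field_name} must be json serializable: {exc}"
        ) from exc
    return value
```

For example, `check_json_serializable({"cpus": 4}, "image_resources")` returns the dictionary unchanged, while a value containing a `set` raises `ValueError`.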
- class aind_data_transfer_service.models.core.UploadJobConfigsV2(*, job_type: str, user_email: EmailStr | None = None, email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] | None = None, s3_bucket: Literal['private', 'open', 'default'] = 'default', project_name: str, platform: _Behavior | _Confocal | _Ecephys | _Exaspim | _Fip | _Hcr | _Hsfp | _Isi | _Merfish | _Mri | _Mesospim | _Motor_Observatory | _Multiplane_Ophys | _Slap2 | _Single_Plane_Ophys | _Smartspim | None = None, modalities: List[_Barseq | _Behavior | _Behavior_Videos | _Brightfield | _Confocal | _Emg | _Em | _Ecephys | _Fib | _Fmost | _Icephys | _Isi | _Mri | _Mapseq | _Merfish | _Pophys | _Slap2 | _Spim | _Stpt | _Scrnaseq], subject_id: str, acq_datetime: datetime, tasks: Dict[str, Task | Dict[str, Task]])¶
Bases: BaseSettings
Configuration for a data transfer upload job.
- acq_datetime: datetime¶
- email_notification_types: Set[Literal['begin', 'end', 'fail', 'retry', 'all']] | None¶
- job_type: str¶
- modalities: List[_Barseq | _Behavior | _Behavior_Videos | _Brightfield | _Confocal | _Emg | _Em | _Ecephys | _Fib | _Fmost | _Icephys | _Isi | _Mri | _Mapseq | _Merfish | _Pophys | _Slap2 | _Spim | _Stpt | _Scrnaseq]¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'use_enum_values': True, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- platform: _Behavior | _Confocal | _Ecephys | _Exaspim | _Fip | _Hcr | _Hsfp | _Isi | _Merfish | _Mri | _Mesospim | _Motor_Observatory | _Multiplane_Ophys | _Slap2 | _Single_Plane_Ophys | _Smartspim | None¶
- project_name: str¶
- s3_bucket: Literal['private', 'open', 'default']¶
- property s3_prefix: str¶
Construct s3_prefix from configs.
- subject_id: str¶
- user_email: EmailStr | None¶
- classmethod validate_platform(v)¶
For backwards compatibility, allow a user to input an aind-data-schema-model platform and then convert it.
- classmethod validate_tasks(v: Dict[str, Task | Dict[str, Task]]) Dict[str, Task | Dict[str, Task]]¶
Validate that modality-specific tasks are keyed by valid modality abbreviations.
- classmethod validate_with_context(v: str, info: ValidationInfo) str¶
Validate certain fields. If a list of accepted values is provided in a context manager, then it will validate against the list. Otherwise, it won’t raise any validation error.
- Parameters:
v (str) – Value input into the field.
info (ValidationInfo)
- Return type:
str
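The `tasks` field accepts either a single task per task id or a nested mapping of tasks keyed by modality abbreviation. The following sketch shows how such a key check could work. It is an illustration under assumptions: `TaskSketch` is a stand-in for `Task`, `VALID_MODALITIES` is a hypothetical subset of abbreviations, and the task ids in the usage note are invented.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union

# Hypothetical subset of modality abbreviations, for illustration only.
VALID_MODALITIES = {"ecephys", "behavior", "behavior-videos"}


@dataclass
class TaskSketch:
    """Stand-in for the Task model with two of its fields."""

    skip_task: bool = False
    image: Optional[str] = None


TasksType = Dict[str, Union[TaskSketch, Dict[str, TaskSketch]]]


def check_task_keys(tasks: TasksType) -> TasksType:
    """Raise if a nested task mapping uses an unknown modality key."""
    for task_id, entry in tasks.items():
        # Only nested dicts are modality-specific; plain tasks pass through.
        if isinstance(entry, dict):
            unknown = sorted(set(entry) - VALID_MODALITIES)
            if unknown:
                raise ValueError(
                    f"{task_id}: invalid modality keys {unknown}"
                )
    return tasks
```

With this sketch, `{"check": TaskSketch(), "compress": {"ecephys": TaskSketch()}}` passes, while `{"compress": {"not-a-modality": TaskSketch()}}` raises `ValueError`.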
- aind_data_transfer_service.models.core.validation_context(context: Dict[str, Any] | None) None¶
Follows the guide at https://docs.pydantic.dev/latest/concepts/validators/#validation-context
- Parameters:
context (Union[Dict[str, Any], None])
- Return type:
None
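The validation-context pattern pairs a context variable with a context manager so that validators can consult optional, externally supplied data. The sketch below reproduces the mechanism with the standard library only. It does not use pydantic, and the names `validation_context_sketch` and `check_with_context`, as well as the convention of looking up accepted values by field name, are assumptions made for illustration.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Dict, Iterator, Optional

# Holds the active context; None means "no extra validation".
_context: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    "_context", default=None
)


@contextmanager
def validation_context_sketch(
    context: Optional[Dict[str, Any]],
) -> Iterator[None]:
    """Set the context for the duration of the with-block."""
    token = _context.set(context)
    try:
        yield
    finally:
        # Restore whatever was active before entering the block.
        _context.reset(token)


def check_with_context(value: str, field_name: str) -> str:
    """Validate value against accepted values only if a context has them."""
    context = _context.get() or {}
    accepted = context.get(field_name)  # hypothetical key convention
    if accepted is not None and value not in accepted:
        raise ValueError(f"{value} must be one of {sorted(accepted)}")
    return value
```

Outside any `with validation_context_sketch(...)` block, `check_with_context` accepts every value, which mirrors the documented behavior that no validation error is raised when no list of accepted values is provided.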
aind_data_transfer_service.models.internal module¶
Module for internal data models used in application
- class aind_data_transfer_service.models.internal.AirflowDagRun(*, conf: dict | None, dag_id: str | None, dag_run_id: str | None, data_interval_end: AwareDatetime | None, data_interval_start: AwareDatetime | None, end_date: AwareDatetime | None, execution_date: AwareDatetime | None, external_trigger: bool | None, last_scheduling_decision: AwareDatetime | None, logical_date: AwareDatetime | None, note: str | None, run_type: str | None, start_date: AwareDatetime | None, state: str | None)¶
Bases: BaseModel
Data model for dag_run entry when requesting info from airflow
- conf: dict | None¶
- dag_id: str | None¶
- dag_run_id: str | None¶
- data_interval_end: AwareDatetime | None¶
- data_interval_start: AwareDatetime | None¶
- end_date: AwareDatetime | None¶
- execution_date: AwareDatetime | None¶
- external_trigger: bool | None¶
- last_scheduling_decision: AwareDatetime | None¶
- logical_date: AwareDatetime | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- note: str | None¶
- run_type: str | None¶
- start_date: AwareDatetime | None¶
- state: str | None¶
- class aind_data_transfer_service.models.internal.AirflowDagRunsRequestParameters(*, dag_ids: list[str] = ['transform_and_upload', 'transform_and_upload_v2'], page_limit: int = 100, page_offset: int = 0, states: list[str] | None = [], execution_date_gte: str | None = '2026-05-20T22:57:07.350133+00:00', execution_date_lte: str | None = None, order_by: str = '-execution_date')¶
Bases: BaseModel
Model for parameters when requesting info from dag_runs endpoint
- dag_ids: list[str]¶
- execution_date_gte: str | None¶
- execution_date_lte: str | None¶
- classmethod from_query_params(query_params: QueryParams)¶
Maps the query parameters to the model
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- order_by: str¶
- page_limit: int¶
- page_offset: int¶
- states: list[str] | None¶
- classmethod validate_min_execution_date(execution_date_gte: str) str¶
Validate that the earliest submit date filter is within 2 weeks.
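A lower-bound check like the one described for `execution_date_gte` can be sketched with the standard library. This is an assumption-laden illustration: the name `check_min_execution_date`, the optional `now` argument (added to make the check testable), the treatment of naive timestamps as UTC, and the interpretation of "within 2 weeks" as "no earlier than two weeks before now" are not taken from the library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_LOOKBACK = timedelta(weeks=2)


def check_min_execution_date(
    execution_date_gte: str, now: Optional[datetime] = None
) -> str:
    """Return the ISO timestamp if it is no older than MAX_LOOKBACK."""
    now = now or datetime.now(timezone.utc)
    parsed = datetime.fromisoformat(execution_date_gte)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    if parsed < now - MAX_LOOKBACK:
        raise ValueError(
            "execution_date_gte must be within the last 2 weeks"
        )
    return execution_date_gte
```

Passing `now` explicitly keeps the function deterministic in tests; in normal use it defaults to the current UTC time.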
- class aind_data_transfer_service.models.internal.AirflowDagRunsResponse(*, dag_runs: List[AirflowDagRun], total_entries: int)¶
Bases: BaseModel
Data model for response when requesting info from dag_runs endpoint
- dag_runs: List[AirflowDagRun]¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- total_entries: int¶
- class aind_data_transfer_service.models.internal.AirflowTaskInstance(*, dag_id: str | None, dag_run_id: str | None, duration: int | float | None, end_date: AwareDatetime | None, execution_date: AwareDatetime | None, executor_config: str | None, hostname: str | None, map_index: int | None, max_tries: int | None, note: str | None, operator: str | None, pid: int | None, pool: str | None, pool_slots: int | None, priority_weight: int | None, queue: str | None, queued_when: AwareDatetime | None, rendered_fields: dict | None, sla_miss: dict | None, start_date: AwareDatetime | None, state: str | None, task_id: str | None, trigger: dict | None, triggerer_job: dict | None, try_number: int | None, unixname: str | None)¶
Bases: BaseModel
Data model for task_instance entry when requesting info from airflow
- dag_id: str | None¶
- dag_run_id: str | None¶
- duration: int | float | None¶
- end_date: AwareDatetime | None¶
- execution_date: AwareDatetime | None¶
- executor_config: str | None¶
- hostname: str | None¶
- map_index: int | None¶
- max_tries: int | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- note: str | None¶
- operator: str | None¶
- pid: int | None¶
- pool: str | None¶
- pool_slots: int | None¶
- priority_weight: int | None¶
- queue: str | None¶
- queued_when: AwareDatetime | None¶
- rendered_fields: dict | None¶
- sla_miss: dict | None¶
- start_date: AwareDatetime | None¶
- state: str | None¶
- task_id: str | None¶
- trigger: dict | None¶
- triggerer_job: dict | None¶
- try_number: int | None¶
- unixname: str | None¶
- class aind_data_transfer_service.models.internal.AirflowTaskInstanceLogsRequestParameters(*, dag_id: str, dag_run_id: str, task_id: str, try_number: int, map_index: int, full_content: bool = True)¶
Bases: BaseModel
Model for parameters when requesting info from task_instance_logs endpoint
- dag_id: str¶
- dag_run_id: str¶
- classmethod from_query_params(query_params: QueryParams)¶
Maps the query parameters to the model
- full_content: bool¶
- map_index: int¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- task_id: str¶
- try_number: int¶
- class aind_data_transfer_service.models.internal.AirflowTaskInstancesRequestParameters(*, dag_id: str, dag_run_id: str)¶
Bases: BaseModel
Model for parameters when requesting info from task_instances endpoint
- dag_id: str¶
- dag_run_id: str¶
- classmethod from_query_params(query_params: QueryParams)¶
Maps the query parameters to the model
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aind_data_transfer_service.models.internal.AirflowTaskInstancesResponse(*, task_instances: List[AirflowTaskInstance], total_entries: int)¶
Bases: BaseModel
Data model for response when requesting info from task_instances endpoint
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- task_instances: List[AirflowTaskInstance]¶
- total_entries: int¶
- class aind_data_transfer_service.models.internal.JobParamInfo(*, name: str | None, last_modified: datetime | None, job_type: str, task_id: str, modality: str | None, version: str | None)¶
Bases: BaseModel
Model for job parameter info from AWS Parameter Store
- classmethod from_aws_describe_parameter(parameter: ParameterMetadataTypeDef, job_type: str, task_id: str, modality: str | None, version: str | None)¶
Map the parameter to the model
- static get_parameter_name(job_type: str, task_id: str, modality: str | None, version: str | None = None) str¶
Create the parameter name from job_type and task_id
- static get_parameter_prefix(version: str | None = None) str¶
Get the prefix for job_type parameters
- static get_parameter_regex(version: str | None = None) str¶
Create the regex pattern to match the parameter name
- job_type: str¶
- last_modified: datetime | None¶
- modality: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str | None¶
- task_id: str¶
- version: str | None¶
- class aind_data_transfer_service.models.internal.JobStatus(*, dag_id: str | None = None, end_time: datetime | None = None, job_id: str | None = None, job_state: str | None = None, name: str | None = None, job_type: str | None = None, comment: str | None = None, start_time: datetime | None = None, submit_time: datetime | None = None)¶
Bases: BaseModel
Model for what we want to render to the user.
- comment: str | None¶
- dag_id: str | None¶
- end_time: datetime | None¶
- classmethod from_airflow_dag_run(airflow_dag_run: AirflowDagRun)¶
Maps the fields from an AirflowDagRun to this model
- property jinja_dict: dict¶
Map model to a dictionary that jinja can render
- job_id: str | None¶
- job_state: str | None¶
- job_type: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str | None¶
- start_time: datetime | None¶
- submit_time: datetime | None¶
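An alternate constructor such as `from_airflow_dag_run` typically renames fields from the upstream record into the names shown to the user. The sketch below illustrates that pattern with dataclasses. The field names come from the signatures above, but the correspondences between them (for example `dag_run_id` to `job_id`, or `execution_date` to `submit_time`) are assumptions, and `name` and `job_type` are left out because their source is not documented here.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class DagRunSketch:
    """Stand-in for AirflowDagRun with a subset of its fields."""

    dag_id: Optional[str]
    dag_run_id: Optional[str]
    state: Optional[str]
    execution_date: Optional[datetime]
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    note: Optional[str]


@dataclass
class JobStatusSketch:
    """Stand-in for JobStatus with a subset of its fields."""

    dag_id: Optional[str] = None
    job_id: Optional[str] = None
    job_state: Optional[str] = None
    comment: Optional[str] = None
    submit_time: Optional[datetime] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    @classmethod
    def from_dag_run(cls, run: DagRunSketch) -> "JobStatusSketch":
        """Map upstream fields to display fields (assumed mapping)."""
        return cls(
            dag_id=run.dag_id,
            job_id=run.dag_run_id,
            job_state=run.state,
            comment=run.note,
            submit_time=run.execution_date,
            start_time=run.start_date,
            end_time=run.end_date,
        )
```

Keeping the mapping in a single classmethod means the rendering code depends only on the display model, not on the shape of the Airflow response.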
- class aind_data_transfer_service.models.internal.JobTasks(*, dag_id: str | None = None, job_id: str | None = None, task_id: str | None = None, try_number: int | None = None, task_state: str | None = None, priority_weight: int | None = None, map_index: int | None = None, submit_time: datetime | None = None, start_time: datetime | None = None, end_time: datetime | None = None, duration: int | float | None = None, comment: str | None = None)¶
Bases: BaseModel
Model for what is rendered to the user for each task.
- comment: str | None¶
- dag_id: str | None¶
- duration: int | float | None¶
- end_time: datetime | None¶
- classmethod from_airflow_task_instance(airflow_task_instance: AirflowTaskInstance)¶
Maps the fields from an AirflowTaskInstance to this model
- job_id: str | None¶
- map_index: int | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- priority_weight: int | None¶
- start_time: datetime | None¶
- submit_time: datetime | None¶
- task_id: str | None¶
- task_state: str | None¶
- try_number: int | None¶
Module contents¶
Package for models