aind_data_transfer_service package

Subpackages

Submodules

aind_data_transfer_service.log_handler module

Module to handle logging submit job requests

class aind_data_transfer_service.log_handler.EventType(value)

Bases: str, Enum

Enum for event types in structured logging

STAGE_COMPLETE = 'stage_complete'
STAGE_FAILURE = 'stage_failure'
STAGE_START = 'stage_start'

aind_data_transfer_service.log_handler.log_submit_job_request(content: Any, event_type: EventType | None = None) → None

Parses content object to log any lines with a subject_id and acquisition_name.

Parameters:
  • content (Any) – Content pulled from the request JSON, which may or may not be the expected dict

  • event_type (EventType | None) – Type of event to log. Default is None.
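The behaviour described above can be sketched as follows. This is only an illustration under stated assumptions, not the package's implementation: the EventType class here is a local stand-in that mirrors the documented values, and find_job_records and sketch_log_request are hypothetical names. The sketch assumes matching entries are nested dicts carrying both keys, and that anything else (for example a string body) is silently skipped.

```python
import logging
from enum import Enum
from typing import Any, Iterator, Optional


class EventType(str, Enum):
    """Local stand-in mirroring the documented enum values."""

    STAGE_COMPLETE = "stage_complete"
    STAGE_FAILURE = "stage_failure"
    STAGE_START = "stage_start"


def find_job_records(content: Any) -> Iterator[dict]:
    """Hypothetical helper: yield nested dicts that hold both keys."""
    if isinstance(content, dict):
        if "subject_id" in content and "acquisition_name" in content:
            yield content
        for value in content.values():
            yield from find_job_records(value)
    elif isinstance(content, list):
        for item in content:
            yield from find_job_records(item)
    # Any other type (str, None, numbers) is ignored rather than raising.


def sketch_log_request(
    content: Any, event_type: Optional[EventType] = None
) -> int:
    """Hypothetical sketch: log one line per match and return the count."""
    logger = logging.getLogger(__name__)
    count = 0
    for record in find_job_records(content):
        extra = {
            "subject_id": record["subject_id"],
            "acquisition_name": record["acquisition_name"],
        }
        if event_type is not None:
            # Because EventType subclasses str, .value is a plain string.
            extra["event_type"] = event_type.value
        logger.info("Handling submit job request", extra=extra)
        count += 1
    return count
```

Because the enum derives from str, members compare equal to their string values, so EventType.STAGE_START == "stage_start" holds and the value serializes cleanly in structured log output.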

aind_data_transfer_service.server module

Starts and runs the Starlette service

async aind_data_transfer_service.server.admin(request: Request)

Get admin page if authenticated, else redirect to login.

async aind_data_transfer_service.server.auth(request: Request)

Authenticate user and store user info in session

async aind_data_transfer_service.server.cancel_job(request: Request)

Cancel a running job.

async aind_data_transfer_service.server.download_job_template(_: Request)

Get job template as xlsx filestream for download

async aind_data_transfer_service.server.get_airflow_jobs(params: AirflowDagRunsRequestParameters, get_confs: bool = False) → tuple[int, List[JobStatus] | List[dict]]

Get Airflow jobs using input query params. If get_confs is true, only the job conf dictionaries are returned.

async aind_data_transfer_service.server.get_job_status_list(request: Request)

Get status of jobs using input query params.

aind_data_transfer_service.server.get_job_types(version: str | None = None) → List[str]

Get a list of job_types

aind_data_transfer_service.server.get_parameter_infos(version: str | None = None) → List[JobParamInfo]

Get a list of job_type parameters

aind_data_transfer_service.server.get_parameter_v2(request: Request)

Get v2 parameter from AWS param store based on job_type and task_id

aind_data_transfer_service.server.get_parameter_value(param_name: str) → dict

Get a parameter value from AWS param store based on parameter name

async aind_data_transfer_service.server.get_project_names() → List[str]

Get a list of project_names

async aind_data_transfer_service.server.get_task_logs(request: Request)

Get task logs given dag id, job id, task id, and task try number.

async aind_data_transfer_service.server.get_tasks_list(request: Request)

Get list of task instances given dag id and job id.

async aind_data_transfer_service.server.index(request: Request)

GET|POST /: form handler

async aind_data_transfer_service.server.job_params(request: Request)

Get Job Parameters page

async aind_data_transfer_service.server.job_tasks_table(request: Request)

Get Job Tasks table given a job id

async aind_data_transfer_service.server.jobs(request: Request)

Get Job Status page with pagination

aind_data_transfer_service.server.list_parameters_v2(_: Request)

List v2 job type parameters

async aind_data_transfer_service.server.login(request: Request)

Redirect to Azure login page

async aind_data_transfer_service.server.logout(request: Request)

Logout user and clear session

async aind_data_transfer_service.server.put_parameter(request: Request)

Set parameter in AWS param store based on job_type and task_id

aind_data_transfer_service.server.put_parameter_value(param_name: str, param_value: dict) → Any

Set a parameter value in AWS param store based on parameter name

aind_data_transfer_service.server.set_oauth() → OAuth

Set up OAuth for the service

async aind_data_transfer_service.server.submit_jobs_v2(request: Request)

Post SubmitJobRequestV2 raw json to Airflow to process.

async aind_data_transfer_service.server.task_logs(request: Request)

Get task logs given a job id, task id, and task try number.

async aind_data_transfer_service.server.validate_csv(request: Request)

Validate a csv or xlsx file. Return parsed contents as json.

async aind_data_transfer_service.server.validate_json_v2(request: Request)

Validate raw json against data transfer models. Returns validated json or errors if request is invalid.

Module contents

Package for data transfer service api

class aind_data_transfer_service.CustomJsonFormatter(*args, json_default: collections.abc.Callable | None = None, json_encoder: collections.abc.Callable | None = None, json_serializer: collections.abc.Callable = <function dumps>, json_indent: int | str | None = None, json_ensure_ascii: bool = True, **kwargs)

Bases: JsonFormatter

Custom class to format log timestamps as ISO-8601 UTC

formatTime(record: LogRecord, datefmt=None) → str

Format timestamp as ISO-8601 UTC

Parameters:
  • record (LogRecord)

  • datefmt (str, optional) – Default is None. Unused parameter, kept for signature compatibility.

Return type:

str
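
The formatTime override described above can be sketched with the standard library alone. This is a hedged illustration, not the package's code: IsoUtcFormatter is a hypothetical name, it subclasses logging.Formatter rather than JsonFormatter so the example stays self-contained, and the exact ISO-8601 variant the real class emits (for example a "+00:00" offset versus a "Z" suffix) is an assumption.

```python
import logging
from datetime import datetime, timezone


class IsoUtcFormatter(logging.Formatter):
    """Hypothetical stand-in built on the stdlib formatter."""

    def formatTime(self, record: logging.LogRecord, datefmt=None) -> str:
        # datefmt is accepted for signature compatibility but ignored.
        # record.created holds the event time as seconds since the epoch.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        return created.isoformat()


# Build a record by hand and pin its timestamp so the output is predictable.
record = logging.LogRecord(
    name="example",
    level=logging.INFO,
    pathname="example.py",
    lineno=1,
    msg="hello",
    args=None,
    exc_info=None,
)
record.created = 0.0

formatter = IsoUtcFormatter("%(asctime)s %(message)s")
print(formatter.format(record))  # 1970-01-01T00:00:00+00:00 hello
```

Converting with an explicit UTC timezone keeps timestamps comparable across hosts regardless of the server's local time zone setting.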