# Source code for servicex.models
# Copyright (c) 2022, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import hashlib
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional, Any
def _get_typename(typeish) -> str:
return typeish.__name__ if isinstance(typeish, type) else str(typeish)
def _generate_model_docstring(model: type) -> str:
    """Build a Google-style docstring for a Pydantic model.

    The header is the model's own docstring (falling back to the class name),
    followed by an ``Args:`` section listing every declared field with its
    annotation and description.
    """
    continuation_indent = " " * 8
    header = (model.__doc__ or model.__name__).strip()
    lines = [header, "", "Args:"]
    for field_name, info in model.model_fields.items():
        description = info.description or ""
        # Re-indent multi-line descriptions so continuation lines line up
        # under the field entry.
        description = description.replace("\n", "\n" + continuation_indent)
        lines.append(
            f"    {field_name}: ({_get_typename(info.annotation)}) {description}"
        )
    return "\n".join(lines)
class DocStringBaseModel(BaseModel):
    """Pydantic base class whose subclasses get an auto-generated docstring.

    Once Pydantic has finished initialising a subclass, its ``__doc__`` is
    replaced with the field-by-field summary produced by
    ``_generate_model_docstring``.
    """

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any):
        # Let Pydantic finish wiring up the subclass first, then overwrite
        # its docstring with the generated one.
        super().__pydantic_init_subclass__(**kwargs)
        cls.__doc__ = _generate_model_docstring(cls)
class ResultDestination(str, Enum):
    r"""
    Where transformed output is delivered: the ServiceX object store or a
    posix volume.
    """

    object_store = "object-store"
    volume = "volume"
class ResultFormat(str, Enum):
    r"""
    File format requested for the generated output.
    """

    parquet = "parquet"
    root_ttree = "root-file"
class Status(str, Enum):
    r"""
    Status of a submitted transform
    """

    # Fix: these members were written as one-element tuples, e.g.
    # ``complete = ("Complete",)``. With a ``str`` mixin, Enum unpacks such a
    # tuple as constructor arguments, so the values happened to be the intended
    # strings — but only by accident. Plain strings make the intent explicit
    # and match the last two members, which were already bare strings.
    complete = "Complete"
    fatal = "Fatal"
    canceled = "Canceled"
    submitted = "Submitted"
    looking = "Lookup"
    pending = "Pending Lookup"
    running = "Running"
class TransformRequest(DocStringBaseModel):
    r"""
    Transform request sent to ServiceX
    """

    # Optional human-readable label for this request.
    title: Optional[str] = None
    # Dataset identifier to transform (server-side name "did").
    did: Optional[str] = None
    # Explicit list of input files; serialized under "file-list".
    file_list: Optional[List[str]] = Field(default=None, alias="file-list")
    selection: str
    image: Optional[str] = None
    codegen: str
    tree_name: Optional[str] = Field(default=None, alias="tree-name")
    result_destination: ResultDestination = Field(
        serialization_alias="result-destination"
    )
    result_format: ResultFormat = Field(serialization_alias="result-format")

    model_config = {"populate_by_name": True, "use_attribute_docstrings": True}

    def compute_hash(self):
        r"""
        Compute a hash for this submission. Only include properties that impact
        the result so we have maximal ability to reuse transforms.

        :return: SHA256 hash of request
        """
        # Sort the file list so two submissions over the same files hash
        # identically regardless of ordering. Title (and the result
        # destination) are deliberately excluded: they never change the
        # produced data, so leaving them out maximises transform reuse.
        normalized_files = sorted(self.file_list) if self.file_list else None
        fingerprint = str(
            [
                self.did,
                self.selection,
                self.tree_name,
                self.codegen,
                self.image,
                self.result_format.name,
                normalized_files,
            ]
        )
        return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()
class TransformStatus(DocStringBaseModel):
    r"""
    Status object returned by servicex
    """

    model_config = {"use_attribute_docstrings": True}

    request_id: str
    did: str
    title: Optional[str] = None
    selection: str
    tree_name: Optional[str] = Field(validation_alias="tree-name")
    image: str
    result_destination: ResultDestination = Field(validation_alias="result-destination")
    result_format: ResultFormat = Field(validation_alias="result-format")
    generated_code_cm: str = Field(validation_alias="generated-code-cm")
    status: Status
    app_version: str = Field(validation_alias="app-version")
    files: int
    files_completed: int = Field(validation_alias="files-completed")
    files_failed: int = Field(validation_alias="files-failed")
    files_remaining: Optional[int] = Field(
        validation_alias="files-remaining", default=0
    )
    # Fix: was annotated as bare ``datetime`` while defaulting to ``None``;
    # widened to ``Optional[datetime]`` so the annotation matches the default.
    submit_time: Optional[datetime] = Field(
        validation_alias="submit-time", default=None
    )
    finish_time: Optional[datetime] = Field(
        validation_alias="finish-time", default=None
    )
    minio_endpoint: Optional[str] = Field(
        validation_alias="minio-endpoint", default=None
    )
    minio_secured: Optional[bool] = Field(
        validation_alias="minio-secured", default=None
    )
    minio_access_key: Optional[str] = Field(
        validation_alias="minio-access-key", default=None
    )
    minio_secret_key: Optional[str] = Field(
        validation_alias="minio-secret-key", default=None
    )
    log_url: Optional[str] = Field(validation_alias="log-url", default=None)

    @field_validator("finish_time", mode="before")
    @classmethod
    def parse_finish_time(cls, v):
        """Coerce the literal string ``"None"`` (as sent by the server for an
        unfinished transform) to a real ``None`` before datetime validation."""
        if isinstance(v, str) and v == "None":
            return None
        return v
class ResultFile(DocStringBaseModel):
    r"""
    Record reporting the properties of a transformed file result
    """

    model_config = {"use_attribute_docstrings": True}

    # Name of the output file as reported by the server.
    filename: str
    # File size — presumably in bytes; TODO confirm against the ServiceX API.
    size: int
    # File extension of the output (e.g. "parquet") — identifies the format.
    extension: str
class TransformedResults(DocStringBaseModel):
    r"""
    Returned for a submission. Gives you everything you need to know about a completed
    transform.
    """

    model_config = {"use_attribute_docstrings": True}

    # NOTE: the bare strings after each field are attribute docstrings; with
    # ``use_attribute_docstrings`` enabled Pydantic lifts them into the field
    # descriptions, so they are model metadata, not mere comments.
    hash: str
    """Unique hash for transformation (used to look up results in cache)"""
    title: str
    """Title of transformation request"""
    codegen: str
    """Code generator used (internal ServiceX information related to query type)"""
    request_id: str
    """Associated request ID from the ServiceX server"""
    submit_time: datetime
    """Time of submission"""
    data_dir: str
    """Local directory for output"""
    file_list: List[str]
    """List of downloaded files on local disk"""
    signed_url_list: List[str]
    """List of URLs to retrieve output from remote ServiceX object store"""
    files: int
    """Number of files in result"""
    result_format: ResultFormat
    """File format for results"""
    log_url: Optional[str] = None
    """URL for looking up logs on the ServiceX server"""
class DatasetFile(BaseModel):
    """
    Model for a file in a cached dataset
    """

    id: int
    # Required by the schema but nullable — presumably the adler32 checksum of
    # the file; confirm against the server's dataset API.
    adler32: Optional[str]
    file_size: int
    file_events: int
    paths: str
class CachedDataset(BaseModel):
    """
    Model for a cached dataset held by ServiceX server
    """

    id: int
    name: str
    did_finder: str
    n_files: int
    size: int
    events: int
    last_used: datetime
    last_updated: datetime
    lookup_status: str
    is_stale: bool
    # Was ``= []``: Pydantic copies defaults per instance, so the mutable
    # literal was safe here, but it reads like the classic shared-default bug.
    # ``default_factory`` produces the same empty list per instance while
    # stating the intent explicitly.
    files: Optional[List[DatasetFile]] = Field(default_factory=list)