mirror of
https://github.com/creyD/creyPY.git
synced 2026-04-13 11:50:31 +02:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79dde8008a | ||
|
|
adb017d6ce | ||
|
|
e160cc5fea | ||
|
|
7afb8e2fd8 | ||
|
|
badf2b157f | ||
|
|
c903266ec4 | ||
|
|
910638e3a6 | ||
|
|
83dca59817 | ||
|
|
b80d26586d | ||
|
|
c2e2469027 | ||
|
|
8363055984 | ||
|
|
f586ce5c03 |
4
.github/workflows/ci.yml
vendored
4
.github/workflows/ci.yml
vendored
@@ -38,7 +38,7 @@ jobs:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
python-version: '3.13'
|
||||
- run: python -m pip install --upgrade pip
|
||||
- run: |
|
||||
python -m pip install -r requirements.txt
|
||||
@@ -93,7 +93,7 @@ jobs:
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
python-version: '3.13'
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
from typing import Type, TypeVar, overload
|
||||
import asyncio
|
||||
from typing import List, Type, TypeVar, overload
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import HTTPException
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
import asyncio
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .models.base import Base
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
@@ -19,13 +20,19 @@ async def get_object_or_404(
|
||||
db: AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def get_object_or_404(
|
||||
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id"
|
||||
db_class: Type[T],
|
||||
id: UUID | str,
|
||||
db: Session,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
@@ -36,20 +43,44 @@ def get_object_or_404(
|
||||
db: Session | AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
|
||||
async def _get_async_object() -> T:
|
||||
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
obj = result.scalar_one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
|
||||
if response_fields:
|
||||
selected_columns = [
|
||||
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
|
||||
]
|
||||
query = select(*selected_columns).where(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
row = result.first()
|
||||
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
if hasattr(row, "_mapping"):
|
||||
obj_dict = dict(row._mapping)
|
||||
else:
|
||||
obj_dict = {column.key: getattr(row, column.key) for column in selected_columns}
|
||||
else:
|
||||
query = select(db_class).where(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
row = result.scalar_one_or_none()
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
obj_dict = row
|
||||
if expunge:
|
||||
await db.expunge(obj)
|
||||
return obj
|
||||
await db.expunge(obj_dict)
|
||||
return obj_dict
|
||||
|
||||
def _get_sync_object() -> T:
|
||||
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
|
||||
if response_fields:
|
||||
selected_columns = [
|
||||
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
|
||||
]
|
||||
query = db.query(*selected_columns).filter(getattr(db_class, lookup_column) == id)
|
||||
else:
|
||||
query = db.query(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
obj = query.one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
|
||||
if expunge:
|
||||
|
||||
@@ -1 +1,3 @@
|
||||
from .base import * # noqa
|
||||
from .response_schema import * #noqa
|
||||
from .schema_optional import * #noqa
|
||||
|
||||
@@ -11,6 +11,6 @@ class BaseSchemaModelIN(BaseModel):
|
||||
|
||||
|
||||
class BaseSchemaModelOUT(BaseSchemaModelIN):
|
||||
id: UUID
|
||||
id: UUID | str
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
43
creyPY/fastapi/schemas/response_schema.py
Normal file
43
creyPY/fastapi/schemas/response_schema.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from typing import List, Optional, Type
|
||||
|
||||
from fastapi import Query
|
||||
from pydantic import BaseModel, create_model
|
||||
|
||||
|
||||
class ResponseModelDependency:
|
||||
def __init__(self, model_class: Type[BaseModel]):
|
||||
self.model_class = model_class
|
||||
|
||||
def __call__(self, response_fields: Optional[List[str]] = Query(None)) -> Type[BaseModel]:
|
||||
def process_result(result, fields=None, async_session=False):
|
||||
if not fields:
|
||||
if async_session:
|
||||
return {k: v for k, v in result.__dict__.items() if not k.startswith("_")}
|
||||
return result
|
||||
|
||||
if hasattr(result, "_fields"):
|
||||
row_fields = result._fields
|
||||
return dict(zip(row_fields, result))
|
||||
elif isinstance(result, tuple):
|
||||
return dict(zip(fields, result))
|
||||
elif isinstance(result, dict):
|
||||
return result
|
||||
else:
|
||||
return {field: getattr(result, field) for field in fields if hasattr(result, field)}
|
||||
|
||||
if not response_fields:
|
||||
return self.model_class, None, process_result
|
||||
|
||||
all_annotations = {}
|
||||
for cls in self.model_class.__mro__:
|
||||
if hasattr(cls, "__annotations__"):
|
||||
all_annotations.update(cls.__annotations__)
|
||||
|
||||
fields = {}
|
||||
for field in response_fields:
|
||||
if field in all_annotations:
|
||||
fields[field] = (all_annotations[field], None)
|
||||
|
||||
dynamic_model = create_model(f"Dynamic{self.model_class.__name__}", **fields)
|
||||
|
||||
return dynamic_model, response_fields, process_result
|
||||
19
creyPY/fastapi/schemas/schema_optional.py
Normal file
19
creyPY/fastapi/schemas/schema_optional.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from typing import Optional, Type, Union, get_args, get_origin, get_type_hints
|
||||
|
||||
from pydantic import BaseModel, create_model
|
||||
|
||||
|
||||
def optional_fields(cls: Type[BaseModel]) -> Type[BaseModel]:
|
||||
fields = {}
|
||||
for name, hint in get_type_hints(cls).items():
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
|
||||
if get_origin(hint) is not Union or type(None) not in get_args(hint):
|
||||
hint = Optional[hint]
|
||||
|
||||
fields[name] = (hint, None)
|
||||
|
||||
new_model = create_model(cls.__name__, __base__=cls, **fields)
|
||||
|
||||
return new_model
|
||||
@@ -1 +1 @@
|
||||
stripe==11.6.0 # Stripe
|
||||
stripe==12.0.0 # Stripe
|
||||
|
||||
Reference in New Issue
Block a user