-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmain.py
192 lines (164 loc) · 5.34 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import uvicorn
import time
from loguru import logger
from contextlib import asynccontextmanager
from prometheus_client import make_asgi_app
from core.config import settings
from core.exceptions import ScraperException, ValidationError
from models.request import ScrapeRequest
from services.cache.cache_service import CacheService
from services.scraper.scraper import WebScraper
from services.crawler.crawler_service import CrawlerService
from api.v1.endpoints import crawler, scraper , chunker , converter
# Prometheus metrics endpoint: ASGI application built by prometheus_client's
# make_asgi_app(); it is mounted on the main app under /metrics further down.
metrics_app = make_asgi_app()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle events for the application.

    Startup connects the cache service, creates the shared ``WebScraper``
    (stored on ``app.state.scraper``) and the ``CrawlerService`` (stored on
    ``app.state.crawler``), then yields while the application is running.
    Shutdown awaits ``app.state.scraper.cleanup()``.

    The cleanup lives in a ``finally`` block so that a scraper which was
    already created is also released when a later startup step fails or when
    the context is left through an exception, instead of being leaked.

    Raises:
        Exception: Any error raised during startup, while running, or during
            cleanup is reported through ``logger.exception`` and re-raised.
    """
    # Flipped once the scraper exists, so cleanup is only attempted on a
    # scraper this function actually created.
    scraper_created = False
    try:
        try:
            # Startup
            logger.info("Initializing application...")

            # Initialize cache service
            cache_service = CacheService(settings.REDIS_URL)
            await cache_service.connect()
            # NOTE(review): nothing here closes cache_service explicitly;
            # confirm that scraper.cleanup() releases it.

            # Initialize resources with cache service
            logger.info("Initializing scraper...")
            app.state.scraper = await WebScraper.create(
                max_concurrent=settings.CONCURRENT_SCRAPES,
                cache_service=cache_service,
            )
            scraper_created = True

            logger.info("Initializing crawler...")
            app.state.crawler = CrawlerService(
                max_concurrent=settings.CONCURRENT_SCRAPES
            )

            yield
        finally:
            # Shutdown
            if scraper_created:
                logger.info("Shutting down application...")
                await app.state.scraper.cleanup()
    except Exception as e:
        logger.exception(f"Application lifecycle error: {e}")
        raise
# Application instance; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    title=settings.PROJECT_NAME,
    description="Production-grade web scraper API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
)

# Middleware setup
# NOTE(review): settings.ALLOWED_HOSTS is used both as the CORS origin list
# here and as the trusted-host list below. Origins normally carry a scheme
# while host names do not; confirm the configured values suit both uses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_HOSTS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Versioned API routers: each one is served under /api/v1 and tagged with
# its own name. Registration order is kept as crawler, scraper, chunker,
# converter.
app.include_router(crawler.router, prefix="/api/v1", tags=["crawler"])
app.include_router(scraper.router, prefix="/api/v1", tags=["scraper"])
app.include_router(chunker.router, prefix="/api/v1", tags=["chunker"])
app.include_router(converter.router, prefix="/api/v1", tags=["converter"])

# Restrict the Host header values the application accepts.
app.add_middleware(TrustedHostMiddleware, allowed_hosts=settings.ALLOWED_HOSTS)
# Custom middleware for request timing
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    """Stamp every response with the time spent handling the request.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The downstream response with an ``X-Process-Time`` header holding the
        elapsed ``time.perf_counter()`` seconds as a string.
    """
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = str(elapsed)
    return response
# Exception handlers
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Answer request-validation failures with a 422 JSON body.

    The errors reported by ``exc.errors()`` are wrapped in the project's
    ``ValidationError`` and serialized through its ``to_dict()``.
    """
    error_body = ValidationError(errors=exc.errors()).to_dict()
    return JSONResponse(
        content=error_body,
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )
@app.exception_handler(ScraperException)
async def scraper_exception_handler(request: Request, exc: ScraperException):
    """Translate a ``ScraperException`` into a JSON response.

    The HTTP status comes from ``exc.status_code`` and the body from
    ``exc.to_dict()``.
    """
    http_status = exc.status_code
    error_body = exc.to_dict()
    return JSONResponse(status_code=http_status, content=error_body)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all handler for exceptions no other handler claimed.

    Logs the failure and returns a generic 500 JSON body, so internal details
    of the exception are not included in the response.

    Args:
        request: The request being processed when the exception was raised.
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status 500 and a fixed error payload.
    """
    # Attach the exception object so the full traceback reaches the log;
    # logging only str(exc) makes unexpected 500s hard to diagnose.
    logger.opt(exception=exc).error(f"Unhandled exception: {exc}")
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "An unexpected error occurred",
                "status": 500,
            }
        },
    )
# Mount Prometheus metrics endpoint: serve the metrics_app ASGI application
# (created above with make_asgi_app()) under the /metrics path.
app.mount("/metrics", metrics_app)
# Health check endpoint
@app.get("/health")
async def health_check():
    """Return a fixed "healthy" status together with the current Unix time.

    No dependency (cache, scraper, crawler) is probed here; the response only
    shows that the application is answering requests.
    """
    checked_at = time.time()
    return {"status": "healthy", "timestamp": checked_at}
@app.get("/")
async def root():
    """Describe the API: name, version, description and useful paths."""
    api_info = {
        "name": settings.PROJECT_NAME,
        "version": "1.0.0",
        "description": "Production-grade web scraper API",
        "docs_url": "/docs",
        "health_check": "/health",
    }
    return api_info
@app.post("/scrape", response_model_exclude_none=True)
async def scrape_url(request: ScrapeRequest, req: Request):
    """
    Scrape one URL with the application's shared scraper and return the result.

    Args:
        request: Scrape parameters from the request body (URL, timeout,
            headers, main-content flag, wait selector, optional actions).
        req: The raw HTTP request; used only to reach ``app.state.scraper``.

    Returns:
        Whatever ``scraper.scrape`` returns for the URL and the built options.
    """
    logger.info(f"Processing scrape request for URL: {request.url}")

    # A falsy timeout (missing or 0) falls back to the configured default.
    effective_timeout = request.timeout or settings.TIMEOUT

    scrape_options = {
        "only_main": request.onlyMainContent,
        "timeout": effective_timeout,
        "user_agent": settings.DEFAULT_USER_AGENT,
        "headers": request.headers,
        # A screenshot is always requested on this endpoint.
        "screenshot": True,
        "screenshot_quality": settings.SCREENSHOT_QUALITY,
        "wait_for_selector": request.waitFor,
    }

    # Actions are forwarded only when the caller supplied some.
    requested_actions = request.actions
    if requested_actions:
        scrape_options["actions"] = requested_actions

    shared_scraper = req.app.state.scraper
    return await shared_scraper.scrape(str(request.url), scrape_options)
if __name__ == "__main__":
    # Direct execution: start uvicorn on all interfaces, with port, reload,
    # worker count and log level taken from settings.
    # NOTE(review): uvicorn documents `workers` as not valid together with
    # `reload`; confirm DEBUG runs do not rely on settings.WORKERS.
    server_options = {
        "host": "0.0.0.0",
        "port": settings.PORT,
        "reload": settings.DEBUG,
        "workers": settings.WORKERS,
        "log_level": settings.LOG_LEVEL.lower(),
    }
    uvicorn.run("main:app", **server_options)