-
Notifications
You must be signed in to change notification settings - Fork 172
/
Copy pathtasks.py
123 lines (105 loc) · 4.08 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import aiohttp
import extraction
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from functools import wraps
import aiosmtplib
from arq import create_pool, cron
from arq.worker import logger
from mako.lookup import TemplateLookup
from config import (
BLOG_URL, MAIL_PASSWORD, MAIL_PORT, MAIL_SERVER,
MAIL_USERNAME, REDIS_URL, SITE_TITLE
)
from ext import init_db
from libs.extracted import DoubanGameExtracted
from models.blog import (
PAGEVIEW_FIELD, RK_PAGEVIEW, RK_VISITED_POST_IDS, Post, Subject, Favorite)
from models.mention import EMAIL_SUBJECT, Mention
from models.utils import RedisSettings
from views.utils import save_image
from models.consts import UA
CAN_SEND = all((MAIL_SERVER, MAIL_USERNAME, MAIL_PASSWORD))
def with_context(f):
@wraps(f)
async def _deco(*args, **kwargs):
await init_db()
result = await f(*args, **kwargs)
return result
return _deco
async def send_email(subject: str, html: str, send_to: str) -> None:
if not CAN_SEND:
return
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = MAIL_USERNAME
msg['To'] = send_to
msg.attach(MIMEText(html, 'html'))
loop = asyncio.get_event_loop()
smtp = aiosmtplib.SMTP(hostname=MAIL_SERVER, port=MAIL_PORT,
loop=loop, use_tls=True)
await smtp.connect()
await smtp.login(MAIL_USERNAME, MAIL_PASSWORD)
await smtp.send_message(msg)
await smtp.quit()
@with_context
async def mention_users(ctx, post_id, content, author_id):
post = await Post.cache(post_id)
if not post:
return
mention_users = await Mention.get_mention_users(content, author_id)
for user in mention_users:
email = user.email
if not email:
continue
subject = EMAIL_SUBJECT.format(title=post.title)
lookup = TemplateLookup(directories=['templates'],
input_encoding='utf-8',
output_encoding='utf-8')
template = lookup.get_template('email/mention.html')
html = template.render(username=user.username,
site_url=BLOG_URL, post=post,
site_name=SITE_TITLE)
logger.info(f'Send Mail(subject={subject}, html={html}) To {email}')
await send_email(subject, html.decode(), email)
@with_context
async def flush_to_db(ctx):
redis = await create_pool(RedisSettings.from_url(REDIS_URL))
while 1:
if (post_id := await redis.spop(RK_VISITED_POST_IDS)) is None:
break
post = await Post.filter(id=post_id).first()
if post:
post._pageview = int(await redis.hget(
RK_PAGEVIEW.format(post_id), PAGEVIEW_FIELD) or 0)
await post.save()
logger.info(f'Flush Post(id={post_id}) pageview')
else:
logger.warning(f'Post(id={post_id}) have deleted!')
@with_context
async def save_subjects(ctx, jobs):
async with aiohttp.ClientSession(headers={'User-Agent': UA}) as session:
for type, url, index, item in jobs:
try:
async with session.get(url) as resp:
html = await resp.text()
except Exception:
html = ''
if not html:
logger.error(f'Save Subejct(url={url}) fail!')
extracted_class = None if type != 'game' else DoubanGameExtracted
extracted = extraction.Extractor(extracted_class).extract(
html, source_url=url)
_, basename = await save_image(extracted.image)
subject = await Subject.create_with_pid(
0, url, extracted.title, extracted.description, basename)
fav, _ = await Favorite.get_or_create(
subject_id=subject.id, type=type,
rating=item.rating, comment=item.comment)
if fav.index != index:
fav.index = index
await fav.save()
class WorkerSettings:
functions = [mention_users, save_subjects]
cron_jobs = [cron(flush_to_db, hour=None)]