์์ ์ค์ผ์ค๋ง
Python์ผ๋ก ์ ๊ธฐ์ ์ธ ์์ ์ ์๋์ผ๋ก ์คํํ ์ ์์ต๋๋ค. ์ผ์ผ ๋ฐฑ์ , ์ ๊ธฐ ๋ฆฌํฌํธ, ๋ฐ์ดํฐ ์์ง ๋ฑ์ ์ค์ผ์ค๋งํด๋ณด๊ฒ ์ต๋๋ค.
schedule ๋ผ์ด๋ธ๋ฌ๋ฆฌโ
์ค์นํ๊ธฐโ
pip install schedule
๊ธฐ๋ณธ ์ฌ์ฉ๋ฒโ
import schedule
import time
def job():
print("์์
์คํ!")
# 10์ด๋ง๋ค ์คํ
schedule.every(10).seconds.do(job)
# 10๋ถ๋ง๋ค ์คํ
schedule.every(10).minutes.do(job)
# 1์๊ฐ๋ง๋ค ์คํ
schedule.every().hour.do(job)
# ๋งค์ผ ์คํ
schedule.every().day.do(job)
# ๋งค์ฃผ ์์์ผ ์คํ
schedule.every().monday.do(job)
# ๋งค์ฃผ ์์์ผ 13:15์ ์คํ
schedule.every().wednesday.at("13:15").do(job)
# ๋งค์ผ 10:30์ ์คํ
schedule.every().day.at("10:30").do(job)
# ์ค์ผ์ค ์คํ
while True:
schedule.run_pending()
time.sleep(1)
๋ค์ํ ์ค์ผ์ค ํจํดโ
์๊ฐ ๊ธฐ๋ฐ ์ค์ผ์คโ
import schedule
def morning_routine():
print("์ข์ ์์นจ์
๋๋ค!")
def lunch_reminder():
print("์ ์ฌ ์๊ฐ์
๋๋ค!")
def evening_report():
print("์ผ์ผ ๋ณด๊ณ ์ ์์ฑ ์ค...")
# ๋งค์ผ ์ค์ 8์
schedule.every().day.at("08:00").do(morning_routine)
# ๋งค์ผ ์ ์ค
schedule.every().day.at("12:00").do(lunch_reminder)
# ๋งค์ผ ์คํ 6์
schedule.every().day.at("18:00").do(evening_report)
# ํ์ผ ์ค์ 9์
schedule.every().monday.at("09:00").do(morning_routine)
schedule.every().tuesday.at("09:00").do(morning_routine)
schedule.every().wednesday.at("09:00").do(morning_routine)
schedule.every().thursday.at("09:00").do(morning_routine)
schedule.every().friday.at("09:00").do(morning_routine)
๊ฐ๊ฒฉ ๊ธฐ๋ฐ ์ค์ผ์คโ
import schedule
def check_server():
print("์๋ฒ ์ํ ํ์ธ")
def collect_data():
print("๋ฐ์ดํฐ ์์ง")
# 5๋ถ๋ง๋ค
schedule.every(5).minutes.do(check_server)
# 30๋ถ๋ง๋ค
schedule.every(30).minutes.do(collect_data)
# 2์๊ฐ๋ง๋ค
schedule.every(2).hours.do(check_server)
# 3์ผ๋ง๋ค
schedule.every(3).days.do(collect_data)
๋งค๊ฐ๋ณ์๊ฐ ์๋ ์์ โ
import schedule
def greet(name):
print(f"์๋
ํ์ธ์, {name}๋!")
def send_report(email, report_type):
print(f"{email}๋ก {report_type} ๋ฆฌํฌํธ ๋ฐ์ก")
# ๋งค๊ฐ๋ณ์์ ํจ๊ป ์คํ
schedule.every().day.at("09:00").do(greet, name="ํ๊ธธ๋")
schedule.every().day.at("18:00").do(send_report,
email="admin@example.com",
report_type="์ผ์ผ")
์์ ์ทจ์โ
import schedule
def some_task():
print("์์
์คํ")
# ์กฐ๊ฑด์ด ๋ง์กฑ๋๋ฉด ์์
์ทจ์
return schedule.CancelJob
# ์์
๋ฑ๋ก
job = schedule.every().day.do(some_task)
# ๋์ค์ ์๋์ผ๋ก ์ทจ์
schedule.cancel_job(job)
# ๋ชจ๋ ์์
์ทจ์
schedule.clear()
# ํน์ ํ๊ทธ์ ์์
๋ง ์ทจ์
schedule.clear('daily-tasks')
ํ๊ทธ ์ฌ์ฉํ๊ธฐโ
import schedule
def backup():
print("๋ฐฑ์
์คํ")
def report():
print("๋ฆฌํฌํธ ์์ฑ")
# ํ๊ทธ ์ง์
schedule.every().day.at("02:00").do(backup).tag('backup', 'critical')
schedule.every().day.at("09:00").do(report).tag('report', 'daily')
# ํน์ ํ๊ทธ์ ์์
๋ง ์คํ
schedule.run_all(delay_seconds=0) # ๋ชจ๋ ์์
์ฆ์ ์คํ
schedule.run_all('backup') # 'backup' ํ๊ทธ ์์
๋ง ์คํ
# ํน์ ํ๊ทธ์ ์์
์ทจ์
schedule.clear('daily')
์ค์ ์์ โ
1. ์ผ์ผ ๋ฐฑ์ ์๋ํโ
import schedule
import time
import shutil
import os
from datetime import datetime
import zipfile
def create_backup():
"""๋งค์ผ ์๋ ๋ฐฑ์
"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ๋ฐฑ์
์์...")
# ๋ฐฑ์
๋์ ํด๋
source_dirs = [
'/Users/username/Documents',
'/Users/username/Projects'
]
# ๋ฐฑ์
์ ์ฅ ์์น
backup_dir = '/Users/username/Backups'
os.makedirs(backup_dir, exist_ok=True)
# ๋ฐฑ์
ํ์ผ๋ช
(๋ ์ง ํฌํจ)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(backup_dir, f'backup_{timestamp}.zip')
try:
# ZIP ํ์ผ ์์ฑ
with zipfile.ZipFile(backup_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
for source_dir in source_dirs:
if not os.path.exists(source_dir):
print(f"๊ฒฝ๊ณ : {source_dir} ํด๋๊ฐ ์์ต๋๋ค.")
continue
print(f"๋ฐฑ์
์ค: {source_dir}")
for root, dirs, files in os.walk(source_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, os.path.dirname(source_dir))
zipf.write(file_path, arcname)
# ๋ฐฑ์
ํ์ผ ํฌ๊ธฐ ํ์ธ
size_mb = os.path.getsize(backup_file) / (1024 * 1024)
print(f"๋ฐฑ์
์๋ฃ: {backup_file} ({size_mb:.2f} MB)")
# ์ค๋๋ ๋ฐฑ์
์ ๋ฆฌ (30์ผ ์ด์ ๋ ํ์ผ ์ญ์ )
cleanup_old_backups(backup_dir, days=30)
except Exception as e:
print(f"๋ฐฑ์
์คํจ: {e}")
def cleanup_old_backups(backup_dir, days=30):
"""์ค๋๋ ๋ฐฑ์
ํ์ผ ์ญ์ """
now = time.time()
cutoff = now - (days * 86400) # days๋ฅผ ์ด๋ก ๋ณํ
for filename in os.listdir(backup_dir):
if filename.startswith('backup_') and filename.endswith('.zip'):
file_path = os.path.join(backup_dir, filename)
file_time = os.path.getmtime(file_path)
if file_time < cutoff:
os.remove(file_path)
print(f"์ญ์ : {filename} (์์ฑ ํ {days}์ผ ๊ฒฝ๊ณผ)")
# ๋งค์ผ ์๋ฒฝ 2์์ ๋ฐฑ์
์คํ
schedule.every().day.at("02:00").do(create_backup)
print("๋ฐฑ์
์ค์ผ์ค๋ฌ ์์")
print("๋งค์ผ 02:00์ ๋ฐฑ์
์ด ์คํ๋ฉ๋๋ค.")
# ํ
์คํธ๋ฅผ ์ํด ์ฆ์ ์คํ
# create_backup()
while True:
schedule.run_pending()
time.sleep(60) # 1๋ถ๋ง๋ค ์ฒดํฌ
2. ์ด๋ฉ์ผ ๋ฆฌํฌํธ ์๋ ๋ฐ์กโ
import schedule
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
def send_daily_report():
"""์ผ์ผ ๋ฆฌํฌํธ ์ด๋ฉ์ผ ๋ฐ์ก"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ๋ฆฌํฌํธ ์์ฑ ์ค...")
# ๋ฆฌํฌํธ ๋ฐ์ดํฐ ์์ฑ (์์)
report_data = generate_report()
# ์ด๋ฉ์ผ ์ค์
sender_email = "your_email@gmail.com"
sender_password = "your_app_password"
recipient_email = "recipient@example.com"
# ์ด๋ฉ์ผ ๋ฉ์์ง ์์ฑ
message = MIMEMultipart()
message['From'] = sender_email
message['To'] = recipient_email
message['Subject'] = f"์ผ์ผ ๋ฆฌํฌํธ - {datetime.now().strftime('%Y-%m-%d')}"
# ์ด๋ฉ์ผ ๋ณธ๋ฌธ
body = f"""
<html>
<body>
<h2>์ผ์ผ ๋ฆฌํฌํธ</h2>
<p>๋ ์ง: {datetime.now().strftime('%Y๋
%m์ %d์ผ')}</p>
<h3>์ฃผ์ ์งํ</h3>
<ul>
<li>์ด ๋ฐฉ๋ฌธ์: {report_data['visitors']:,}๋ช
</li>
<li>์ ๊ท ๊ฐ์
: {report_data['signups']:,}๋ช
</li>
<li>๋งค์ถ: {report_data['revenue']:,}์</li>
</ul>
<p>์์ธ ๋ด์ฉ์ ์ฒจ๋ถ ํ์ผ์ ํ์ธํด์ฃผ์ธ์.</p>
</body>
</html>
"""
message.attach(MIMEText(body, 'html'))
# ํ์ผ ์ฒจ๋ถ (์ ํ์ฌํญ)
report_file = 'daily_report.csv'
if os.path.exists(report_file):
with open(report_file, 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename={report_file}')
message.attach(part)
try:
# SMTP ์๋ฒ ์ฐ๊ฒฐ ๋ฐ ์ ์ก
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
server.login(sender_email, sender_password)
server.send_message(message)
print("๋ฆฌํฌํธ ๋ฐ์ก ์๋ฃ!")
except Exception as e:
print(f"์ด๋ฉ์ผ ๋ฐ์ก ์คํจ: {e}")
def generate_report():
"""๋ฆฌํฌํธ ๋ฐ์ดํฐ ์์ฑ (์์)"""
# ์ค์ ๋ก๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๊ฐ์ ธ์ด
return {
'visitors': 1234,
'signups': 56,
'revenue': 5678900
}
# ๋งค์ผ ์ค์ 9์์ ๋ฆฌํฌํธ ๋ฐ์ก
schedule.every().day.at("09:00").do(send_daily_report)
# ๋งค์ฃผ ์์์ผ ์ค์ 10์์ ์ฃผ๊ฐ ๋ฆฌํฌํธ
def send_weekly_report():
print("์ฃผ๊ฐ ๋ฆฌํฌํธ ๋ฐ์ก")
# ์ฃผ๊ฐ ๋ฆฌํฌํธ ๋ก์ง
schedule.every().monday.at("10:00").do(send_weekly_report)
print("๋ฆฌํฌํธ ์ค์ผ์ค๋ฌ ์์")
print("- ๋งค์ผ 09:00: ์ผ์ผ ๋ฆฌํฌํธ")
print("- ๋งค์ฃผ ์์์ผ 10:00: ์ฃผ๊ฐ ๋ฆฌํฌํธ")
while True:
schedule.run_pending()
time.sleep(60)
3. ์น ๋ฐ์ดํฐ ์ ๊ธฐ ์์งโ
import schedule
import time
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import csv
import os
def collect_stock_prices():
"""์ฃผ์ ๊ฐ๊ฒฉ ์ ๊ธฐ ์์ง"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ๋ฐ์ดํฐ ์์ง ์์...")
stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN']
data = []
for stock in stocks:
try:
# API ๋๋ ์น ์คํฌ๋ํ์ผ๋ก ๋ฐ์ดํฐ ์์ง
# ์ฌ๊ธฐ์๋ ์์๋ก ๋๋ค ๋ฐ์ดํฐ ์ฌ์ฉ
import random
price = random.uniform(100, 500)
data.append({
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'stock': stock,
'price': round(price, 2)
})
print(f"{stock}: ${price:.2f}")
except Exception as e:
print(f"{stock} ์์ง ์คํจ: {e}")
# CSV ํ์ผ์ ์ ์ฅ
save_to_csv(data)
def save_to_csv(data):
"""๋ฐ์ดํฐ๋ฅผ CSV ํ์ผ์ ์ถ๊ฐ"""
filename = f"stock_prices_{datetime.now().strftime('%Y%m')}.csv"
file_exists = os.path.exists(filename)
with open(filename, 'a', newline='', encoding='utf-8') as f:
fieldnames = ['timestamp', 'stock', 'price']
writer = csv.DictWriter(f, fieldnames=fieldnames)
# ํ์ผ์ด ์์ผ๋ฉด ํค๋ ์ถ๊ฐ
if not file_exists:
writer.writeheader()
writer.writerows(data)
print(f"๋ฐ์ดํฐ ์ ์ฅ ์๋ฃ: {filename}")
def analyze_daily_data():
"""