웹 스크래핑
BeautifulSoup을 사용하여 웹 페이지에서 원하는 데이터를 추출하는 방법을 배워봅시다.
웹 스크래핑이란?
웹 스크래핑은 웹 페이지에서 자동으로 데이터를 수집하는 기술입니다. 뉴스 기사, 상품 가격, 날씨 정보 등 웹에 있는 다양한 데이터를 프로그래밍적으로 추출할 수 있습니다.
주요 용도
- 가격 비교 및 모니터링
- 뉴스 및 콘텐츠 수집
- 데이터 분석 및 연 구
- 검색 엔진 인덱싱
- 시장 조사 및 경쟁사 분석
주의사항
- robots.txt 확인: 웹사이트의 크롤링 정책 준수
- 이용 약관 검토: 법적 문제 방지
- 서버 부하 고려: 요청 간격 조절
- 개인정보 보호: 민감한 정보 수집 금지
설치하기
# BeautifulSoup 4
pip install beautifulsoup4
# HTML 파서 (lxml이 더 빠름)
pip install lxml
# 또는 html5lib (더 관대한 파싱)
pip install html5lib
# requests (HTTP 요청용)
pip install requests
기본 사용법
HTML 가져오기와 파싱
import requests
from bs4 import BeautifulSoup
# 웹 페이지 가져오기
url = 'https://example.com'
response = requests.get(url)
html = response.text
# BeautifulSoup 객체 생성
soup = BeautifulSoup(html, 'lxml')
# 전체 HTML 출력 (보기 좋게)
print(soup.prettify())
# 제목 가져오기
title = soup.title
print(title.string) # 제목 텍스트
# 모든 텍스트 가져오기
text = soup.get_text()
print(text)
파서 종류
# lxml (빠르고 유연함, 권장)
soup = BeautifulSoup(html, 'lxml')
# html.parser (내장, 추가 설치 불필요)
soup = BeautifulSoup(html, 'html.parser')
# html5lib (가장 관대함, 느림)
soup = BeautifulSoup(html, 'html5lib')
# XML 파싱
soup = BeautifulSoup(xml, 'xml')
태그 선택하기
기본 선택 방법
from bs4 import BeautifulSoup
html = '''
<html>
<head><title>My Page</title></head>
<body>
<h1 id="main-title">Welcome</h1>
<div class="content">
<p class="intro">First paragraph</p>
<p class="text">Second paragraph</p>
<a href="/page1">Link 1</a>
<a href="/page2">Link 2</a>
</div>
</body>
</html>
'''
soup = BeautifulSoup(html, 'lxml')
# 첫 번째 태그 찾기
h1 = soup.find('h1')
print(h1.string) # Welcome
# 모든 태그 찾기
paragraphs = soup.find_all('p')
for p in paragraphs:
print(p.string)
# ID로 찾기
main_title = soup.find(id='main-title')
print(main_title.string)
# 클래스로 찾기
intro = soup.find(class_='intro')
print(intro.string)
# 여러 조건
link = soup.find('a', href='/page1')
print(link.string)
CSS 선택자 사용
# CSS 선택자로 찾기 (더 직관적)
h1 = soup.select_one('h1#main-title')
print(h1.string)
# 클래스 선택
intro = soup.select_one('.intro')
paragraphs = soup.select('.content p')
# 자식 선택자
content_links = soup.select('div.content > a')
# 속성 선택자
external_links = soup.select('a[href^="http"]')
# 여러 개 선택
items = soup.select('.item')
for item in items:
print(item.text)
데이터 추출하기
텍스트 추출
# 태그의 텍스트
text = tag.string # 직접 자식 텍스트만
text = tag.get_text() # 모든 하위 텍스트
text = tag.text # get_text()와 동일
# 공백 처리
text = tag.get_text(strip=True) # 앞뒤 공백 제거
# 구분자 지정
text = tag.get_text(separator=' | ') # 태그 사이에 구분자
# 예제
html = '<div> <p>Hello</p> <p>World</p> </div>'
soup = BeautifulSoup(html, 'lxml')
div = soup.find('div')
print(div.get_text()) # " Hello World "
print(div.get_text(strip=True)) # "HelloWorld"
print(div.get_text(separator=' | ', strip=True)) # "Hello | World"
속성 추출
# 속성 가져오기
link = soup.find('a')
# 딕셔너리 방식
href = link['href']
title = link.get('title', 'No title') # 기본값 설정
# 모든 속성
attrs = link.attrs
print(attrs) # {'href': '/page1', 'class': ['link'], 'id': 'first'}
# 여러 값을 가진 속성 (class, rel 등)
classes = link.get('class') # 리스트 반환
print(classes) # ['link', 'external']
# 예제: 모든 이미지 URL 추출
images = soup.find_all('img')
for img in images:
src = img.get('src')
alt = img.get('alt', 'No description')
print(f"Image: {src} - {alt}")
탐색 및 순회
# 부모 탐색
tag = soup.find('p')
parent = tag.parent
print(parent.name)
# 모든 부모
for parent in tag.parents:
print(parent.name)
# 형제 탐색
next_sibling = tag.next_sibling # 다음 형제 (텍스트 포함)
next_tag = tag.find_next_sibling() # 다음 태그
prev_tag = tag.find_previous_sibling() # 이전 태그
# 모든 형제
for sibling in tag.next_siblings:
print(sibling)
# 자식 탐색
children = tag.children # 직접 자식 (이터레이터)
descendants = tag.descendants # 모든 하위 요소
# 예제: 테이블 탐색
table = soup.find('table')
for row in table.find_all('tr'):
cells = row.find_all(['td', 'th'])
data = [cell.get_text(strip=True) for cell in cells]
print(data)
실전 예제
예제 1: 뉴스 헤드라인 수집
import requests
from bs4 import BeautifulSoup
from datetime import datetime
def scrape_news_headlines(url):
"""뉴스 사이트에서 헤드라인을 수집합니다."""
try:
# 헤더 설정 (봇 차단 방지)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
# 뉴스 기사 찾기 (사이트마다 다름)
articles = soup.select('.news-item') # CSS 선택자는 실제 사이트에 맞게 수정
news_list = []
for article in articles:
# 제목
title_tag = article.select_one('.title')
title = title_tag.get_text(strip=True) if title_tag else 'No title'
# 링크
link_tag = article.select_one('a')
link = link_tag['href'] if link_tag else ''
# 절대 URL로 변환
if link and not link.startswith('http'):
from urllib.parse import urljoin
link = urljoin(url, link)
# 요약
summary_tag = article.select_one('.summary')
summary = summary_tag.get_text(strip=True) if summary_tag else ''
# 시간
time_tag = article.select_one('.time')
timestamp = time_tag.get_text(strip=True) if time_tag else ''
news_list.append({
'title': title,
'link': link,
'summary': summary,
'timestamp': timestamp
})
return news_list
except Exception as e:
print(f"스크래핑 오류: {e}")
return []
# 사용 예시
url = 'https://news.example.com'
news = scrape_news_headlines(url)
for i, article in enumerate(news[:10], 1):
print(f"\n{i}. {article['title']}")
print(f" {article['link']}")
print(f" {article['summary'][:100]}...")
print(f" 시간: {article['timestamp']}")
예제 2: 쇼핑몰 가격 비교
import requests
from bs4 import BeautifulSoup
import re
from typing import List, Dict
class PriceScraper:
"""여러 쇼핑몰의 가격을 비교합니다."""
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def extract_price(self, price_text: str) -> int:
"""텍스트에서 가격(숫자)을 추출합니다."""
# 숫자만 추출
numbers = re.findall(r'\d+', price_text.replace(',', ''))
return int(''.join(numbers)) if numbers else 0
def scrape_site_a(self, product_name: str) -> List[Dict]:
"""사이트 A에서 상품 검색"""
url = f"https://site-a.com/search?q={product_name}"
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.text, 'lxml')
products = []
items = soup.select('.product-item')
for item in items[:5]: # 상위 5개만
title = item.select_one('.product-title').get_text(strip=True)
price_text = item.select_one('.price').get_text(strip=True)
price = self.extract_price(price_text)
link = item.select_one('a')['href']
products.append({
'site': 'Site A',
'title': title,
'price': price,
'link': link
})
return products
except Exception as e:
print(f"Site A 스크래핑 오류: {e}")
return []
def scrape_site_b(self, product_name: str) -> List[Dict]:
"""사이트 B에서 상품 검색"""
url = f"https://site-b.com/products?keyword={product_name}"
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.text, 'lxml')
products = []
items = soup.select('.item')
for item in items[:5]:
title = item.select_one('h3').get_text(strip=True)
price_text = item.select_one('.price-now').get_text(strip=True)
price = self.extract_price(price_text)
link = item.select_one('a')['href']
products.append({
'site': 'Site B',
'title': title,
'price': price,
'link': link
})
return products
except Exception as e:
print(f"Site B 스크래핑 오류: {e}")
return []
def compare_prices(self, product_name: str) -> List[Dict]:
"""여러 사이트에서 가격을 비교합니다."""
all_products = []
# 여러 사이트에서 검색
all_products.extend(self.scrape_site_a(product_name))
all_products.extend(self.scrape_site_b(product_name))
# 가격순 정렬
all_products.sort(key=lambda x: x['price'])
return all_products
# 사용 예시
scraper = PriceScraper()
products = scraper.compare_prices('무선 이어폰')
print(f"'{product_name}' 검색 결과:\n")
for i, product in enumerate(products, 1):
print(f"{i}. [{product['site']}] {product['title']}")
print(f" 가격: {product['price']:,}원")
print(f" 링크: {product['link']}\n")
if products:
cheapest = products[0]
print(f"최저가: {cheapest['site']} - {cheapest['price']:,}원")
예제 3: 테이블 데이터 추출
import requests
from bs4 import BeautifulSoup
import pandas as pd
def scrape_table_to_dataframe(url: str, table_index: int = 0) -> pd.DataFrame:
"""웹 페이지의 테이블을 DataFrame으로 변환합니다."""
response = requests.get(url)
soup = BeautifulSoup(response.text, 'lxml')
# 원하는 테이블 선택
tables = soup.find_all('table')
if not tables or table_index >= len(tables):
raise ValueError("테이블을 찾을 수 없습니다.")
table = tables[table_index]
# 헤더 추출
headers = []
header_row = table.find('thead')
if header_row:
headers = [th.get_text(strip=True) for th in header_row.find_all('th')]
else:
# thead가 없으면 첫 번째 tr을 헤더로
first_row = table.find('tr')
headers = [th.get_text(strip=True) for th in first_row.find_all(['th', 'td'])]
# 데이터 추출
rows = []
for tr in table.find_all('tr')[1:]: # 첫 행(헤더) 제외
cells = tr.find_all(['td', 'th'])
row = [cell.get_text(strip=True) for cell in cells]
if row: # 빈 행 제외
rows.append(row)
# DataFrame 생성
df = pd.DataFrame(rows, columns=headers)
return df
# 사용 예시
url = 'https://example.com/data-table'
df = scrape_table_to_dataframe(url)
print(df.head())
print(f"\n총 {len(df)}개의 행")
# CSV로 저장
df.to_csv('scraped_data.csv', index=False, encoding='utf-8-sig')
예제 4: 이미지 다운로드
import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin, urlparse
def download_images(url: str, save_dir: str = 'images', limit: int = 10):
"""웹 페이지의 이미지를 다운로드합니다."""
# 저장 디렉토리 생성
os.makedirs(save_dir, exist_ok=True)
# 페이지 가져오기
response = requests.get(url)
soup = BeautifulSoup(response.text, 'lxml')
# 모든 이미지 태그 찾기
images = soup.find_all('img')
downloaded = 0
for img in images:
if downloaded >= limit:
break
# 이미지 URL
img_url = img.get('src') or img.get('data-src')
if not img_url:
continue
# 절대 URL로 변환
img_url = urljoin(url, img_url)
# 외부 URL은 제외 (선택사항)
if urlparse(img_url).netloc != urlparse(url).netloc:
continue
try:
# 이미지 다운로드
img_response = requests.get(img_url, timeout=10)
img_response.raise_for_status()
# 파일명 생성
filename = os.path.basename(urlparse(img_url).path)
if not filename:
filename = f"image_{downloaded + 1}.jpg"
# 저장
filepath = os.path.join(save_dir, filename)
with open(filepath, 'wb') as f:
f.write(img_response.content)
print(f"다운로드: {filename}")
downloaded += 1
except Exception as e:
print(f"다운로드 실패 ({img_url}): {e}")
print(f"\n총 {downloaded}개의 이미지 다운로드 완료")
# 사용 예시
url = 'https://example.com/gallery'
download_images(url, save_dir='downloaded_images', limit=20)
예제 5: 페이지 네비게이션 (여러 페이지 크롤링)
import requests
from bs4 import BeautifulSoup
import time
from typing import List, Dict
def scrape_multiple_pages(base_url: str, max_pages: int = 5) -> List[Dict]:
"""여러 페이지를 순회하며 데이터를 수집합니다."""
all_items = []
for page in range(1, max_pages + 1):
print(f"페이지 {page} 스크래핑 중...")
# 페이지 URL (사이트마다 다름)
url = f"{base_url}?page={page}"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
# 아이템 추출
items = soup.select('.item')
if not items:
print("더 이상 데이터가 없습니다.")
break
for item in items:
title = item.select_one('.title').get_text(strip=True)
link = item.select_one('a')['href']
all_items.append({
'page': page,
'title': title,
'link': link
})
print(f" - {len(items)}개 아이템 수집")
# 다음 페이지로 이동 (방법 1: 번호)
# 이미 위에서 구현됨
# 다음 페이지로 이동 (방법 2: "다음" 버튼)
next_button = soup.select_one('.pagination .next')
if not next_button:
print("마지막 페이지입니다.")
break
# 서버 부하 방지를 위한 대기
time.sleep(1)
except Exception as e:
print(f"페이지 {page} 스크래핑 오류: {e}")
break
print(f"\n총 {len(all_items)}개 아이템 수집 완료")
return all_items
# 사용 예시
base_url = 'https://example.com/items'
items = scrape_multiple_pages(base_url, max_pages=10)
# 결과 저장
import json
with open('scraped_items.json', 'w', encoding='utf-8') as f:
json.dump(items, f, ensure_ascii=False, indent=2)
예제 6: 동적 컨텐츠 스크래핑 (Selenium 사용)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
def scrape_dynamic_content(url: str):
"""JavaScript로 로딩되는 동적 콘텐츠를 스크래핑합니다."""
# Selenium 설치: pip install selenium
# ChromeDriver 다운로드 필요
# 브라우저 옵션
options = webdriver.ChromeOptions()
options.add_argument('--headless') # 백그라운드 실행
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
# 드라이버 초기화
driver = webdriver.Chrome(options=options)
try:
# 페이지 열기
driver.get(url)
# 특정 요소가 로드될 때까지 대기
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'content')))
# 스크롤 다운 (무한 스크롤 페이지)
SCROLL_PAUSE_TIME = 2
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
# 아래로 스크롤
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(SCROLL_PAUSE_TIME)
# 새로운 높이 계산
new_height = driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
# 페이지 소스 가져오기
html = driver.page_source
# BeautifulSoup으로 파싱
soup = BeautifulSoup(html, 'lxml')
# 데이터 추출
items = soup.select('.item')
results = []
for item in items:
title = item.select_one('.title').get_text(strip=True)
results.append({'title': title})
return results
finally:
# 브라우저 종료
driver.quit()
# 사용 예시
url = 'https://example.com/dynamic-content'
results = scrape_dynamic_content(url)
print(f"{len(results)}개 아이템 수집")
고급 기법
User-Agent 로테이션
import random
import requests
from bs4 import BeautifulSoup
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
]
def get_random_user_agent():
return random.choice(USER_AGENTS)
def scrape_with_rotation(url):
headers = {'User-Agent': get_random_user_agent()}
response = requests.get(url, headers=headers)
return BeautifulSoup(response.text, 'lxml')
에러 처리와 재시도
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_session_with_retry():
"""재시도 로직이 있는 세션 생성"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
# 사용
session = create_session_with_retry()
response = session.get('https://example.com')
속도 제한
import time
from functools import wraps
def rate_limit(max_per_minute):
"""분당 최대 호출 횟수 제한 데코레이터"""
min_interval = 60.0 / max_per_minute
def decorator(func):
last_called = [0.0]
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
result = func(*args, **kwargs)
last_called[0] = time.time()
return result
return wrapper
return decorator
@rate_limit(max_per_minute=30)
def scrape_page(url):
response = requests.get(url)
return BeautifulSoup(response.text, 'lxml')
자주 묻는 질문
Q1. 웹 스크래핑은 합법인가요?
A: 웹 스크래핑 자체는 합법이지만, 다음을 준수해야 합니다:
- robots.txt 확인 및 준수
- 사이트의 이용 약관 검토
- 개인정보 및 저작권 존중
- 서버에 과도한 부하 주지 않기
# robots.txt 확인
import urllib.robotparser
rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://example.com/robots.txt")
rp.read()
url = "https://example.com/page"
user_agent = "MyBot"
if rp.can_fetch(user_agent, url):
print("스크래핑 허용")
else:
print("스크래핑 금지")
Q2. JavaScript로 렌더링되는 페이지는 어떻게 스크래핑하나요?
A: Selenium이나 Playwright를 사용해야 합니다.
# Selenium
pip install selenium
# Playwright (더 현대적)
pip install playwright
playwright install
Q3. 봇으로 감지되어 차단됩니다.
A: 다음 방법을 시도해보세요:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# 요청 간격 추가
time.sleep(random.uniform(1, 3))
Q4. 인코딩 문제가 발생합니다.
A: 응답의 인코딩 을 명시적으로 설정하세요.
response = requests.get(url)
response.encoding = 'utf-8' # 또는 'euc-kr', 'cp949' 등
# 자동 감지
import chardet
encoding = chardet.detect(response.content)['encoding']
response.encoding = encoding
Q5. 속도를 높이려면?
A: 비동기 요청을 사용하세요.
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def scrape_multiple(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
htmls = await asyncio.gather(*tasks)
results = []
for html in htmls:
soup = BeautifulSoup(html, 'lxml')
# 데이터 추출
results.append(soup.title.string)
return results
# 실행
urls = ['https://example.com/page1', 'https://example.com/page2']
results = asyncio.run(scrape_multiple(urls))
다음 단계
웹 스크래핑을 마스터했다면, 다음 주제를 학습해보세요:
- Selenium/Playwright - 동적 웹 페이지 스크래핑
- Scrapy - 대규모 웹 크롤링 프레임워크
- 데이터 분석 - pandas로 수집한 데이터 분석하기
- API 개발 - 수집한 데이터를 API로 제공하기