主旨
此架構提供了從數據源自動提取、解析、轉換到最終發布在WordPress的完整流程。下方內容以我們的Project AnimeMTM為例。
架構圖

- Data Source:
- 這是整個流程的起始點,代表著可以提供原始資料的來源。它可以是一個網站、數據庫、API等。
- Parser:
- 從「Data Source」獲取的原始資料需要進行解析,以便後續處理。解析器「
Parser1
」到「Parser4
」(或更多)專門針對特定的數據源來進行資料提取。 - 每個解析器都專門負責解析特定的資料結構或格式,並將其轉換為結構化的數據,以供後續流程使用。
- 從「Data Source」獲取的原始資料需要進行解析,以便後續處理。解析器「
- Site1到Site4:
- 這些虛擬框表示每個解析器專門針對的特定網站或數據源。例如,「
Parser1
」可能專門用來解析「Site1」的資料。
- 這些虛擬框表示每個解析器專門針對的特定網站或數據源。例如,「
- Scraped Data:
- 這是經過解析器處理後的結構化數據。這些數據已經從原始的形式轉換為更有組織、易於處理的形式。
- Jinja2:
- 「
Jinja2
」是一個流行的Python
模板引擎。在這裡,它用於將結構化的數據轉換為HTML
格式。這允許使用者在數據轉換為HTML
之前定義自己的模板和格式。
- 「
- HTML:
- 這是經過「
Jinja2
」轉換後的數據,已經轉換為HTML
格式,準備好可以直接在Web上顯示。
- 這是經過「
- WordPress API:
- 這部分代表一個中介步驟,它允許將「
HTML
」內容自動化地推送到「WordPress」平台。
- 這部分代表一個中介步驟,它允許將「
- WordPress:
- 這是最終的發布平台,代表著內容已經可以在網站上公開顯示給使用者。
技術細節
Data Source
這邊使用YAML格式來當作設定檔案使用,分別建立以下四個檔案:
- anime.yaml:此檔案涵蓋了WordPress的標籤和分類,設定動畫在Wordpress POST中顯示的title、tags以及categories。
- pattern.yaml:設定動畫的來源網址以及Parser的微調整會在這裡設定。
- sources.yaml:設定動畫的來源名稱、字幕語言。
- tags.yaml:設定WordPress的標籤和分類ID。
檔案內容
AA:
categories:
- 7
id: 53
name: 無職轉生II~到了異世界就拿出真本事~ (無職転生 Ⅱ ~異世界行ったら本気だす~)
skip: true
tags:
- 12
S1:
name: Anime1
alias: A1
translate: Chinese
S2:
name: 巴哈姆特動畫瘋
alias: 巴哈
translate: Chinese
S3:
name: Muse木棉花
alias: Muse
translate: Chinese
# 可以依需求增加
data:
- name: AA
source:
- name: S1
# 如果某個資源的動畫集數跟其他平台不一致的情況在這邊設定一個可以調整的變數
# episode_offset: -1
url: https://anime1.me/category/2023%e5%b9%b4%e5%a4%8f%e5%ad%a3/%e7%84%a1%e8%81%b7%e8%bd%89%e7%94%9f-%e5%88%b0%e4%ba%86%e7%95%b0%e4%b8%96%e7%95%8c%e5%b0%b1%e6%8b%bf%e5%87%ba%e7%9c%9f%e6%9c%ac%e4%ba%8b-%e7%ac%ac%e4%ba%8c%e5%ad%a3
- name: S2
url: https://ani.gamer.com.tw/animeVideo.php?sn=34092
- name: S3
# 木棉花是 youtube 平台所以填寫撥放清單即可
url: PL12UaAf_xzfoHafXDU3s3QbiDhKzs3QK0
categories:
7: 2023夏
15: 2023秋
tags:
12: 本季完結
13: 連載中
關聯圖

SourceRouter
import time
import yaml
import importlib
import argparse
import os
from datetime import datetime
import logging
def fetch_episodes(source, anime_name: str):
source_name = source.get("name", "")
parser_name = f"parsers.{source_name.lower()}_parser"
try:
module = importlib.import_module(parser_name)
if hasattr(module, "fetch"):
return module.fetch(source)
else:
print(f"{parser_name}.py 沒有 fetch 函數. 跳過此次處理。")
except ImportError:
print(f"找不到模組 {parser_name}. 跳過此次處理。")
return []
def process_anime(anime_pattern, anime_config):
anime_name = anime_pattern.get("name", "")
if anime_config[anime_name]['skip']:
print(f"跳過 {anime_name} 的處理。")
return None
anime_data = {
"name": anime_name,
"time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"source": []
}
for source in anime_pattern.get("source", []):
episodes = fetch_episodes(source, anime_name)
if not episodes:
logging.info(f"Empty episodes list for {anime_name} from {source_name}. Skipping this anime.")
return None
anime_data["source"].append({
"name": source_name,
"episodes": episodes
})
return anime_data
def main():
parser = argparse.ArgumentParser(description="Anime data processing.")
parser.add_argument('-a', '--anime', type=str, help='Specify anime name to process only that anime')
args = parser.parse_args()
try:
with open("settings/pattern.yaml", "r", encoding="utf-8") as f:
pattern_config = yaml.safe_load(f)
with open("settings/anime.yaml", "r", encoding="utf-8") as f:
anime_config = yaml.safe_load(f)
pattern_list = [pattern for pattern in pattern_config.get("data", []) if not args.anime or pattern.get("name") == args.anime]
if args.anime and not pattern_list:
print(f"No patterns found for the specified anime: {args.anime}")
return
output_dir = f"../output/{datetime.now().strftime('%Y%m%d')}/sources"
os.makedirs(output_dir, exist_ok=True)
for index, anime_pattern in enumerate(pattern_list):
anime_data = process_anime(anime_pattern, anime_config)
if anime_data:
append_to_existing(anime_data, f"{output_dir}/{anime_name}.yaml")
if index < len(pattern_list) - 1:
print("等待 2 分鐘...")
time.sleep(120)
print("繼續處理下一個動畫。")
except FileNotFoundError:
print("YAML 配置檔未找到。請確保檔案路徑正確。")
except Exception as e:
print(f"出現錯誤:{e}")
if __name__ == "__main__":
main()
邏輯說明
- 擷取集數資料 (
fetch_episodes
):- 根據給定的Source名稱(S1,S2…之類的),動態導入相對應的模組。
- 如果模組存在且包含
函數,則執行該函數擷取資料。fetch
- 如果出現任何問題,例如模組不存在或缺少
fetch
函數,程式將輸出相關訊息並跳過該來源。
- 處理動畫資料 (
process_anime
):- 首先檢查是否要跳過此動畫的處理。
- 對於每個給定的資料來源,擷取集數資料並將其儲存。
- 主程式 (
main
):- 使用
argparse
模組解析命令列參數,允許用戶更新特定的動畫。 - 從
YAML
配置檔案讀取模式和動畫的設定。 - 根據所選的動畫模式處理每一部動畫。
- 將結果儲存到一個指定的輸出目錄。
- 處理完每部動畫後,等待2分鐘,然後繼續下一部。
- 使用
因為在Pattern
當中並不是每一個動畫在每一個Source
裡面都有,所以這邊依照Source
去動態載入對應的的Parser
。
Parser
在這邊每一個Parser
都需要實現一個fetch
方法,目的是將每個不同來源的資料抓取邏輯(如不同的動畫來源站點)抽象為單獨的策略。下面以s1_parser
跟s3_parser
為例子。
import requests
import re
from bs4 import BeautifulSoup
EPISODE_REGEX = r"\[(\d+)\]"
REQUEST_TIMEOUT = 10 # 10 seconds
def get_html_content(url: str) -> str:
try:
response = requests.get(url, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
return response.text
except requests.RequestException as e:
print(f"HTTP 請求錯誤:{e}")
return ""
def extract_episodes(html_content: str, episode_offset: int) -> list:
episodes = []
soup = BeautifulSoup(html_content, 'html.parser')
for h2_tag in soup.find_all('h2', class_="entry-title"):
a_tag = h2_tag.find('a')
match = re.search(EPISODE_REGEX, a_tag.string)
if match:
episode_num = int(match.group(1)) + episode_offset
episodes.append({"episode": episode_num, "url": a_tag['href']})
return episodes
def fetch(source):
base_url = source.get("url", "")
episode_offset = source.get('episode_offset', 0)
html_content = get_html_content(base_url)
if not html_content:
return []
return extract_episodes(html_content, episode_offset)
import os
import re
import logging
from googleapiclient.discovery import build
logging.basicConfig(level=logging.INFO)
def get_videos_from_playlist(playlist_id, api_key):
youtube = build("youtube", "v3", developerKey=api_key)
page_token = None
all_videos = []
while True:
response = youtube.playlistItems().list(
part="snippet",
maxResults=50,
playlistId=playlist_id,
pageToken=page_token
).execute()
all_videos.extend(response.get('items'))
page_token = response.get('nextPageToken')
if not page_token:
break
return all_videos
def filter_episodes(source, videos):
pattern = source.get("pattern", {})
compile = pattern.get("compile", "")
episode_offset = source.get('episode_offset', 0)
episodes = []
for video in videos:
title = video['snippet']['title']
if title.startswith(compile):
match = re.search(r"第(\d+)話", title)
if match:
episode_num = match.group(1)
episodes.append({
"episode": int(episode_num) + episode_offset,
"url": "https://www.youtube.com/watch?v=" + video['snippet']['resourceId']['videoId']
})
return episodes
def fetch(source):
playlist_id = source.get("playlist_id", "")
try:
API_KEY = os.environ.get("YOUTUBE_API_KEY") # 從環境變量中獲取 API 金鑰
if not API_KEY:
logging.error("API 金鑰未設定在環境變量中。")
return []
videos = get_videos_from_playlist(playlist_id, API_KEY)
filtered_data = filter_episodes(source, videos)
return filtered_data
except Exception as e:
logging.error(f"處理時發生錯誤:{e}")
return []
可以看到每個parser
互不影響且其邏輯都有其專屬的fetch
方法實現,對應於特定的來源。
例如,如果你有一個名為Source1
的動畫來源,其fetch
方法的實現可能包括到該網站的請求、解析網頁內容以獲取動畫集數等。Source3
則可能是從YouTube上獲取動畫的,它會使用YouTube API來搜尋播放清單,並從中過濾出特定模式的動畫集數。
Jinja2
經過parser
之後輸出的資料格式應該會是像下方
,接著我們需要通過YAML
Jinja2
模板,轉換成HTML
格式。
episodes:
0:
- source: S1
url: https://anime1.me/20544
- source: S3
url: https://www.youtube.com/watch?v=YEUMzpnTuzQ
- source: S4
url: https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-0
1:
- source: S1
url: https://anime1.me/20580
- source: S2
url: https://ani.gamer.com.tw/animeVideo.php?sn=34092
- source: S3
url: https://www.youtube.com/watch?v=3YW0UD_dFU8
- source: S4
url: https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-1
- source: S5
url: https://gogoanimehd.io/mushoku-tensei-ii-isekai-ittara-honki-dasu-episode-1
- source: S6
url: https://9animetv.to/watch/mushoku-tensei-jobless-reincarnation-season-2-18418?ep=103010
2:
- source: S1
url: https://anime1.me/20633
- source: S2
url: https://ani.gamer.com.tw/animeVideo.php?sn=34213
- source: S3
url: https://www.youtube.com/watch?v=kMXGZL0Hjlc
- source: S4
url: https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-2
- source: S5
url: https://gogoanimehd.io/mushoku-tensei-ii-isekai-ittara-honki-dasu-episode-2
- source: S6
url: https://9animetv.to/watch/mushoku-tensei-jobless-reincarnation-season-2-18418?ep=103529
name: AA
time: '2023-09-23 20:49:18'
在AnimeMTM是使用Ansible的Jinja2轉換。在這邊我們使用Python的方式來進行示範。Jinja2模板簡單來說就是把內容替換成變數,藉由Jinja2來把這些變數填上去。
{% macro render_links(msources, translation, sources, display_name) %}
{% set link_list = [] %}
{% for source in msources %}
{% if sources[source.source].translate == translation %}
{% set _ = link_list.append('<a href="' + source.url + '">【' + sources[source.source].alias + '】</a>') %}
{% endif %}
{% endfor %}
{% if link_list|length > 0 %}
{{ display_name }}<br>
{{ link_list|join(" ") }}
<br>
{% endif %}
{% endmacro %}
{% for episode_num, msources in metadata.episodes.items()|reverse %}
<h4 class="wp-block-heading">{{ anime[metadata.name].name }}[{{ episode_num }}]</h4>
<p>
{% for translation, display_name in [("Chinese", "中文"), ("English", "English"), ("Raw, Multi", "Raw, Multi-language"), ("Spanish", "Español")] %}
{{ render_links(msources, translation, sources, display_name) }}
{% endfor %}
</p>
{% endfor %}
這段腳本會在指定的目錄下找到所有 .yaml 文件,讀取它們的內容,並將Jinja2渲染後的結果儲存為相應的 .html 文件。path_to_Parser_output_folder 為 Parser 輸出目錄的真實路徑。
from jinja2 import Environment, FileSystemLoader
import yaml
import os
# 讀取 YAML 文件
def load_yaml(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)
# 設定 jinja2 模板環境
template_dir = os.path.dirname(os.path.abspath(__file__)) # 假設所有文件都在當前目錄下
env = Environment(loader=FileSystemLoader(template_dir))
# 加載 jinja2 模板
template = env.get_template("template.html.j2")
# 載入 anime 和 sources 的 YAML 資料
anime = load_yaml("anime.yaml")
sources = load_yaml("sources.yaml")
# 指定目標目錄
directory_path = 'path_to_Parser_output_folder' # 請將這裡的路徑替換成實際的 Parser 輸出目錄
# 讀取目標目錄下所有的文件
for filename in os.listdir(directory_path):
# 檢查是否為 .yaml 文件
if filename.endswith('.yaml'):
# 讀取 metadata
full_path = os.path.join(directory_path, filename)
metadata = load_yaml(full_path)
# 使用 jinja2 渲染模板
rendered_content = template.render(metadata=metadata, anime=anime, sources=sources)
# 儲存渲染後的內容到相應的 .html 文件
output_filename = filename.replace('.yaml', '.html')
output_path = os.path.join(directory_path, output_filename)
with open(output_path, 'w', encoding='utf-8') as output_file:
output_file.write(rendered_content)
print(f"Processed {filename} and saved as {output_filename}")
輸出的HTML如下
<h2 class="wp-block-heading">無職轉生II~到了異世界就拿出真本事~ (無職転生 Ⅱ ~異世界行ったら本気だす~)[2]</h2>
<p>
中文<br>
<a href="https://anime1.me/20633">【A1】</a> <a href="https://ani.gamer.com.tw/animeVideo.php?sn=34213">【巴哈】</a> <a
href="https://www.youtube.com/watch?v=kMXGZL0Hjlc">【Muse】</a>
<br>
English<br>
<a href="https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-2">【af】</a>
<br>
Raw, Multi-language<br>
<a href="https://9animetv.to/watch/mushoku-tensei-jobless-reincarnation-season-2-18418?ep=103529">【9a】</a>
<br>
</p>
<h2 class="wp-block-heading">無職轉生II~到了異世界就拿出真本事~ (無職転生 Ⅱ ~異世界行ったら本気だす~)[1]</h2>
<p>
中文<br>
<a href="https://anime1.me/20580">【A1】</a> <a href="https://ani.gamer.com.tw/animeVideo.php?sn=34092">【巴哈】</a> <a
href="https://www.youtube.com/watch?v=3YW0UD_dFU8">【Muse】</a>
<br>
English<br>
<a href="https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-1">【af】</a>
<br>
Raw, Multi-language<br>
<a href="https://9animetv.to/watch/mushoku-tensei-jobless-reincarnation-season-2-18418?ep=103010">【9a】</a>
<br>
</p>
<h2 class="wp-block-heading">無職轉生II~到了異世界就拿出真本事~ (無職転生 Ⅱ ~異世界行ったら本気だす~)[0]</h2>
<p>
中文<br>
<a href="https://anime1.me/20544">【A1】</a> <a href="https://www.youtube.com/watch?v=YEUMzpnTuzQ">【Muse】</a>
<br>
English<br>
<a href="https://animeflv.ws//mushoku-tensei-ii-isekai-ittara-honki-dasu-0">【af】</a>
<br>
</p>
接下來就可以上傳到WordPress囉!
WordPress API
建立應用程式密碼
先進入到個人資料,轉動到下方找到應用程式密碼,輸入密碼名稱(識別用沒有很重要)。

之後按下新增應用程式密碼

接著Python就可以利用這組密碼來進行API發送囉。
WordPress API發送
參數說明:
WORDPRESS_USER
: 登入WordPress的帳號WORDPRESS_KEY
: 這是剛剛建立的應用程式密碼WORDPRESS_URL
: 你的WordPress URL 例https://animemt.com/
import base64
import os
import yaml
import requests
from datetime import datetime
import argparse
import re
def get_text_inside_brackets(s):
# 使用正則表達式找到括號內的文字
result = re.findall(r'((.*?))', s)
return result if result else None
current_date = datetime.now().strftime('%Y%m%d')
WORDPRESS_USER = "YOUR_WORDPRESS_USER" # 替換為你的 WORDPRESS_USER
WORDPRESS_KEY = "YOUR_WORDPRESS_KEY" # 替換為你的 WORDPRESS_KEY
WP_CONNECTION = WORDPRESS_USER + ":" + WORDPRESS_KEY
TOKEN = base64.b64encode(WP_CONNECTION.encode())
WORDPRESS_URL = "YOUR_WORDPRESS_URL" # 替換為你的 WORDPRESS_URL
HEADERS = {
"Authorization": f"Basic {TOKEN.decode('utf-8')}",
}
html_dir = f"../output/{current_date}/html"
class TagManager:
def __init__(self):
with open("./settings/tags.yaml", "r", encoding="utf-8") as f:
self.tags_config = yaml.safe_load(f)
def tags_to_string(self, tag_ids):
tag_names = [self.tags_config["tags"][tag_id]
for tag_id in tag_ids if tag_id in self.tags_config["tags"]]
return ''.join(['[' + tag + ']' for tag in tag_names])
def create_post(content, title, slug, tags=[], categories=[]):
data = {
"title": title,
"slug": slug,
"content": content,
"status": "publish",
"tags": tags,
"categories": categories,
}
response = requests.post(
f"{WORDPRESS_URL}/wp-json/wp/v2/posts", headers=HEADERS, json=data)
return response.json().get("id")
def update_post(post_id, title, slug, content, tags=[], categories=[]):
current_time = datetime.now().isoformat()
data = {
"title": title,
"slug": slug,
"content": content,
"tags": tags,
"categories": categories,
"date": current_time,
}
response = requests.post(
f"{WORDPRESS_URL}/wp-json/wp/v2/posts/{post_id}", headers=HEADERS, json=data)
return response.status_code == 200
def episodes_parse(yaml_path):
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
# 取得所有的集數keys
episodes = data['episodes']
# 取得所有的鍵並對它們進行分類
numbers = []
strings = []
for key in episodes:
if str(key).isdigit():
numbers.append(int(key))
else:
strings.append(key)
# 找出最大數字和最小數字
min_num = min(numbers)
max_num = max(numbers)
# 格式化結果
if strings:
result = f"[{min_num}-{max_num} + {' + '.join(strings)}]"
else:
result = f"[{min_num}-{max_num}]"
return result
def main():
parser = argparse.ArgumentParser(description="處理動畫資料。")
parser.add_argument('-a', '--anime', type=str, help="要處理的動畫名稱")
args = parser.parse_args()
tag_manager = TagManager()
# 根據是否指定anime_name來篩選.html文件
if args.anime:
html_files = [f for f in os.listdir(f"../output/{current_date}/html")
if f.endswith(".html") and f.startswith(args.anime)]
else:
html_files = [f for f in os.listdir(f"../output/{current_date}/html")
if f.endswith(".html")]
with open("./settings/anime.yaml", "r", encoding="utf-8") as f:
anime_config = yaml.safe_load(f)
for html_file in html_files:
anime_name = os.path.splitext(html_file)[0]
if anime_config[anime_name]['skip']:
print(f"跳過 {anime_name} 的處理。")
continue
current_anime_perser_output = f"../output/{current_date}/sources/{anime_name}.yaml"
# 取得集數
if os.path.exists(current_anime_perser_output):
episodes = episodes_parse(current_anime_perser_output)
else:
print(f"找不到 {anime_name} 的集數資料,跳過處理。")
continue
filepath = os.path.join(html_dir, html_file)
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
tag_str = tag_manager.tags_to_string(anime_config[anime_name]["tags"])
title = tag_str + episodes + anime_config[anime_name]["name"]
slug = get_text_inside_brackets(anime_config[anime_name]["name"])[-1]
# 比對檔名與anime.yaml中的項目
if anime_config[anime_name]["id"] == None:
new_id = create_post(content, title, slug,
anime_config[anime_name]["tags"], anime_config[anime_name]["categories"])
anime_config[anime_name]["id"] = new_id
else:
update_post(anime_config[anime_name]["id"], title, slug, content,
anime_config[anime_name]["tags"], anime_config[anime_name]["categories"])
print(f"已處理 {anime_name}:{anime_config[anime_name]['name']} 的資料。")
# 更新anime.yaml
with open("./settings/anime.yaml", "w", encoding="utf-8") as f:
yaml.safe_dump(anime_config, f, allow_unicode=True)
if __name__ == "__main__":
main()
其中主要負責發送AIP的是下面這兩個function,其餘的內容主要就是針對tag跟文章標題的文字處理。
def create_post(content, title, slug, tags=[], categories=[]):
data = {
"title": title,
"slug": slug,
"content": content,
"status": "publish",
"tags": tags,
"categories": categories,
}
response = requests.post(
f"{WORDPRESS_URL}/wp-json/wp/v2/posts", headers=HEADERS, json=data)
return response.json().get("id")
def update_post(post_id, title, slug, content, tags=[], categories=[]):
current_time = datetime.now().isoformat()
data = {
"title": title,
"slug": slug,
"content": content,
"tags": tags,
"categories": categories,
"date": current_time,
}
response = requests.post(
f"{WORDPRESS_URL}/wp-json/wp/v2/posts/{post_id}", headers=HEADERS, json=data)
return response.status_code == 200
create_post
函數:- 目的: 這個函數用於在WordPress中建立一篇新的文章。
- 參數:
content
: 文章的內容。title
: 文章的標題。slug
: 文章的縮寫。在WordPress中,這通常用於URL,使其更有可讀性。tags
: 文章的標籤列表。categories
: 文章的分類列表。
- 函數的核心:
- 函數首先建立一個資料字典,其中包括要發送到WordPress的所有相關信息。
- 然後,它使用
requests.post
方法向WordPress的REST API發送POST請求以建立新文章。 - 最後,它從響應中返回新文章的ID。
update_post
函數:- 目的: 這個函數用於更新WordPress中的現有文章。
- 參數:
post_id
: 要更新的文章的ID。title
: 文章的新標題。slug
: 文章的新縮寫。content
: 文章的新內容。tags
: 文章的新標籤列表。categories
: 文章的新分類列表。
- 函數的核心:
- 函數首先獲取當前的ISO時間格式,這將用於更新文章的時間戳。
- 然後建立一個資料字典,其中包括要發送到WordPress的所有相關更新。
- 使用
requests.post
方法向WordPress的REST API發送POST請求以更新指定ID的文章。 - 最後,檢查響應的狀態碼是否為200,如果是,表示更新成功,並返回True,否則返回False。
定期任務
接著把上方的內容利用cronjob
或Scheduler
工具定期去執行,就可以完成這個自動化平台囉。
附上AnimeMTM運作畫面

發佈留言