Telegram 憑證監控機器人實作 EP5 - 代碼優化
👨💻 簡介
在這篇文章中,我們將對 Telegram 憑證監控機器人的代碼進行優化。我們會新增一些簡單的指令,並且將部分變數改為從 yaml 檔案中讀取,而不是直接寫死在程式碼中。這些改進目的是提高可讀性和可維護性。
🛠️ 使用工具
- Python 3.9.6
- MongoDB
- TG Bot
📝 功能需求
- 新增 TG 指令 show_cert_info
- 新增 TG 指令 help
- 調整參數讀取
🎯Setup
- 新增 TG 指令 show_cert_info
這裡會用到之前寫的 get_ssl_cert
函數,可以印出證書來看一下有什麼資訊:
{'subject': ((('countryName', 'US'),), (('stateOrProvinceName', 'California'),), (('localityName', 'San Francisco'),), (('organizationName', 'Wikimedia Foundation, Inc.'),), (('commonName', '*.wikipedia.org'),)), 'issuer': ((('countryName', 'US'),), (('organizationName', 'DigiCert Inc'),), (('commonName', 'DigiCert TLS Hybrid ECC SHA384 2020 CA1'),)), 'version': 3, 'serialNumber': '07419E39583A4C76CF1EA14347FA5F3A', 'notBefore': 'Oct 18 00:00:00 2023 GMT', 'notAfter': 'Oct 16 23:59:59 2024 GMT', 'subjectAltName': (('DNS', '*.wikipedia.org'), ('DNS', 'wikimedia.org'), ('DNS', 'mediawiki.org'), ('DNS', 'wikibooks.org'), ('DNS', 'wikidata.org'), ('DNS', 'wikinews.org'), ('DNS', 'wikiquote.org'), ('DNS', 'wikisource.org'), ('DNS',
....
資訊整理如下:
下表將上述 SSL 證書資訊整理成表格形式:
編號 | 欄位名稱 | 描述 |
---|---|---|
1 | subject | 證書的主題,包含證書擁有者的詳細資訊,如國家名稱、省份名稱、地區名稱、組織名稱和通用名稱。 |
1.1 countryName | 國家名稱 | |
1.2 stateOrProvinceName | 州或省的名稱 | |
1.3 localityName | 地區或城市的名稱 | |
1.4 organizationName | 擁有證書的組織名稱 | |
1.5 commonName | 通用名稱,通常是域名 | |
2 | issuer | 簽發者的資訊,包含簽發該 SSL 證書的機構的詳細資訊。 |
2.1 countryName | 簽發者的國家名稱 | |
2.2 organizationName | 簽發該證書的組織名稱 | |
2.3 commonName | 簽發者的通用名稱,通常是 CA 的名稱 | |
3 | version | 證書的版本號,目前有三個版本:v1、v2 和 v3。SSL/TLS 交易通常使用版本 3。 |
4 | serialNumber | 證書的序列號,簽發者賦予的唯一標識符。 |
5 | notBefore | 證書的生效日期,表示該證書在此日期之前不可用。 |
6 | notAfter | 證書的到期日期,表示該證書在此日期之後不再有效。 |
7 | subjectAltName | 主體別名,包括證書可以使用的其他域名列表,使一個證書可以對多個域名有效。 |
8 | OCSP | 在線證書狀態協定(Online Certificate Status Protocol)的網址,用於實時檢查 SSL 證書的撤銷狀態。 |
9 | caIssuers | 指向簽發此證書的 CA 證書的 URL,客戶端可以從這個位置獲取簽發者的證書。 |
10 | crlDistributionPoints | 指向證書撤銷列表(Certificate Revocation List, CRL)的 URL,客戶端可以從這些位置檢查證書是否被撤銷。 |
這個表格整理了 SSL 證書中的關鍵資訊,幫助理解和快速查找每個部分的功能和用途。
可以依照自己的需求看需要什麼資訊,這裡我新增一個新的函數,來解析取得的證書訊息:
def parse_ssl_cert_info(domain, cert):
issuer = dict(x[0] for x in cert["issuer"])
subject = dict(x[0] for x in cert["subject"])
issued_to = subject.get("commonName", subject.get("organizationName", ""))
issued_by = issuer.get("commonName", issuer.get("organizationName", ""))
valid_from = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y %Z").strftime(
"%Y-%m-%d"
)
valid_until = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z").strftime(
"%Y-%m-%d"
)
cert_info = (
f"Domain: {domain}\n"
f"Issued To: {issued_to}\n"
f"Issued By: {issued_by}\n"
f"Valid From: {valid_from}\n"
f"Valid Until: {valid_until}"
)
return cert_info
主要作法是將憑證的資訊從字典中提取自己所需要的欄位,然後做一些格式上的轉換。
接著就可以到 TG Bot 來新增這條指令:
@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
_, get_domain = message.text.split(maxsplit=1)
cert = get_ssl_cert_info(get_domain, check_only=False)
cert_info = parse_ssl_cert_info(get_domain, cert)
bot.send_message(message.chat.id, cert_info)
如果解析失敗或是沒有輸入域名,也需要告訴使用者錯誤訊息:
@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
try:
_, get_domain = message.text.split(maxsplit=1)
except ValueError:
# 處理使用者沒有提供域名的情況
bot.send_message(message.chat.id, "請提供一個域名。例如:/cert_info example.com")
return
cert = get_ssl_cert_info(get_domain, check_only=False)
if cert is None or cert is False:
# 如果無法獲取證書,直接回傳錯誤訊息
error_message = f"域名錯誤: 無法獲取 {get_domain} 的 SSL 證書資訊。"
bot.send_message(message.chat.id, error_message)
else:
cert_info = parse_ssl_cert_info(get_domain, cert)
bot.send_message(message.chat.id, cert_info)
最後到 TG 輸入指令來確認一下:
確認輸出的資訊符合自己所需。
- 新增 TG 指令 help
當指令比較多時,需要一個 help 指令來幫助我了解每個指令的用途,因此也可以新增一個簡單的 help 指令:
@bot.message_handler(commands=["help"])
def send_help_message(message):
help_text = """
以下是可用的命令列表及其用途:
/cert_info <domain> - 獲取指定域名的 SSL 證書資訊。
/get_all - 從 MongoDB 獲取所有域名環境的資訊。
/get <env> <domain> - 獲取指定環境下域名的資訊。
/edit <env> <old_domain> <new_domain> - 更新指定環境下域名的資訊。
/add <env> <domain> - 向 MongoDB 添加一個新的域名及其環境。
/del <env> <domain> - 從 MongoDB 刪除指定環境下的域名。
/bulk_add <env> <domain1> <domain2> ... - 向 MongoDB 批量添加多個域名及其環境。
請根據需要使用上述命令。
"""
bot.send_message(message.chat.id, help_text)
執行後的輸出會長這樣: 這樣當指令多時就可以比較方便顯示指令。
- 調整參數讀取
這裡有兩種做法,一個是儲存到 config.yaml
,然後進行讀取,另一個作法是寫成環境變數放在 docker-compose.yaml
,然後透過啟動容器時映射到容器裡。
3-1. 透過 yaml 傳入參數
第一種做法比較直覺,把一些 Mongo 以及 TG 的相關參數放到 config.yaml
,接著透過 yaml Library 進行讀取:
mongodb_uri: "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
telegram_bot_token: "<YOUR-BOT-TOKEN>"
platform: "gcp"
telegram_group_id: "<YOUR-CHAT-ID>"
# config_loader.py
import yaml
def load_config():
with open("config.yaml", "r") as file:
return yaml.safe_load(file)
def get_mongodb_uri():
config = load_config()
return config.get("mongodb_uri", None)
def get_telegram_config():
config = load_config()
return {
"telegram_bot_token": config.get("telegram_bot_token", None),
"platform": config.get("platform", None),
"telegram_group_id": config.get("telegram_group_id", None)
}
3-2. 透過環境變數傳入參數
直接在 docker-compose.yaml
添加 env:
services
telegram_bot:
build: .
container_name: telegram_bot_container
depends_on:
- mongodb
environment:
TELEGRAM_BOT_TOKEN: <YOUR-BOT-TOKEN>
TELEGRAM_GROUP_ID: <YOUR-CHAT-ID>
MONGODB_URI: mongodb://rootuser:rootpass@mongodb:27017/mydatabase?authSource=admin
command: python main.py
restart: always
然後使用 os
Library 進行調用:
import os
def get_mongodb_uri():
return os.getenv('MONGODB_URI', 'mongodb://localhost:27017/mydatabase')
def get_telegram_config():
return {
"telegram_bot_token": os.getenv('TELEGRAM_BOT_TOKEN', ''),
"platform": os.getenv('PLATFORM', 'local'),
"telegram_group_id": os.getenv('TELEGRAM_GROUP_ID', '')
}
最後就能到 main.py
進行調用:
from config_loader import get_telegram_config
telegram_config = get_telegram_config()
telegram_bot_token = telegram_config["telegram_bot_token"]
platform = telegram_config["platform"]
telegram_group_id = telegram_config["telegram_group_id"]
以上就是目前的優化,之後有想到會再回來補充。 下一篇會開始進行容器化的部分~
如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep5-optimize