👨‍💻 簡介

在這篇文章中,我們將對 Telegram 憑證監控機器人的代碼進行優化。我們會新增一些簡單的指令,並且將部分變數改為從 yaml 檔案中讀取,而不是直接寫死在程式碼中。這些改進目的是提高可讀性和可維護性。

🛠️ 使用工具

  • Python 3.9.6
  • MongoDB
  • TG Bot

📝 功能需求

  • 新增 TG 指令 show_cert_info
  • 新增 TG 指令 help
  • 調整參數讀取

🎯Setup

  1. 新增 TG 指令 show_cert_info

這裡會用到之前寫的 get_ssl_cert 函數,可以印出證書來看一下有什麼資訊:

{'subject': ((('countryName', 'US'),), (('stateOrProvinceName', 'California'),), (('localityName', 'San Francisco'),), (('organizationName', 'Wikimedia Foundation, Inc.'),), (('commonName', '*.wikipedia.org'),)), 'issuer': ((('countryName', 'US'),), (('organizationName', 'DigiCert Inc'),), (('commonName', 'DigiCert TLS Hybrid ECC SHA384 2020 CA1'),)), 'version': 3, 'serialNumber': '07419E39583A4C76CF1EA14347FA5F3A', 'notBefore': 'Oct 18 00:00:00 2023 GMT', 'notAfter': 'Oct 16 23:59:59 2024 GMT', 'subjectAltName': (('DNS', '*.wikipedia.org'), ('DNS', 'wikimedia.org'), ('DNS', 'mediawiki.org'), ('DNS', 'wikibooks.org'), ('DNS', 'wikidata.org'), ('DNS', 'wikinews.org'), ('DNS', 'wikiquote.org'), ('DNS', 'wikisource.org'), ('DNS',
....

資訊整理如下:

下表將上述 SSL 證書資訊整理成表格形式:

編號欄位名稱描述
1subject證書的主題,包含證書擁有者的詳細資訊,如國家名稱、省份名稱、地區名稱、組織名稱和通用名稱。
1.1 countryName國家名稱
1.2 stateOrProvinceName州或省的名稱
1.3 localityName地區或城市的名稱
1.4 organizationName擁有證書的組織名稱
1.5 commonName通用名稱,通常是域名
2issuer簽發者的資訊,包含簽發該 SSL 證書的機構的詳細資訊。
2.1 countryName簽發者的國家名稱
2.2 organizationName簽發該證書的組織名稱
2.3 commonName簽發者的通用名稱,通常是 CA 的名稱
3version證書的版本號,目前有三個版本:v1、v2 和 v3。SSL/TLS 交易通常使用版本 3。
4serialNumber證書的序列號,簽發者賦予的唯一標識符。
5notBefore證書的生效日期,表示該證書在此日期之前不可用。
6notAfter證書的到期日期,表示該證書在此日期之後不再有效。
7subjectAltName主體別名,包括證書可以使用的其他域名列表,使一個證書可以對多個域名有效。
8OCSP在線證書狀態協定(Online Certificate Status Protocol)的網址,用於實時檢查 SSL 證書的撤銷狀態。
9caIssuers指向簽發此證書的 CA 證書的 URL,客戶端可以從這個位置獲取簽發者的證書。
10crlDistributionPoints指向證書撤銷列表(Certificate Revocation List, CRL)的 URL,客戶端可以從這些位置檢查證書是否被撤銷。

這個表格整理了 SSL 證書中的關鍵資訊,幫助理解和快速查找每個部分的功能和用途。

可以依照自己的需求看需要什麼資訊,這裡我新增一個新的函數,來解析取得的證書訊息:

def parse_ssl_cert_info(domain, cert):
    issuer = dict(x[0] for x in cert["issuer"])
    subject = dict(x[0] for x in cert["subject"])
    issued_to = subject.get("commonName", subject.get("organizationName", ""))
    issued_by = issuer.get("commonName", issuer.get("organizationName", ""))
    valid_from = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y %Z").strftime(
        "%Y-%m-%d"
    )
    valid_until = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z").strftime(
        "%Y-%m-%d"
    )

    cert_info = (
        f"Domain: {domain}\n"
        f"Issued To: {issued_to}\n"
        f"Issued By: {issued_by}\n"
        f"Valid From: {valid_from}\n"
        f"Valid Until: {valid_until}"
    )
    return cert_info

主要作法是將憑證的資訊從字典中提取自己所需要的欄位,然後做一些格式上的轉換。

接著就可以到 TG Bot 來新增這條指令:

@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
    _, get_domain = message.text.split(maxsplit=1)
    cert = get_ssl_cert_info(get_domain, check_only=False)
    cert_info = parse_ssl_cert_info(get_domain, cert)
    bot.send_message(message.chat.id, cert_info)

如果解析失敗或是沒有輸入域名,也需要告訴使用者錯誤訊息:

@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
    try:
        _, get_domain = message.text.split(maxsplit=1)
    except ValueError:
        # 處理使用者沒有提供域名的情況
        bot.send_message(message.chat.id, "請提供一個域名。例如:/cert_info example.com")
        return
    cert = get_ssl_cert_info(get_domain, check_only=False)
    if cert is None or cert is False:
        # 如果無法獲取證書,直接回傳錯誤訊息
        error_message = f"域名錯誤: 無法獲取 {get_domain} 的 SSL 證書資訊。"
        bot.send_message(message.chat.id, error_message)
    else:
        cert_info = parse_ssl_cert_info(get_domain, cert)
        bot.send_message(message.chat.id, cert_info)

最後到 TG 輸入指令來確認一下:

確認輸出的資訊符合自己所需。

  1. 新增 TG 指令 help

當指令比較多時,需要一個 help 指令來幫助我了解每個指令的用途,因此也可以新增一個簡單的 help 指令:

@bot.message_handler(commands=["help"])
def send_help_message(message):
    help_text = """
以下是可用的命令列表及其用途:

/cert_info <domain> - 獲取指定域名的 SSL 證書資訊。
/get_all - 從 MongoDB 獲取所有域名環境的資訊。
/get <env> <domain> - 獲取指定環境下域名的資訊。
/edit <env> <old_domain> <new_domain> - 更新指定環境下域名的資訊。
/add <env> <domain> - 向 MongoDB 添加一個新的域名及其環境。
/del <env> <domain> - 從 MongoDB 刪除指定環境下的域名。
/bulk_add <env> <domain1> <domain2> ... - 向 MongoDB 批量添加多個域名及其環境。

請根據需要使用上述命令。
"""
    bot.send_message(message.chat.id, help_text)

執行後的輸出會長這樣: 這樣當指令多時就可以比較方便顯示指令。

  1. 調整參數讀取

這裡有兩種做法,一個是儲存到 config.yaml,然後進行讀取,另一個作法是寫成環境變數放在 docker-compose.yaml,然後透過啟動容器時映射到容器裡。

3-1. 透過 yaml 傳入參數

第一種做法比較直覺,把一些 Mongo 以及 TG 的相關參數放到 config.yaml,接著透過 yaml Library 進行讀取:

mongodb_uri: "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
telegram_bot_token: "<YOUR-BOT-TOKEN>"
platform: "gcp"
telegram_group_id: "<YOUR-CHAT-ID>"
# config_loader.py
import yaml

def load_config():
    with open("config.yaml", "r") as file:
        return yaml.safe_load(file)

def get_mongodb_uri():
    config = load_config()
    return config.get("mongodb_uri", None)

def get_telegram_config():
    config = load_config()
    return {
        "telegram_bot_token": config.get("telegram_bot_token", None),
        "platform": config.get("platform", None),
        "telegram_group_id": config.get("telegram_group_id", None)
    }

3-2. 透過環境變數傳入參數

直接在 docker-compose.yaml 添加 env:

services
  telegram_bot:
    build: .
    container_name: telegram_bot_container
    depends_on:
      - mongodb
    environment:
      TELEGRAM_BOT_TOKEN: <YOUR-BOT-TOKEN>
      TELEGRAM_GROUP_ID: <YOUR-CHAT-ID>
      MONGODB_URI: mongodb://rootuser:rootpass@mongodb:27017/mydatabase?authSource=admin
    command: python main.py
    restart: always

然後使用 os Library 進行調用:

import os

def get_mongodb_uri():
    return os.getenv('MONGODB_URI', 'mongodb://localhost:27017/mydatabase')

def get_telegram_config():
    return {
        "telegram_bot_token": os.getenv('TELEGRAM_BOT_TOKEN', ''),
        "platform": os.getenv('PLATFORM', 'local'),
        "telegram_group_id": os.getenv('TELEGRAM_GROUP_ID', '')
    }

最後就能到 main.py 進行調用:

from config_loader import get_telegram_config

telegram_config = get_telegram_config()
telegram_bot_token = telegram_config["telegram_bot_token"]
platform = telegram_config["platform"]
telegram_group_id = telegram_config["telegram_group_id"]

以上就是目前的優化,之後有想到會再回來補充。 下一篇會開始進行容器化的部分~

如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep5-optimize