👨‍💻 簡介

前篇已經將 TG Bot 整合 MongoDB,現在要將一些函數修改,在對 DB 進行操作前,先對 domain 進行驗證操作。

🛠️ 使用工具

  • Python 3.9.6
  • MongoDB
  • TG Bot

📝 功能需求

  • 新增 domain 前檢查 domain 憑證
  • 透過 TG Bot 檢查所有 domain 是否有過期

🎯Setup

  1. 新增 domain 前檢查 domain 憑證

之前在撰寫 CloudFunction 時有使用到 get_ssl_cert_expiry_date 函數,在這裡先對這個函數做一個簡單的修改,將驗證憑證是否有效以及取得過期時間拆成兩個函數:

def get_ssl_cert_info(domain, check_only=False):
    ssl_context = ssl.create_default_context()
    with ssl_context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain) as conn:
        conn.settimeout(3.0)
        try:
            conn.connect((domain, 443))
            if check_only:
                return True
            else:
                return conn.getpeercert()
        except Exception as e:
            print(f"無法獲取 {domain} 的 SSL,錯誤:{e}")
            return False if check_only else None

def get_ssl_cert_expiry_date(cert):
    if cert is None:
        return None
    try:
        expire_date = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z")
        return expire_date
    except Exception as e:
        print(f"Error parsing SSL certificate's expiry date: {e}")
        return None

接著在 handle_add_command 新增這個函數做驗證:

@bot.message_handler(commands=["add"])
def handle_add_command(message):
    try:
        _, env, domain = message.text.split(maxsplit=2)
    except ValueError:
        bot.reply_to(
            message, "使用方式不正確。請按照以下格式輸入:\n/add <env> <domain>"
        )
        return
    # 驗證憑證
    if get_ssl_cert(domain,check_only=True):
        add_successful = add_domain_to_mongodb(collection, env, domain)

        if add_successful:
            bot.reply_to(message, f"域名添加成功\n環境為{env}\ndomain為{domain}")
        else:
            bot.reply_to(message, "域名添加失敗,請檢查輸入的數據。")
    else:
        bot.reply_to(message, "證書檢查失敗,請檢查輸入的域名是否正確。")

這樣就完成了添加前的驗證了,接著來處理對所有的 domain 進行檢查過期時間。

  1. 透過 TG Bot 檢查所有 domain 是否有過期

這裡會用到之前在撰寫 CloudFunction 時使用的 check_ssl_expiration 以及 send_notification 兩個函數,多添加一個參數,改成傳入憑證取得過期時間,我們先稍微修改一下:

def check_ssl_expiration(
    domain, cert, env, platform, telegram_bot_token, telegram_group_id
):
    expire_date = get_ssl_cert_expiry_date(cert)
    if expire_date:
        remaining_days = (expire_date - datetime.utcnow()).days
        if remaining_days <= 30:
            message = "\n".join(
                [
                    "來源: Python",
                    "標題: 憑證到期",
                    f"域名: {domain}",
                    f"到期日: {expire_date.strftime('%Y-%m-%d')}",
                    f"平台: {platform}",
                    f"環境: {env}",
                ]
            )

            print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
            send_notification(message, domain, telegram_bot_token, telegram_group_id)
        else:
            print(
                f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。"
            )


def send_notification(message, domain, telegram_bot_token, telegram_group_id):
    telegram_send_message_url = (
        f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
    )
    response = requests.post(
        telegram_send_message_url, data={"chat_id": telegram_group_id, "text": message}
    )

    if response.status_code == 200:
        print(f"已為 {domain} 發送通知")
    else:
        print(f"為 {domain} 發送失敗")

接著新增一個新檔案叫做 scheduler_jobs.py ,然後將之前的 check_ssl_cloud_function 重新命名並稍微做個修改:

def perform_ssl_checks(
    bot, collection, chat_id, platform, telegram_bot_token, telegram_group_id
):
    domain_data = load_domain_envs_from_mongodb(collection)
    for env, domains in domain_data.items():
        for domain in domains:
            cert = get_ssl_cert(domain, check_only=False)
            check_ssl_expiration(
                domain, cert, env, platform, telegram_bot_token, telegram_group_id
            )
    bot.send_message(
        chat_id, "所有域名的 SSL 到期時間檢查完成。", parse_mode="Markdown"
    )

再來可以將 perform_ssl_checks 進行包裝:

def setup_scheduler(
    bot, collection, chat_id, platform, telegram_bot_token, telegram_group_id
):
    schedule.every(10).seconds.do(
        perform_ssl_checks,
        bot,
        collection,
        chat_id,
        platform,
        telegram_bot_token,
        telegram_group_id,
    )

最後就是在 main 裡進行調用:

import schedule

mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
telegram_bot_token = "your-bot-token"
bot = telebot.TeleBot(telegram_bot_token)
platform = "gcp"
telegram_group_id = "your-chat-id"

def run_schedule():
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    client = init_mongo_client(mongodb_uri)
    collection_name = "domains"
    collection = get_collection(client, collection_name)
    setup_scheduler(
        bot, collection, telegram_group_id, platform, telegram_bot_token, telegram_group_id
    )
    # 創建一個 Thread 物件,target 指定了這個線程要執行的函數
    t = threading.Thread(target=run_schedule)
    # 啟動線程
    t.start()
    # 主線程繼續執行 Telegram bot 的無限輪詢
    bot.infinity_polling()

這裡需注意,因為需要同時執行多個任務,像是定時檢查域名或是當有指令檢查所有域名,如果剛好某個操作比較耗時,會等到操作完,才執行下一個操作,因此需要使用到 threading Library,讓這些操作都可以在後台執行,不影響主程序,文件在這

使用的原因為:

  1. 非阻塞執行:在不使用 threading 的情況下,如果你的程式碼先執行定時任務的無限循環(如 run_schedule 函數),那麼程式將永遠停留在那個循環中,後面的代碼(如啟動 Telegram bot 的代碼)將永遠不會執行。使用 threading 可以讓定時任務在一個獨立的線程中運行,從而不會阻塞主線程上的其他操作。

  2. 提高效率:threading 允許多個操作同時進行,從而提高程式的執行效率。特別是在進行網路請求或者其他需要等待的操作時,多線程可以在等待時處理其他任務,提高資源利用率。

執行後可以在 terminal 看到如下資訊: TG 會顯示資訊如圖: 而如果 30 天內快到期,顯示的資訊如圖: TG 資訊如圖:

到此,基本的程式碼算是都完成了,下一篇會做簡單的整理程式碼,稍微優化。

如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep4-cert