👨‍💻簡介

Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 Amazon 推出的完全代管、具有高可用以及安全的 Apache Kafka 服務。在這篇文章中,會使用 AWS 建立 MSK,並使用 SASL/SCRAM 的驗證方式來完成整個設定。

🎯setup

建立叢集

進入到 MSK 頁面,然後創建叢集

  1. 叢集設定

創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可

  1. 聯網

聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可

  1. 安全

安全這邊選擇 SASL/SCRAM 身分驗證,加密靜態資料選擇使用 AWS 受管金鑰,待會會提到

SASL/SCRAM(Simple Authentication andSecurity Layer/ Salted Challenge Response Mechanism),是一種使用帳號和密碼來完成驗證的方式,但它的前提是 Client 端與 Proxy 之間必須使用 TLS 加密。

  1. 監控和標籤

這裡監控只要選基本的即可

接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘

建立KMS Key

AmazonMSK 使用 Amazon Secret Manager 來儲存 MSK 使用的帳號與密碼。在設定 Secret Manager 之前,必須先在 Amazon KMS(Key Management Service) 創建一個使用者託管的 Key。

KMS是 Amazon 託管的密鑰管理服務

  1. 設定金鑰

建立過程都先選默認的,如果後續要調整可以再改。

  1. 新增標籤

別名叫kafka-kms

  1. 定義金鑰管理許可

選擇自己的 IAM user

  1. 定義金鑰使用許可

一樣選擇自己的 IAM user 即可

回到主頁就可以看到自己的金鑰建立完成了

建立 SecretsManager

Amazon Secrets Manager 是密碼管理服務,可以把敏感資料儲存在這,不需要放在程式裡,可以透過 API 調用取得,避免在程式外洩。 AmazonMSK 通過 Secrets Manager 來儲存帳號與密碼。

這裡要注意一點,使用預設 AWS KMS 金鑰建立的密碼無法與 Amazon MSK 叢集搭配使用。

  1. 選擇機密類型

在 Secrets Manager 中選擇其他類型的密碼,這邊要打上我們需要驗證的 username 以及 password,加密的金鑰要選擇前面建立的 KMS 密鑰。

  1. 設定機密

在建立密碼時,密碼名稱必須包含前綴為 AmazonMSK_

  1. 設定輪換

選擇預設即可

關聯密鑰到 MSK Cluster

回到主頁面的屬性,往下滑找到關聯機密,選擇剛剛建立的機密即可

設定權限

建立 topic 之前,我們需要先透過 Broker 去修改帳號的權限,接著開放公開存取。 在相同子網路上建立 ec2,透過私有連線下去修改權限,先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。

wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

下載完成後解壓縮。

tar -xzf kafka_2.13-3.8.0.tgz

如果機器上沒有安裝 java 也一併安裝

sudo yum -y install java-11

接著要先取得 BootstrapConnectString,將使用者權限設定完成。 在 MSK 主頁面點擊檢視用戶端資訊可以取得 BootstrapConnectString。

再來到 Client 端機器上的 kafka 目錄下,因為在執行權限設定時,需要進行身份驗證,所以需要建立一份驗證檔,取名為 client_sasl.properties,裡面會有 Secrets Manager 設定的帳號密碼,將這檔案建立在 kafka 目錄下

client_sasl.properties 內容如下

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="alan" \
  password="alan-secret";

執行以下命令來修改權限,替換 BootStrapConnectString 為剛剛複製的。如果 Cluster 的節點數小於3個,replication-factor要設定小於3。

export TOPIC="*"
export USER="alan"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-3.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096"

## 允許使用者所有權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALL \
--topic $TOPIC \
--command-config client_sasl.properties

以下是單獨權限設定

## 授予指定的使用者建立指定的主題。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Create \
--topic $TOPIC \
--command-config client_sasl.properties

## 授予指定使用者對所有群組和指定主題的讀取權限。
## 該使用者可以從指定的主題讀取消息,並且適用於所有消費者群組。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Read --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties

## 授予指定的使用者對指定的主題的寫入權限。
## 該使用者可以向指定的主題發送消息。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Write \
--topic $TOPIC \
--command-config client_sasl.properties

## 授予指定的使用者對所有群組和指定的主題的描述權限。
## 該使用者可以查看主題和群組的元數據信息,例如消費者的偏移量和配置詳情。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Describe --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties

## 授予指定的使用者對指定的主題的刪除權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation DELETE \
--topic $TOPIC \
--command-config client_sasl.properties

## 允許使用者修改資源(如 topic 的配置)。這個權限對於更改 topic partition 數或設定非常重要。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALTER \
--topic $TOPIC \
--command-config client_sasl.properties

設定完權限後,即可開始透過外網建立 topic

開放外網存取

使用外網存取 msk,必須先開啟公開存取,因此須先設定叢集組態檔,將 allow.everyone.if.no.acl.found=false 加上去,讓叢集使用這份組態檔

點擊左側菜單欄的叢集組態 -> 建立叢集組態 -> 加上 allow.everyone.if.no.acl.found=false

接著回到叢集裡面的屬性,替換剛剛新增的設定檔

好了之後回到叢集裡面的屬性,編輯聯網設定,開啟公開存取

使用帳號與密碼連接到MSK Cluster

可以存取外網後,就可以開始測試外部 Client 端是否可以操作了。先到一台需要透過外網存取的機器,先取得外網 ip,將這 ip 加到MSK 的 SG 裡,開放可以連到 9196 port

安裝kafka以及java,並到 kafka 目錄下設定 client_sasl.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="alan" \
  password="alan-secret";

首先先建立 topic,一樣使用 Broker 的連線資訊,但是是使用外網,先到叢集那邊複製公有端點

到 kafka 目錄下執行指令建立 topic

export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_sasl.properties

接著開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為producer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_sasl.properties

接著讓第二個視窗成為consumer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_sasl.properties

執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息

如果要用 python 來測試,須先安裝 boto3 以及 kafka-python package,並且設定好 aws-cli,因為需要取得 secret

pip install boto3
pip install kafka-python

建立一份 producer.py,成為producer

import boto3
import base64
import json
from kafka import KafkaProducer
from kafka import TopicPartition

def get_secret():

    secret_name = "AmazonMSK_kfk01"
    region_name = "ap-northeast-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return json.loads(secret)

topic = 'mytopic'
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
producer = KafkaProducer(bootstrap_servers=BootstrapBroker_String,
                                 security_protocol='SASL_SSL',
                                 sasl_mechanism='SCRAM-SHA-512',
                                 sasl_plain_username=secret['username'],
                                 sasl_plain_password=secret['password'],
)

try:
    while True:
        message = input("請輸入您要發送的消息 (輸入 'exit' 退出): ")
        if message.lower() == 'exit':
            print("結束輸入,關閉生產者...")
            break

        # 將用戶輸入的消息編碼為 bytes
        producer.send(topic, message.encode('utf-8'))
        producer.flush()  # 確保消息已經被發送
        print("消息發送成功")
except Exception as e:
    print(f"發送消息失敗: {e}")
finally:
    producer.close()
    print("Kafka 生產者已關閉。")

建立一份consumer.py,成為 consumer

import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition

def get_secret():

    secret_name = "AmazonMSK_kfk01"
    region_name = "ap-northeast-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(SecretId=secret_name)

    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])

    return json.loads(secret)

TOPIC = "mytopic"

secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BootstrapBroker_String,
                                 security_protocol='SASL_SSL',
                                 sasl_mechanism='SCRAM-SHA-512',
                                 sasl_plain_username=secret['username'],
                                 sasl_plain_password=secret['password']
)

print("開始接收消息...")
try:
    for message in consumer:
        print(f"接收到消息: {message.value.decode('utf-8')}")
except Exception as e:
    print(f"接收消息失敗: {e}")
finally:
    consumer.close()

📚Reference