👨‍💻簡介

上次介紹了使用 SASL/SCRAM 的驗證方式來完成整個設定,這次改成使用 IAM 的驗證方式,因為後續使用其他服務,像是連接器,並不能接受使用 SASL/SCRAM 驗證方式的 MSK,因此會需要改成 IAM 的驗證方式下去建立 MSK

🎯setup

建立叢集

進入到 [MSK 頁面](叢集 | MSK | ap-northeast-1 (amazon.com)),然後創建叢集

  1. 叢集設定

創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可

  1. 聯網

聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可

  1. 安全

安全這邊選擇 IAM 角色型身份驗證,加密靜態資料選擇使用 AWS 受管金鑰

  1. 監控和標籤

這裡監控只要選基本的即可

接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘

建立 IAM Policy

叢集建立好後可以到屬性查看範例 Policy

這邊快速介紹一下權限用意,由上至下的區塊分別對應的是叢集、Topic、Group

叢集:

  • kafka-cluster:Connect:允許連接到 Kafka 叢集,確保 Client 端或服務可以與 Kafka 叢集進行通信。
  • kafka-cluster:AlterCluster:允許對叢集進行修改操作,比如設定更改、叢集拓撲結構修改等。
  • kafka-cluster:DescribeCluster:允許檢視 Kafka 叢集的設定和狀態,這可能包括叢集節點、分區等信息。

Topic:

  • kafka-cluster:*Topic*:允許與主題相關的所有操作,這裡使用通配符表示操作(例如,創建、刪除、列出、修改 Kafka Topic 等)。
  • kafka-cluster:WriteData:允許將資料寫入 Kafka Topic,這涉及生產者(Producer)將訊息發送到 Kafka。
  • kafka-cluster:ReadData:允許從 Kafka Topic 讀取資料,這涉及消費者(Consumer)從 Kafka 主題中接收訊息。

Group:

  • kafka-cluster:AlterGroup:允許修改 Kafka 消費者群組的相關設定或狀態,這可能包括消費者偏移量的重置等。
  • kafka-cluster:DescribeGroup:允許查看 Kafka 消費者群組的狀態或設定,如消費者群組的成員、消費偏移量等信息。

複製好 Policy 後,去 IAM 建立 Policy,建立時先選擇 JSON

將剛剛複製的 Policy 貼上,並修改裡面的 topic 以及 group,我這邊都使用萬用字元取代

取名為 msk-iam-policy

再來是建立使用者,並套用剛剛建立好的 Policy

好了之後產生 Key,選擇命令列介面即可

複製上述的存取金鑰,待會會在 Client 端機器使用到

開啟公開存取

IAM 驗證模式下,並不需要調整預設的叢集組態檔,就可以把公開存取打開

建立 Client 端機器

這裡一樣可以建立一台 ec2,但不用相同子網,因為我們是使用外部存取。

首先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。

wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

下載完成後解壓縮。

tar -xzf kafka_2.13-3.8.0.tgz

如果機器上沒有安裝 java 也一併安裝

sudo yum -y install java-11

好了之後到 kafka_2.13-3.8.0/libs 目錄下,下載 Amazon MSK IAM JAR 檔。這份檔案可以讓 Client 端機器存取 MSK

wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar

接著到 kafka_2.13-3.8.0/bin 目錄下,建立一份檔名為 client_iam.properties,將底下內容複製貼上

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

再來是要取得 Client 端的外部 ip,添加到安全群組裡,允許我們 Client 端機器可以訪問 MSK,在 IAM 驗證模式下,我們必須允許 Client 端機器訪問 9198 Port

上述都設定好後,就可以測試建立 topic 了

先去 msk 主控台,將公有端點複製

kafka_2.13-3.8.0/bin 使用以下腳本,並將剛剛的存取金鑰以及公有端點替換上去

  • List Topic
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="your-public-BootStrapConnectString"

./kafka-topics.sh --list --bootstrap-server $BootStrapConnectString \
--command-config client_iam.properties

  • Create Topic
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="your-public-BootStrapConnectString"

./kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_iam.properties

接著建立兩個腳本,來測試 Producer 以及 Consumer

  • Producer
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export TOPIC="mytopic"
export BootStrapConnectString="your-public-BootStrapConnectString"

./kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_iam.properties
  • Consumer
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export TOPIC="mytopic"
export BootStrapConnectString="your-public-BootStrapConnectString"

./kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_iam.properties

開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為 Producer,第二個視窗成為 Consumer

執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息

📚Reference