当前位置:首页 > 问答 > 正文

Python怎么用Kafka搭个完整例子,边学边用实操分享

综合参考了知乎专栏“Kafka实战:从零开始搭建你的第一个Python应用”、CSDN博客“Python操作Kafka的保姆级教程”以及B站视频“手把手教你用Python玩转Kafka”的核心演示步骤)

好,咱们直接开干,想用Python和Kafka搭个能跑起来的例子,最实在的就是自己动手敲一遍,别管那些复杂的理论,先让消息发出去、收回来,有了感觉再深入,下面我就带你一步一步实现一个最简单的“生产者-消费者”模型,也就是一个程序负责发送消息,另一个程序负责接收消息。

第一步:准备工作——安装必要的库

你得确保电脑上已经有Python了,我们需要安装一个Python库,它就像是Python和Kafka之间的翻译官,这个库叫kafka-python,在命令行里用pip安装就行(来源:CSDN博客)。

打开你的终端(Windows叫命令提示符或PowerShell,Mac叫终端),输入下面这行命令然后回车:

pip install kafka-python

如果安装速度慢,可以试试加上清华的镜像源:pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

第二步:启动Kafka服务器(Broker)

光有Python库还不够,我们需要一个Kafka服务器来接收和存储消息,最方便的方法是使用Kafka官方自带的脚本快速启动一个单节点的Kafka(来源:B站视频演示流程)。

  1. 下载Kafka: 去Apache Kafka官网(kafka.apache.org)下载最新版本的二进制文件,比如kafka_2.13-3.6.1.tgz(版本号可能会变),解压到一个你找得到的位置。
  2. 启动ZooKeeper: Kafka依赖ZooKeeper来管理元数据,打开一个终端窗口,进入你解压Kafka的目录,然后运行:
    bin/zookeeper-server-start.sh config/zookeeper.properties

    (Windows系统是:bin\windows\zookeeper-server-start.bat config\zookeeper.properties) 这个窗口会一直运行,别关它。

    Python怎么用Kafka搭个完整例子,边学边用实操分享

  3. 启动Kafka Server: 再打开一个新的终端窗口,同样进入Kafka目录,运行:
    bin/kafka-server-start.sh config/server.properties

    (Windows系统是:bin\windows\kafka-server-start.bat config\server.properties) 这个窗口也会持续运行,你的简易Kafka环境就准备好了。

第三步:编写Python生产者代码

现在我们来写发送消息的Python程序,创建一个新文件,比如叫producer.py

# 导入Kafka生产者类
from kafka import KafkaProducer
# 1. 创建一个生产者实例
# bootstrap_servers:告诉生产者Kafka服务器在哪里,本地就是'localhost:9092'
# value_serializer:把我们要发送的字符串消息转换成Kafka需要的字节格式
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: x.encode('utf-8')
)
# 2. 准备要发送的消息主题(Topic),主题就像是消息的类别或者频道,消费者会订阅它。
topic_name = 'my_first_topic'
# 3. 发送几条测试消息
for i in range(5):
    message = f"Hello Kafka! 这是第 {i+1} 条消息"
    # 使用send方法发送消息,指定主题和消息内容
    producer.send(topic_name, value=message)
    print(f"已发送: {message}")
# 4. 确保所有消息都发送出去(清空缓冲区)
producer.flush()
print("所有消息发送完毕!")
# 5. 关闭生产者连接
producer.close()

第四步:编写Python消费者代码

再创建一个新文件,叫consumer.py,用来持续监听并接收消息。

Python怎么用Kafka搭个完整例子,边学边用实操分享

# 导入Kafka消费者类
from kafka import KafkaConsumer
# 1. 创建一个消费者实例
# bootstrap_servers:同样指定Kafka服务器地址
# auto_offset_reset:当没有初始偏移量时,从最早的消息开始读取('earliest')
# value_deserializer:把接收到的字节数据解码成字符串
# group_id:消费者组ID,同一个组内的消费者共同消费一个主题,这里我们先简单设一个名字
consumer = KafkaConsumer(
    'my_first_topic',  # 直接在这里指定要订阅的主题
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: x.decode('utf-8'),
    group_id='my_test_group'
)
print("开始监听消息... (按 Ctrl+C 停止)")
# 2. 消费者是一个持续不断的迭代器,会一直等待新消息
try:
    for message in consumer:
        # 打印接收到的消息详情
        print(f"""
        收到消息!
        主题: {message.topic}
        分区: {message.partition}
        偏移量: {message.offset}
        键: {message.key}
        值: {message.value}
        """)
except KeyboardInterrupt:
    print("停止监听。")
# 3. 关闭消费者
finally:
    consumer.close()

第五步:运行和测试

  1. 确保ZooKeeper和Kafka Server那两个终端窗口还在运行。
  2. 先运行消费者程序,让它进入等待状态,打开一个新的终端,输入:
    python consumer.py

    你会看到输出“开始监听消息...”。

  3. 再打开一个新的终端,运行生产者程序:
    python producer.py

    生产者终端会快速打印出5条“已发送”的信息。

  4. 立刻切换回运行消费者的终端窗口,你会看到它已经收到了那5条消息,并打印出了每条消息的详细信息。

恭喜你!你已经成功搭建了一个最基础的Kafka消息流,生产者像是一个说话的人,消费者像是一个听话的人,Kafka服务器就是他们之间的传声筒和记事本,确保消息不会丢失。

边学边用的下一步:

  • 玩一下主题管理: 在Kafka目录下,你可以用命令bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看当前有哪些主题。
  • 试试多个消费者: 同时开两个consumer.py的终端,再运行一次producer.py,观察消息是怎么被分配的(默认是负载均衡)。
  • 修改消息内容: 在生产者代码里发送一些更复杂的数据,比如字典,你可能需要用到json_serializer

这个过程虽然简单,但包含了Kafka最核心的生产-消费模型,通过这个实操,你再去看那些“分区”、“副本”、“消费者组”等概念,就会觉得具体多了。