Python中使用Rocketmq

3,539次阅读
没有评论

共计 9832 个字符,预计需要花费 25 分钟才能阅读完成。

Python中使用Rocketmq

公司内部有多个RocketMQ实例,自建的或是阿里云的上的都有,生产基本是云上的实例,其他自建要么是测试的要么是以前还没迁移的。对于RocketMQ的运维过程中,我这这边最常出现的就消息堆积严重,多则400w左右,似乎这些业务对这个消息的可用性和即时性要求不高😥。。。正好兴趣使然,就用python来折腾下rocketMQ的使用

快速部署一个rocketMQ

# 安装docker
xadocker@xadocker-virtual-machine:~$ curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

# 安装docker-compose
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

# 创建rocketmq目录
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ mkdir -p broker/{conf,logs,store} namesrv/{logs,store}

# 创建broker.conf
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >broker/conf/broker.conf<<-'EOF'
#所属集群名字
brokerClusterName=DefaultCluster

#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a

#0 表示Master,>0 表示Slave
brokerId=0

#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=mall4cloud-rocketmq-namesrv:9876

#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.44.166

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口
listenPort=10911

#删除文件时间点,默认凌晨4点
deleteWhen=04

#文件保留时间,默认48小时
fileReservedTime=120

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536

#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
EOF

# 创建docker-compose.yaml文件
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ cat >docker-compose.yaml<<-'EOF'
version: '3'
services:
  rocketmq-namesrv:
    image: foxiswho/rocketmq:4.8.0
    container_name: rocketmq-namesrv
    restart: always
    ports:
      - 9876:9876
    volumes:
      - ./namesrv/logs:/home/rocketmq/logs
      - ./namesrv/store:/home/rocketmq/store
    environment:
      JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
    command: ["sh","mqnamesrv"]
    networks:
      rocketmq_net:
        aliases:
          - rocketmq-namesrv


  rocketmq-broker:
    image: foxiswho/rocketmq:4.8.0
    container_name: rocketmq-broker
    restart: always
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./broker/logs:/home/rocketmq/logs
      - ./broker/store:/home/rocketmq/store
      - ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
    command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf","-n","rocketmq-namesrv:9876","autoCreateTopicEnable=true"]
    depends_on:
      - rocketmq-namesrv
    networks:
      rocketmq_net:
        aliases:
          - rocketmq-broker


  rocketmq-console:
    image: styletang/rocketmq-console-ng
    container_name: rocketmq-console
    restart: always
    ports:
      - 8180:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rocketmq-namesrv
    networks:
      rocketmq_net:
        aliases:
          - rocketmq-console

networks:
  rocketmq_net:
    driver: bridge
EOF

# 启动rocketmq
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compse up -d
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo docker-compose ps 
      Name                    Command               State                                                      Ports                                                   
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
rocketmq-broker    sh mqbroker -c /etc/rocket ...   Up      0.0.0.0:10909->10909/tcp,:::10909->10909/tcp, 0.0.0.0:10911->10911/tcp,:::10911->10911/tcp, 10912/tcp,     
                                                            9876/tcp                                                                                                   
rocketmq-console   sh -c java $JAVA_OPTS -jar ...   Up      0.0.0.0:8180->8080/tcp,:::8180->8080/tcp                                                                   
rocketmq-namesrv   sh mqnamesrv                     Up      10909/tcp, 10911/tcp, 10912/tcp, 0.0.0.0:9876->9876/tcp,:::9876->9876/tcp

在python中使用rocketMQ

准备环境依赖

# 安装librocketmq,参考链接:https://github.com/apache/rocketmq-client-python
# 若不安装则会在使用时报以下错误:
# raise ImportError('rocketmq dynamic library not found') 
# 博主这里是ubuntu,所以按此方式安装即可
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb

# 此处安装rocketmq-client-python==2.0.0
xadocker@xadocker-virtual-machine:~/workdir/datadir/rocketmq$ pip3 install rocketmq-client-python==2.0.0

测试python使用mq收发消息

producer生产者代码

cat >mq-test-producer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
from faker import Faker
from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')
producer.set_name_server_address('192.168.44.166:9876')
producer.start()
faker = Faker(locale='zh_CN')
for i in range(100):
    msg = Message('testff')
    msg.set_keys('abc'+str(time.time()))
    msg.set_tags('abcd')
    msg.set_body(faker.name()+','+faker.address()+','+faker.email()+','+faker.phone_number())
    ret = producer.send_sync(msg)
    print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
EOF

# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-producer.py 
SendStatus.OK 7F00010170230E6144137934D8010000 0
.....

consumer消费者代码

cat >mq-test-consumer.py<<-'EOF'
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time

from rocketmq.client import PushConsumer, ConsumeStatus

def callback(msg):
    print(msg.id, str(msg.body,'UTF-8'))
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer('PID-CCC')
consumer.set_name_server_address('192.168.44.166:9876')
consumer.subscribe('testff', callback)
consumer.start()

while True:
    time.sleep(120)

consumer.shutdown()
EOF

# 测试运行
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ python3.8 mq-test-consumer.py
7F00010170230E6144137934D8010000 何宁,上海市天津县魏都黄街z座 253178,tzheng@example.net,14503234314
....

普通消息发送

同步消息发送和接受

参考上面例子

Oneway单向消息

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message

# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')

# 启动生产者
producer.start()

# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 消息内容
msg.set_body('This is a oneway message1.')

# 发送单向消息
producer.send_oneway(msg)
# 模拟业务
print('Send oneway message.')
producer.shutdown()

顺序消息发送

producer生产者

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
from rocketmq.client import Producer, Message

"""
 顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""

# 初始化生产者,并设置生产组信息
producer = Producer('group2')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
# 启动生产者
producer.start()

for i in range(10):
    # 组装消息
    msg = Message('rocketmq-xxx|namespace_python%topic2')
    # 消息内容
    msg.set_body('This is a new message' + str(i) + '.')

    # 发送同步消息
    ret = producer.send_sync(msg)
    print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

consumer消费者

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time

from rocketmq.client import PushConsumer, ConsumeStatus

"""
 顺序消息可使用 顺序类型的topic 来实现。顺序类型有全局顺序和局部顺序两种,可根据业务类型选择顺序类型
"""


# 消息处理回调
def callback(msg):
    # 模拟业务
    print('Received message. messageId: ', msg.id, ' body: ', msg.body)
    # 消费成功回复CONSUME_SUCCESS
    return ConsumeStatus.CONSUME_SUCCESS
    # 消费失败返回RECONSUME_LATER,该消息将会被重新消费
    # return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group22')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')

# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic2', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()

延迟消息发送

producer生产者

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time

from rocketmq.client import Producer, Message

# 初始化生产者,并设置生产组信息
producer = Producer('group1')
# 设置服务地址
producer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
producer.set_session_credentials(
    'eyJrZXlJZC......',
    'admin',
    ''
)
# 启动生产者
producer.start()

# 组装消息
msg = Message('rocketmq-xxx|namespace_python%topic1')
# 设置keys
msg.set_keys('yourKey')
# 设置tags
msg.set_tags('yourTags')
# 设置消息延迟级别
# 1s、 5s、 10s、 30s、  1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
# 1    2    3     4     5    6   7    8   9   10   11   12  13   14    15    16   17   18
msg.set_delay_time_level(2)
# 消息内容
msg.set_body('This is a new message1.')

# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

consumer消费模式

广播模式

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time

from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调
from rocketmq.ffi import MessageModel


def callback(msg):
    # 模拟业务
    print('Received message. messageId: ', msg.id, ' body: ', msg.body)
    # 消费成功回复CONSUME_SUCCESS
    return ConsumeStatus.CONSUME_SUCCESS
    # 消费失败返回RECONSUME_LATER,该消息将会被重新消费
    # return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer('rocketmq-xxx|namespace_python%group11')
# 设置服务地址
consumer.set_name_server_address('192.168.44.166:9876')
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
    'eyJrZXlJZC......',
    'admin',
    ''
)
# 设置为集群消息模式
consumer.set_message_model(MessageModel.BROADCASTING)
# 订阅topic
consumer.subscribe('rocketmq-xxx|namespace_python%topic1', callback)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()

集群模式

默认模式,略

正文完
 1
xadocker
版权声明:本站原创文章,由 xadocker 2021-06-19发表,共计9832字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)