CMD2 - 强大的命令行解释器框架

CMD2 是一个基于 Python 标准库 cmd 模块的强大命令行解释器框架,提供丰富的内置功能和插件支持

分类: thirdparty 难度: 中级 更新: 2024-01-15
cmd2 命令行 CLI REPL 解释器 交互式

CMD2 - 强大的命令行解释器框架

📝 概述

CMD2 是一个 Python 包,用于构建强大的命令行解释器(CLI)程序 0。它扩展了 Python 标准库的 cmd 包,提供了丰富的功能和更好的用户体验 0。CMD2 的基本使用方法与 cmd 相同,但提供了更多高级功能,如参数解析、插件系统、输出重定向、历史记录、自动补全等 0

无论是构建新的 REPL(Read-Eval-Print Loop)应用还是为现有的 cmd 应用添加更多功能,CMD2 都是理想的选择 0

🎯 学习目标

  • 掌握 CMD2 的基本概念和使用方法
  • 学会创建自定义命令和参数解析
  • 熟练使用内置命令和功能
  • 掌握插件系统和模块化开发
  • 了解高级功能如管道、重定向和脚本
  • 学会应用打包和分发

📋 前置知识

  • Python 面向对象编程基础
  • 命令行界面基本概念
  • Python 标准库 cmd 模块(可选)
  • 正则表达式基础知识

🔍 详细内容

基本概念和安装

安装配置

# 基础安装
pip install cmd2

# 从 GitHub 安装最新版本
pip install -U git+https://github.com/python-cmd2/cmd2.git

# 验证安装
python -c "import cmd2; print(cmd2.__version__)"

基础应用结构

CMD2 的基本使用与 cmd 相同 0

from cmd2 import Cmd

class MyApp(Cmd):
    """基础的 CMD2 应用"""
    
    def __init__(self):
        """初始化应用"""
        super().__init__()
        self.intro = '欢迎使用 MyApp!输入 help 查看可用命令。'
        self.prompt = 'MyApp> '
    
    def do_hello(self, args):
        """问候命令"""
        if args:
            self.poutput(f'你好, {args}!')
        else:
            self.poutput('你好!')
    
    def do_quit(self, args):
        """退出应用"""
        self.poutput('再见!')
        return True

# 启动应用
if __name__ == '__main__':
    app = MyApp()
    app.cmdloop()

命令定义和参数解析

使用 argparse 进行参数解析

import argparse
from cmd2 import Cmd, with_argparser

class AdvancedApp(Cmd):
    """高级 CMD2 应用示例"""
    
    # 创建子命令解析器
    file_parser = argparse.ArgumentParser()
    file_subparsers = file_parser.add_subparsers(dest='action', help='文件操作')
    
    # 列表命令
    list_parser = file_subparsers.add_parser('list', help='列出文件')
    list_parser.add_argument('-a', '--all', action='store_true', help='显示隐藏文件')
    list_parser.add_argument('path', nargs='?', default='.', help='目录路径')
    
    # 创建文件命令
    create_parser = file_subparsers.add_parser('create', help='创建文件')
    create_parser.add_argument('filename', help='文件名')
    create_parser.add_argument('-c', '--content', default='', help='文件内容')
    
    @with_argparser(file_parser)
    def do_file(self, args):
        """文件管理命令"""
        if args.action == 'list':
            self._list_files(args.path, args.all)
        elif args.action == 'create':
            self._create_file(args.filename, args.content)
    
    def _list_files(self, path, show_all):
        """列出文件"""
        import os
        try:
            files = os.listdir(path)
            if not show_all:
                files = [f for f in files if not f.startswith('.')]
            
            self.poutput(f'目录 {path} 中的文件:')
            for file in files:
                self.poutput(f'  {file}')
        except OSError as e:
            self.perror(f'错误: {e}')
    
    def _create_file(self, filename, content):
        """创建文件"""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            self.poutput(f'文件 {filename} 创建成功')
        except OSError as e:
            self.perror(f'创建文件失败: {e}')

选择和自动补全

from cmd2 import Cmd, with_argparser, Cmd2ArgumentParser
from cmd2.decorators import with_choices

class DatabaseApp(Cmd):
    """数据库管理应用示例"""
    
    # 可用的数据库类型
    DB_TYPES = ['mysql', 'postgresql', 'sqlite', 'mongodb']
    
    def __init__(self):
        super().__init__()
        self.current_db = None
    
    # 使用选择装饰器
    @with_choices(DB_TYPES, 'db_type')
    def do_connect(self, statement):
        """连接到数据库
        用法: connect <数据库类型>
        """
        db_type = statement.split()[0] if statement else None
        if db_type:
            self.current_db = db_type
            self.poutput(f'已连接到 {db_type} 数据库')
            self.prompt = f'{db_type}> '
        else:
            self.poutput('请指定数据库类型')
    
    # 创建表命令解析器
    create_parser = Cmd2ArgumentParser()
    create_parser.add_argument('table_name', help='表名')
    create_parser.add_argument('-c', '--columns', nargs='+', 
                              help='列定义 (格式: 列名:类型)')
    
    @with_argparser(create_parser)
    def do_create_table(self, args):
        """创建数据表"""
        if not self.current_db:
            self.perror('请先连接数据库')
            return
        
        self.poutput(f'在 {self.current_db} 中创建表: {args.table_name}')
        if args.columns:
            for col in args.columns:
                self.poutput(f'  列: {col}')

内置命令和功能

历史记录和脚本

class ScriptApp(Cmd):
    """支持脚本和历史的应用"""
    
    def __init__(self):
        super().__init__()
        # 启用历史记录
        self.persistent_history_file = 'myapp_history.txt'
        self.max_completion_items = 20
    
    def do_calculate(self, args):
        """简单计算器
        用法: calculate <表达式>
        示例: calculate 2 + 3 * 4
        """
        try:
            # 安全的数学表达式计算
            allowed_chars = set('0123456789+-*/(). ')
            if all(c in allowed_chars for c in args):
                result = eval(args)
                self.poutput(f'{args} = {result}')
            else:
                self.perror('表达式包含不允许的字符')
        except Exception as e:
            self.perror(f'计算错误: {e}')
    
    def do_repeat(self, args):
        """重复执行历史命令
        用法: repeat <命令编号>
        """
        try:
            cmd_num = int(args)
            if 0 < cmd_num <= len(self.history):
                cmd = self.history[cmd_num - 1]
                self.poutput(f'重复执行: {cmd}')
                self.onecmd_plus_hooks(cmd)
            else:
                self.perror('无效的命令编号')
        except ValueError:
            self.perror('请输入有效的数字')

高级功能

输出重定向和管道

from cmd2 import Cmd, with_argparser
import sys

class PipelineApp(Cmd):
    """支持管道和重定向的应用"""
    
    def __init__(self):
        super().__init__()
        # 启用管道功能
        self.allow_redirection = True
    
    def do_generate_numbers(self, args):
        """生成数字序列
        用法: generate_numbers <起始> <结束>
        示例: generate_numbers 1 10 | grep 5
        """
        try:
            parts = args.split()
            start = int(parts[0]) if len(parts) > 0 else 1
            end = int(parts[1]) if len(parts) > 1 else 10
            
            for i in range(start, end + 1):
                self.poutput(str(i))
        except (ValueError, IndexError):
            self.perror('用法: generate_numbers <起始> <结束>')
    
    def do_filter_even(self, args):
        """过滤偶数(用于管道)
        用法: generate_numbers 1 20 | filter_even
        """
        # 从标准输入读取
        if hasattr(self, 'last_result'):
            lines = self.last_result
        else:
            lines = sys.stdin.readlines()
        
        for line in lines:
            try:
                num = int(line.strip())
                if num % 2 == 0:
                    self.poutput(str(num))
            except ValueError:
                continue
    
    # 自定义命令完成
    def complete_generate_numbers(self, text, line, begidx, endidx):
        """为 generate_numbers 命令提供补全"""
        return ['1', '10', '100']

插件和模块化

from cmd2 import Cmd, CommandSet, with_argparser

class MathCommands(CommandSet):
    """数学运算命令集"""
    
    def __init__(self):
        super().__init__()
    
    def do_add(self, args):
        """加法运算
        用法: add <数字1> <数字2>
        """
        try:
            nums = [float(x) for x in args.split()]
            if len(nums) >= 2:
                result = sum(nums)
                self._cmd.poutput(f'结果: {result}')
            else:
                self._cmd.perror('至少需要两个数字')
        except ValueError:
            self._cmd.perror('请输入有效数字')
    
    def do_multiply(self, args):
        """乘法运算"""
        try:
            nums = [float(x) for x in args.split()]
            if len(nums) >= 2:
                result = 1
                for num in nums:
                    result *= num
                self._cmd.poutput(f'结果: {result}')
            else:
                self._cmd.perror('至少需要两个数字')
        except ValueError:
            self._cmd.perror('请输入有效数字')

class StringCommands(CommandSet):
    """字符串处理命令集"""
    
    def do_upper(self, args):
        """转换为大写"""
        self._cmd.poutput(args.upper())
    
    def do_lower(self, args):
        """转换为小写"""
        self._cmd.poutput(args.lower())
    
    def do_reverse(self, args):
        """反转字符串"""
        self._cmd.poutput(args[::-1])

class ModularApp(Cmd):
    """模块化应用示例"""
    
    def __init__(self):
        super().__init__()
        # 注册命令集
        self.register_command_set(MathCommands())
        self.register_command_set(StringCommands())
        
        self.intro = '模块化应用 - 包含数学和字符串处理功能'

实际应用案例

文件管理工具

import os
import shutil
from cmd2 import Cmd, with_argparser, Cmd2ArgumentParser

class FileManager(Cmd):
    """命令行文件管理器"""
    
    def __init__(self):
        super().__init__()
        self.current_dir = os.getcwd()
        self.update_prompt()
        self.intro = '文件管理器启动完成。输入 help 查看命令。'
    
    def update_prompt(self):
        """更新命令提示符"""
        dir_name = os.path.basename(self.current_dir)
        self.prompt = f'FileManager[{dir_name}]> '
    
    def do_pwd(self, args):
        """显示当前目录"""
        self.poutput(self.current_dir)
    
    def do_ls(self, args):
        """列出目录内容"""
        try:
            items = os.listdir(self.current_dir)
            for item in sorted(items):
                path = os.path.join(self.current_dir, item)
                if os.path.isdir(path):
                    self.poutput(f'{item}/')
                else:
                    size = os.path.getsize(path)
                    self.poutput(f'{item} ({size} 字节)')
        except OSError as e:
            self.perror(f'错误: {e}')
    
    def do_cd(self, args):
        """切换目录"""
        if not args:
            target = os.path.expanduser('~')
        else:
            target = os.path.abspath(os.path.join(self.current_dir, args))
        
        if os.path.isdir(target):
            self.current_dir = target
            self.update_prompt()
        else:
            self.perror(f'目录不存在: {args}')
    
    # 复制文件命令
    copy_parser = Cmd2ArgumentParser()
    copy_parser.add_argument('source', help='源文件')
    copy_parser.add_argument('destination', help='目标位置')
    copy_parser.add_argument('-r', '--recursive', action='store_true',
                            help='递归复制目录')
    
    @with_argparser(copy_parser)
    def do_copy(self, args):
        """复制文件或目录"""
        source = os.path.join(self.current_dir, args.source)
        dest = os.path.join(self.current_dir, args.destination)
        
        try:
            if os.path.isdir(source):
                if args.recursive:
                    shutil.copytree(source, dest)
                    self.poutput(f'目录复制完成: {args.source} -> {args.destination}')
                else:
                    self.perror('复制目录需要 -r 参数')
            else:
                shutil.copy2(source, dest)
                self.poutput(f'文件复制完成: {args.source} -> {args.destination}')
        except Exception as e:
            self.perror(f'复制失败: {e}')
    
    def complete_cd(self, text, line, begidx, endidx):
        """为 cd 命令提供目录补全"""
        try:
            items = os.listdir(self.current_dir)
            dirs = [item for item in items 
                   if os.path.isdir(os.path.join(self.current_dir, item))
                   and item.startswith(text)]
            return dirs
        except OSError:
            return []

系统监控工具

import psutil
import time
from cmd2 import Cmd, with_argparser, Cmd2ArgumentParser

class SystemMonitor(Cmd):
    """系统监控工具"""
    
    def __init__(self):
        super().__init__()
        self.intro = '系统监控工具启动。输入 help 查看可用命令。'
        self.prompt = 'SysMonitor> '
    
    def do_cpu(self, args):
        """显示CPU使用率"""
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()
        
        self.poutput(f'CPU 使用率: {cpu_percent}%')
        self.poutput(f'CPU 核心数: {cpu_count}')
    
    def do_memory(self, args):
        """显示内存信息"""
        memory = psutil.virtual_memory()
        
        self.poutput(f'总内存: {memory.total // (1024**3)} GB')
        self.poutput(f'已用内存: {memory.used // (1024**3)} GB')
        self.poutput(f'内存使用率: {memory.percent}%')
        self.poutput(f'可用内存: {memory.available // (1024**3)} GB')
    
    def do_disk(self, args):
        """显示磁盘信息"""
        if not args:
            args = '/'
        
        try:
            usage = psutil.disk_usage(args)
            self.poutput(f'磁盘路径: {args}')
            self.poutput(f'总空间: {usage.total // (1024**3)} GB')
            self.poutput(f'已用空间: {usage.used // (1024**3)} GB')
            self.poutput(f'剩余空间: {usage.free // (1024**3)} GB')
            self.poutput(f'使用率: {(usage.used/usage.total)*100:.1f}%')
        except Exception as e:
            self.perror(f'获取磁盘信息失败: {e}')
    
    # 进程监控命令
    process_parser = Cmd2ArgumentParser()
    process_parser.add_argument('-n', '--name', help='按进程名过滤')
    process_parser.add_argument('-t', '--top', type=int, default=10,
                               help='显示前N个进程 (按CPU排序)')
    
    @with_argparser(process_parser)
    def do_processes(self, args):
        """显示进程信息"""
        try:
            processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
                try:
                    info = proc.info
                    if args.name and args.name.lower() not in info['name'].lower():
                        continue
                    processes.append(info)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
            
            # 按CPU使用率排序
            processes.sort(key=lambda x: x['cpu_percent'] or 0, reverse=True)
            
            self.poutput(f'{"PID":<8} {"进程名":<20} {"CPU%":<8} {"内存%":<8}')
            self.poutput('-' * 50)
            
            for proc in processes[:args.top]:
                self.poutput(f'{proc["pid"]:<8} {proc["name"][:20]:<20} '
                           f'{proc["cpu_percent"] or 0:<8.1f} '
                           f'{proc["memory_percent"] or 0:<8.1f}')
        except Exception as e:
            self.perror(f'获取进程信息失败: {e}')

💡 实际应用

应用打包和分发

使用 setup.py 打包

# setup.py
from setuptools import setup, find_packages

setup(
    name='mycli-app',
    version='1.0.0',
    packages=find_packages(),
    install_requires=[
        'cmd2>=2.0.0',
        'psutil',  # 如果需要系统监控功能
    ],
    entry_points={
        'console_scripts': [
            'mycli=mycli.main:main',
        ],
    },
    author='你的名字',
    author_email='your.email@example.com',
    description='基于 CMD2 的命令行应用',
    long_description=open('README.md').read(),
    long_description_content_type='text/markdown',
    url='https://github.com/yourusername/mycli',
    classifiers=[
        'Programming Language :: Python :: 3',
        'License :: OSI Approved :: MIT License',
        'Operating System :: OS Independent',
    ],
    python_requires='>=3.8',
)

主入口文件

# mycli/main.py
#!/usr/bin/env python3
"""MyCLI 应用主入口"""

import sys
from mycli.app import MyApp

def main():
    """主函数"""
    app = MyApp()
    
    # 如果有命令行参数,执行单个命令后退出
    if len(sys.argv) > 1:
        command = ' '.join(sys.argv[1:])
        app.onecmd_plus_hooks(command)
    else:
        # 否则进入交互模式
        app.cmdloop()

if __name__ == '__main__':
    main()

配置文件和自定义设置

import configparser
import os
from cmd2 import Cmd

class ConfigurableApp(Cmd):
    """支持配置文件的应用"""
    
    def __init__(self):
        super().__init__()
        self.config_file = 'myapp.ini'
        self.config = configparser.ConfigParser()
        self.load_config()
        
        # 应用配置
        self.debug_mode = self.config.getboolean('app', 'debug', fallback=False)
        self.max_history = self.config.getint('app', 'max_history', fallback=100)
        
        if self.debug_mode:
            self.debug = True
    
    def load_config(self):
        """加载配置文件"""
        if os.path.exists(self.config_file):
            self.config.read(self.config_file)
        else:
            # 创建默认配置
            self.config['app'] = {
                'debug': 'false',
                'max_history': '100',
                'prompt': 'MyApp> '
            }
            self.save_config()
    
    def save_config(self):
        """保存配置文件"""
        with open(self.config_file, 'w') as f:
            self.config.write(f)
    
    def do_config(self, args):
        """配置管理
        用法: 
          config show - 显示当前配置
          config set <section> <key> <value> - 设置配置项
        """
        parts = args.split()
        if not parts:
            self.perror('用法: config show|set')
            return
        
        if parts[0] == 'show':
            for section in self.config.sections():
                self.poutput(f'[{section}]')
                for key, value in self.config[section].items():
                    self.poutput(f'  {key} = {value}')
        
        elif parts[0] == 'set' and len(parts) >= 4:
            section, key, value = parts[1], parts[2], ' '.join(parts[3:])
            if section not in self.config:
                self.config[section] = {}
            self.config[section][key] = value
            self.save_config()
            self.poutput(f'设置 {section}.{key} = {value}')
        
        else:
            self.perror('用法: config set <section> <key> <value>')

⚠️ 注意事项

  • 版本兼容性:CMD2 要求 Python 3.9+ 版本 4
  • 命令命名:避免与内置命令冲突,使用 do_ 前缀定义命令
  • 参数解析:推荐使用 @with_argparser 装饰器进行复杂参数处理
  • 错误处理:使用 self.perror() 输出错误信息,使用 self.poutput() 输出正常信息
  • 性能考虑:大量数据输出时考虑分页或限制输出量
  • 平台兼容性:某些功能在不同操作系统上可能有差异
  • 插件管理:谨慎使用第三方插件,确保来源可靠
  • 内存使用:长时间运行的应用注意内存泄漏问题

🔗 相关内容

📚 扩展阅读

🏷️ 标签

cmd2 命令行 CLI REPL 解释器 交互式


最后更新: 2024-01-15
作者: Python 技术文档工程师
版本: 1.0

作者: Python 技术文档工程师

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。