Picologging - 高性能日志记录库
Picologging是一个高性能的Python日志库,与标准logging完全兼容,性能提升5-10倍
Picologging - 高性能日志记录库
概述
Picologging是一个高性能的Python日志库,作为标准logging模块的直接替代品。它在保持100%API兼容性的同时,通过C扩展和优化算法实现了5-10倍的性能提升,特别适合高并发、大量日志输出的应用场景。
学习目标
通过本文档的学习,你将掌握:
- Picologging的性能优势和适用场景
- 安装和基本使用方法
- 与标准logging的兼容性
- 性能优化技巧和最佳实践
- 在不同环境下的部署策略
- 实际项目中的应用案例
- 性能测试和监控方法
前置知识
- Python基础语法
- 标准logging库的使用
- 基本的性能优化概念
- 多线程和异步编程基础
详细内容
1. 为什么选择Picologging?
1.1 性能优势
- 5-10倍性能提升 - 相比标准logging库显著提升
- 低内存占用 - 优化的内存管理减少GC压力
- 高并发支持 - 更好的多线程性能
- 快速格式化 - 优化的字符串格式化算法
- 智能过滤 - 高效的日志级别过滤机制
1.2 兼容性保证
# 标准logging代码
import logging
logger = logging.getLogger(__name__)
logger.info("Hello World")
# 直接替换为picologging
import picologging as logging # 只需要改这一行
logger = logging.getLogger(__name__)
logger.info("Hello World") # 其他代码完全不变
2. 安装和基本使用
2.1 安装
# 使用pip安装
pip install picologging
# 或者从源码安装
git clone https://github.com/microsoft/picologging.git
cd picologging
pip install .
2.2 基本使用
# 方式1:直接替换import
import picologging as logging
# 方式2:显式导入
from picologging import getLogger, basicConfig, INFO, DEBUG
# 基本配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log')
]
)
# 获取logger
logger = logging.getLogger(__name__)
# 记录日志
logger.debug("调试信息")
logger.info("应用启动")
logger.warning("警告信息")
logger.error("错误信息")
logger.critical("严重错误")
# 格式化日志
user_id = 12345
action = "login"
logger.info("用户操作: %s (ID: %d)", action, user_id)
logger.info(f"用户操作: {action} (ID: {user_id})")
2.3 配置示例
import picologging as logging
from picologging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import sys
def setup_logging(app_name: str, debug: bool = False):
"""配置高性能日志系统"""
# 设置日志级别
level = logging.DEBUG if debug else logging.INFO
# 创建格式化器
formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname)8s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
# 文件处理器 - 按大小轮转
file_handler = RotatingFileHandler(
filename=f'{app_name}.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
# 错误日志处理器 - 按时间轮转
error_handler = TimedRotatingFileHandler(
filename=f'{app_name}_error.log',
when='midnight',
interval=1,
backupCount=30,
encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
# 配置根logger
logging.basicConfig(
level=level,
handlers=[console_handler, file_handler, error_handler]
)
return logging.getLogger(app_name)
# 使用示例
logger = setup_logging("myapp", debug=True)
logger.info("应用启动完成")
3. 性能优化特性
3.1 延迟格式化
import picologging as logging
import time
logger = logging.getLogger(__name__)
def expensive_operation():
"""模拟耗时操作"""
time.sleep(0.1)
return "expensive result"
# 不推荐:总是执行耗时操作
logger.debug(f"结果: {expensive_operation()}")
# 推荐:使用延迟格式化
logger.debug("结果: %s", expensive_operation()) # 只在DEBUG级别启用时执行
# 更好的方式:检查日志级别
if logger.isEnabledFor(logging.DEBUG):
result = expensive_operation()
logger.debug("结果: %s", result)
3.2 批量日志处理
import picologging as logging
from picologging.handlers import QueueHandler, QueueListener
from queue import Queue
import threading
# 创建队列和处理器
log_queue = Queue()
queue_handler = QueueHandler(log_queue)
# 创建实际的日志处理器
file_handler = logging.FileHandler('batch.log')
file_handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
# 创建队列监听器
queue_listener = QueueListener(log_queue, file_handler)
queue_listener.start()
# 配置logger使用队列处理器
logger = logging.getLogger('batch_logger')
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
# 高频日志记录
def worker(worker_id):
"""工作线程"""
for i in range(1000):
logger.info("Worker %d processing item %d", worker_id, i)
# 启动多个工作线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
# 停止队列监听器
queue_listener.stop()
3.3 条件日志记录
import picologging as logging
logger = logging.getLogger(__name__)
class ConditionalLogger:
"""条件日志记录器"""
def __init__(self, logger, sample_rate=0.1):
self.logger = logger
self.sample_rate = sample_rate
self.counter = 0
def sample_log(self, level, msg, *args, **kwargs):
"""采样日志记录"""
self.counter += 1
if self.counter % int(1 / self.sample_rate) == 0:
getattr(self.logger, level)(msg, *args, **kwargs)
def rate_limited_log(self, level, msg, *args, max_per_second=10, **kwargs):
"""限流日志记录"""
import time
current_time = time.time()
if not hasattr(self, '_last_log_time'):
self._last_log_time = current_time
self._log_count = 0
# 重置计数器(每秒)
if current_time - self._last_log_time >= 1.0:
self._last_log_time = current_time
self._log_count = 0
# 检查是否超过限制
if self._log_count < max_per_second:
self._log_count += 1
getattr(self.logger, level)(msg, *args, **kwargs)
# 使用示例
conditional_logger = ConditionalLogger(logger, sample_rate=0.01) # 1%采样率
# 高频操作,只记录1%的日志
for i in range(10000):
conditional_logger.sample_log('info', "处理项目 %d", i)
# 限流日志记录
for i in range(100):
conditional_logger.rate_limited_log('warning', "警告信息 %d", i, max_per_second=5)
4. 实际应用场景
4.1 微服务日志系统
import picologging as logging
from picologging.handlers import SysLogHandler, HTTPHandler
import json
import time
import uuid
from typing import Dict, Any
class MicroserviceLogger:
"""微服务日志系统"""
def __init__(self, service_name: str, version: str, environment: str):
self.service_name = service_name
self.version = version
self.environment = environment
self.logger = self._setup_logger()
def _setup_logger(self):
"""设置logger"""
logger = logging.getLogger(self.service_name)
logger.setLevel(logging.INFO)
# JSON格式化器
class JSONFormatter(logging.Formatter):
def format(self, record):
log_entry = {
'timestamp': time.time(),
'level': record.levelname,
'service': self.service_name,
'version': self.version,
'environment': self.environment,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# 添加异常信息
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
# 添加自定义字段
if hasattr(record, 'extra_fields'):
log_entry.update(record.extra_fields)
return json.dumps(log_entry, ensure_ascii=False)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(JSONFormatter())
logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.FileHandler(f'{self.service_name}.log')
file_handler.setFormatter(JSONFormatter())
logger.addHandler(file_handler)
return logger
def log_request(self, request_id: str, method: str, path: str,
status_code: int, duration_ms: float, **kwargs):
"""记录请求日志"""
extra_fields = {
'request_id': request_id,
'method': method,
'path': path,
'status_code': status_code,
'duration_ms': duration_ms,
'type': 'request',
**kwargs
}
# 创建LogRecord并添加额外字段
record = self.logger.makeRecord(
self.logger.name, logging.INFO, __file__, 0,
f"{method} {path} - {status_code} ({duration_ms}ms)",
(), None
)
record.extra_fields = extra_fields
self.logger.handle(record)
def log_business_event(self, event_type: str, event_data: Dict[str, Any],
user_id: str = None, **kwargs):
"""记录业务事件"""
extra_fields = {
'event_type': event_type,
'event_data': event_data,
'user_id': user_id,
'type': 'business_event',
**kwargs
}
record = self.logger.makeRecord(
self.logger.name, logging.INFO, __file__, 0,
f"Business event: {event_type}",
(), None
)
record.extra_fields = extra_fields
self.logger.handle(record)
def log_error(self, error: Exception, context: Dict[str, Any] = None, **kwargs):
"""记录错误日志"""
extra_fields = {
'error_type': type(error).__name__,
'error_message': str(error),
'context': context or {},
'type': 'error',
**kwargs
}
record = self.logger.makeRecord(
self.logger.name, logging.ERROR, __file__, 0,
f"Error occurred: {error}",
(), (type(error), error, error.__traceback__)
)
record.extra_fields = extra_fields
self.logger.handle(record)
# 使用示例
service_logger = MicroserviceLogger(
service_name="user-service",
version="1.2.3",
environment="production"
)
# 记录请求日志
request_id = str(uuid.uuid4())
service_logger.log_request(
request_id=request_id,
method="POST",
path="/api/users",
status_code=201,
duration_ms=45.6,
user_agent="Mozilla/5.0",
ip_address="192.168.1.100"
)
# 记录业务事件
service_logger.log_business_event(
event_type="user_created",
event_data={"user_id": 12345, "email": "user@example.com"},
user_id="12345",
request_id=request_id
)
# 记录错误
try:
# 模拟错误
raise ValueError("Invalid user data")
except Exception as e:
service_logger.log_error(
error=e,
context={"user_data": {"email": "invalid-email"}},
request_id=request_id
)
4.2 高并发Web应用
import picologging as logging
from picologging.handlers import QueueHandler, QueueListener
from queue import Queue
import asyncio
import aiohttp
from aiohttp import web
import time
import threading
# 设置异步日志系统
log_queue = Queue(maxsize=10000) # 限制队列大小防止内存溢出
queue_handler = QueueHandler(log_queue)
# 文件处理器
file_handler = logging.FileHandler('webapp.log')
file_handler.setFormatter(
logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
)
# 启动队列监听器
queue_listener = QueueListener(log_queue, file_handler, respect_handler_level=True)
queue_listener.start()
# 配置应用logger
app_logger = logging.getLogger('webapp')
app_logger.addHandler(queue_handler)
app_logger.setLevel(logging.INFO)
class PerformanceLogger:
"""性能日志记录器"""
def __init__(self, logger):
self.logger = logger
self.request_count = 0
self.total_duration = 0
self.last_report = time.time()
def log_request(self, method: str, path: str, status: int, duration: float):
"""记录请求性能"""
self.request_count += 1
self.total_duration += duration
# 每1000个请求或每60秒报告一次性能统计
current_time = time.time()
if (self.request_count % 1000 == 0 or
current_time - self.last_report >= 60):
avg_duration = self.total_duration / self.request_count
rps = self.request_count / (current_time - self.last_report + 0.001)
self.logger.info(
"性能统计 - 请求数: %d, 平均响应时间: %.2fms, RPS: %.1f",
self.request_count, avg_duration * 1000, rps
)
# 重置统计
self.request_count = 0
self.total_duration = 0
self.last_report = current_time
# 记录慢请求
if duration > 1.0: # 超过1秒的请求
self.logger.warning(
"慢请求 - %s %s - %d (%.2fms)",
method, path, status, duration * 1000
)
perf_logger = PerformanceLogger(app_logger)
# 中间件
async def logging_middleware(request, handler):
"""日志中间件"""
start_time = time.time()
try:
response = await handler(request)
duration = time.time() - start_time
# 记录成功请求
perf_logger.log_request(
request.method, request.path, response.status, duration
)
return response
except Exception as e:
duration = time.time() - start_time
# 记录异常请求
app_logger.error(
"请求异常 - %s %s - %s (%.2fms)",
request.method, request.path, str(e), duration * 1000
)
perf_logger.log_request(
request.method, request.path, 500, duration
)
raise
# 路由处理器
async def hello_handler(request):
"""示例处理器"""
name = request.query.get('name', 'World')
# 模拟一些处理时间
await asyncio.sleep(0.01)
app_logger.info("处理hello请求", extra={'name': name})
return web.json_response({'message': f'Hello, {name}!'})
async def heavy_handler(request):
"""重负载处理器"""
# 模拟重负载操作
await asyncio.sleep(0.5)
app_logger.info("完成重负载操作")
return web.json_response({'status': 'completed'})
# 创建应用
app = web.Application(middlewares=[logging_middleware])
app.router.add_get('/hello', hello_handler)
app.router.add_post('/heavy', heavy_handler)
# 启动应用
if __name__ == '__main__':
app_logger.info("启动Web应用")
try:
web.run_app(app, host='localhost', port=8080)
finally:
app_logger.info("关闭Web应用")
queue_listener.stop()
4.3 数据处理管道
import picologging as logging
from picologging.handlers import RotatingFileHandler
import multiprocessing as mp
import time
import random
from typing import List, Dict, Any
import json
# 配置多进程安全的日志系统
def setup_worker_logging(worker_id: int):
"""为工作进程设置日志"""
logger = logging.getLogger(f'worker_{worker_id}')
logger.setLevel(logging.INFO)
# 每个进程使用独立的日志文件
handler = RotatingFileHandler(
f'worker_{worker_id}.log',
maxBytes=5*1024*1024, # 5MB
backupCount=3
)
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
class DataProcessor:
"""数据处理器"""
def __init__(self, worker_id: int):
self.worker_id = worker_id
self.logger = setup_worker_logging(worker_id)
self.processed_count = 0
self.error_count = 0
self.start_time = time.time()
def process_batch(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""处理数据批次"""
batch_id = f"batch_{int(time.time())}_{self.worker_id}"
self.logger.info(
"开始处理批次 - ID: %s, 大小: %d",
batch_id, len(batch)
)
results = []
batch_start = time.time()
for i, item in enumerate(batch):
try:
# 模拟数据处理
processed_item = self._process_item(item)
results.append(processed_item)
self.processed_count += 1
# 每处理100个项目记录一次进度
if (i + 1) % 100 == 0:
self.logger.debug(
"批次进度 - %s: %d/%d (%.1f%%)",
batch_id, i + 1, len(batch), (i + 1) / len(batch) * 100
)
except Exception as e:
self.error_count += 1
self.logger.error(
"处理项目失败 - 批次: %s, 项目: %d, 错误: %s",
batch_id, i, str(e)
)
batch_duration = time.time() - batch_start
self.logger.info(
"批次处理完成 - ID: %s, 成功: %d, 失败: %d, 耗时: %.2fs",
batch_id, len(results), len(batch) - len(results), batch_duration
)
# 记录性能统计
self._log_performance_stats()
return results
def _process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""处理单个数据项"""
# 模拟处理时间
time.sleep(random.uniform(0.001, 0.01))
# 模拟处理错误
if random.random() < 0.05: # 5%错误率
raise ValueError(f"处理失败: {item.get('id', 'unknown')}")
# 模拟数据转换
return {
'id': item['id'],
'processed_value': item['value'] * 2,
'timestamp': time.time(),
'worker_id': self.worker_id
}
def _log_performance_stats(self):
"""记录性能统计"""
current_time = time.time()
total_duration = current_time - self.start_time
if total_duration > 0:
processing_rate = self.processed_count / total_duration
error_rate = self.error_count / (self.processed_count + self.error_count) if (self.processed_count + self.error_count) > 0 else 0
self.logger.info(
"性能统计 - 处理速度: %.1f items/s, 错误率: %.2f%%, 总处理: %d, 总错误: %d",
processing_rate, error_rate * 100, self.processed_count, self.error_count
)
def worker_process(worker_id: int, input_queue: mp.Queue, output_queue: mp.Queue):
"""工作进程函数"""
processor = DataProcessor(worker_id)
while True:
try:
batch = input_queue.get(timeout=5)
if batch is None: # 结束信号
break
results = processor.process_batch(batch)
output_queue.put(results)
except Exception as e:
processor.logger.exception("工作进程异常: %s", str(e))
def main():
"""主函数"""
# 设置主进程日志
main_logger = logging.getLogger('main')
main_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
handler.setFormatter(formatter)
main_logger.addHandler(handler)
# 创建队列
input_queue = mp.Queue(maxsize=100)
output_queue = mp.Queue()
# 启动工作进程
num_workers = mp.cpu_count()
workers = []
main_logger.info("启动 %d 个工作进程", num_workers)
for i in range(num_workers):
worker = mp.Process(
target=worker_process,
args=(i, input_queue, output_queue)
)
worker.start()
workers.append(worker)
# 生成测试数据
total_items = 10000
batch_size = 100
main_logger.info("开始处理 %d 个数据项,批次大小: %d", total_items, batch_size)
# 发送数据批次
for i in range(0, total_items, batch_size):
batch = [
{'id': j, 'value': random.randint(1, 100)}
for j in range(i, min(i + batch_size, total_items))
]
input_queue.put(batch)
# 发送结束信号
for _ in range(num_workers):
input_queue.put(None)
# 收集结果
all_results = []
processed_batches = 0
expected_batches = (total_items + batch_size - 1) // batch_size
while processed_batches < expected_batches:
try:
results = output_queue.get(timeout=10)
all_results.extend(results)
processed_batches += 1
if processed_batches % 10 == 0:
main_logger.info(
"已处理批次: %d/%d (%.1f%%)",
processed_batches, expected_batches,
processed_batches / expected_batches * 100
)
except:
main_logger.warning("等待结果超时")
break
# 等待所有工作进程结束
for worker in workers:
worker.join(timeout=5)
if worker.is_alive():
main_logger.warning("强制终止工作进程 %d", worker.pid)
worker.terminate()
main_logger.info(
"数据处理完成 - 输入: %d, 输出: %d, 成功率: %.2f%%",
total_items, len(all_results), len(all_results) / total_items * 100
)
if __name__ == '__main__':
main()
5. 性能测试和监控
5.1 性能基准测试
import picologging as logging
import logging as stdlib_logging
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import statistics
def benchmark_logging_performance():
"""日志性能基准测试"""
# 测试配置
num_threads = 10
logs_per_thread = 10000
total_logs = num_threads * logs_per_thread
def setup_picologging():
"""设置picologging"""
logger = logging.getLogger('pico_test')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('pico_test.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def setup_stdlib_logging():
"""设置标准logging"""
logger = stdlib_logging.getLogger('stdlib_test')
logger.setLevel(stdlib_logging.INFO)
handler = stdlib_logging.FileHandler('stdlib_test.log')
formatter = stdlib_logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def log_worker(logger, thread_id, num_logs):
"""日志工作函数"""
start_time = time.time()
for i in range(num_logs):
logger.info("Thread %d - Log message %d with some data: %s",
thread_id, i, "sample_data" * 10)
return time.time() - start_time
def run_benchmark(logger_setup_func, test_name):
"""运行基准测试"""
print(f"\n开始 {test_name} 性能测试...")
logger = logger_setup_func()
# 预热
for i in range(100):
logger.info("Warmup message %d", i)
# 开始测试
start_time = time.time()
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(log_worker, logger, i, logs_per_thread)
for i in range(num_threads)
]
thread_times = [future.result() for future in futures]
total_time = time.time() - start_time
# 计算统计信息
avg_thread_time = statistics.mean(thread_times)
max_thread_time = max(thread_times)
min_thread_time = min(thread_times)
logs_per_second = total_logs / total_time
print(f"{test_name} 结果:")
print(f" 总时间: {total_time:.2f}s")
print(f" 平均线程时间: {avg_thread_time:.2f}s")
print(f" 最大线程时间: {max_thread_time:.2f}s")
print(f" 最小线程时间: {min_thread_time:.2f}s")
print(f" 日志/秒: {logs_per_second:.0f}")
print(f" 总日志数: {total_logs}")
return {
'total_time': total_time,
'logs_per_second': logs_per_second,
'avg_thread_time': avg_thread_time
}
# 运行测试
pico_results = run_benchmark(setup_picologging, "Picologging")
stdlib_results = run_benchmark(setup_stdlib_logging, "标准Logging")
# 比较结果
speedup = stdlib_results['logs_per_second'] / pico_results['logs_per_second']
time_improvement = (stdlib_results['total_time'] - pico_results['total_time']) / stdlib_results['total_time'] * 100
print(f"\n性能对比:")
print(f" Picologging 相对标准库提升: {speedup:.2f}x")
print(f" 时间节省: {time_improvement:.1f}%")
if __name__ == '__main__':
benchmark_logging_performance()
5.2 实时性能监控
import picologging as logging
import time
import threading
import psutil
import gc
from collections import deque
from typing import Dict, List
class LoggingPerformanceMonitor:
"""日志性能监控器"""
def __init__(self, monitor_interval: float = 5.0):
self.monitor_interval = monitor_interval
self.logger = logging.getLogger('perf_monitor')
self.logger.setLevel(logging.INFO)
# 性能指标
self.log_counts = deque(maxlen=100) # 最近100个时间窗口的日志数量
self.response_times = deque(maxlen=1000) # 最近1000次日志操作的响应时间
self.memory_usage = deque(maxlen=100) # 内存使用情况
self.cpu_usage = deque(maxlen=100) # CPU使用情况
self.current_log_count = 0
self.last_monitor_time = time.time()
# 启动监控线程
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
def record_log_operation(self, operation_time: float):
"""记录日志操作"""
self.current_log_count += 1
self.response_times.append(operation_time)
def _monitor_loop(self):
"""监控循环"""
while True:
try:
current_time = time.time()
# 计算日志速率
time_diff = current_time - self.last_monitor_time
log_rate = self.current_log_count / time_diff if time_diff > 0 else 0
self.log_counts.append(log_rate)
# 获取系统资源使用情况
process = psutil.Process()
memory_percent = process.memory_percent()
cpu_percent = process.cpu_percent()
self.memory_usage.append(memory_percent)
self.cpu_usage.append(cpu_percent)
# 计算响应时间统计
if self.response_times:
avg_response_time = sum(self.response_times) / len(self.response_times)
max_response_time = max(self.response_times)
min_response_time = min(self.response_times)
else:
avg_response_time = max_response_time = min_response_time = 0
# 记录性能指标
self.logger.info(
"性能监控 - 日志速率: %.1f/s, 内存: %.1f%%, CPU: %.1f%%, "
"响应时间 avg/max/min: %.3f/%.3f/%.3fms",
log_rate, memory_percent, cpu_percent,
avg_response_time * 1000, max_response_time * 1000, min_response_time * 1000
)
# 检查性能警告
self._check_performance_warnings(log_rate, memory_percent, cpu_percent, avg_response_time)
# 重置计数器
self.current_log_count = 0
self.last_monitor_time = current_time
time.sleep(self.monitor_interval)
except Exception as e:
self.logger.error("监控异常: %s", str(e))
time.sleep(self.monitor_interval)
def _check_performance_warnings(self, log_rate: float, memory_percent: float,
cpu_percent: float, avg_response_time: float):
"""检查性能警告"""
# 日志速率过低警告
if log_rate < 100 and len(self.log_counts) > 5:
recent_avg = sum(list(self.log_counts)[-5:]) / 5
if recent_avg < 100:
self.logger.warning("日志速率较低: %.1f/s (最近5分钟平均: %.1f/s)", log_rate, recent_avg)
# 内存使用过高警告
if memory_percent > 80:
self.logger.warning("内存使用过高: %.1f%%", memory_percent)
# CPU使用过高警告
if cpu_percent > 80:
self.logger.warning("CPU使用过高: %.1f%%", cpu_percent)
# 响应时间过长警告
if avg_response_time > 0.01: # 10ms
self.logger.warning("日志响应时间过长: %.3fms", avg_response_time * 1000)
def get_performance_summary(self) -> Dict:
"""获取性能摘要"""
if not self.log_counts:
return {}
return {
'avg_log_rate': sum(self.log_counts) / len(self.log_counts),
'max_log_rate': max(self.log_counts),
'min_log_rate': min(self.log_counts),
'avg_memory_usage': sum(self.memory_usage) / len(self.memory_usage) if self.memory_usage else 0,
'avg_cpu_usage': sum(self.cpu_usage) / len(self.cpu_usage) if self.cpu_usage else 0,
'avg_response_time': sum(self.response_times) / len(self.response_times) if self.response_times else 0,
}
# 使用示例
monitor = LoggingPerformanceMonitor(monitor_interval=3.0)
# 模拟高频日志记录
def simulate_logging():
"""模拟日志记录"""
logger = logging.getLogger('test_app')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('test_app.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
for i in range(10000):
start_time = time.time()
logger.info("测试日志消息 %d - 包含一些数据: %s", i, "data" * 20)
operation_time = time.time() - start_time
monitor.record_log_operation(operation_time)
# 模拟不同的负载
if i % 1000 == 0:
time.sleep(0.1) # 模拟偶尔的延迟
elif i % 100 == 0:
time.sleep(0.01) # 模拟小延迟
# 输出性能摘要
summary = monitor.get_performance_summary()
print("\n性能摘要:")
for key, value in summary.items():
if 'time' in key:
print(f" {key}: {value*1000:.3f}ms")
elif 'rate' in key:
print(f" {key}: {value:.1f}/s")
else:
print(f" {key}: {value:.2f}")
if __name__ == '__main__':
simulate_logging()
最佳实践
1. 部署建议
- 渐进式迁移 - 先在非关键模块测试,逐步扩展
- 性能监控 - 部署后持续监控性能指标
- 回退计划 - 准备快速回退到标准logging的方案
- 版本锁定 - 在生产环境中锁定picologging版本
2. 配置优化
- 合理设置日志级别 - 避免过多的DEBUG日志
- 使用异步处理 - 对于高频日志使用队列处理
- 文件轮转 - 合理配置日志文件大小和保留策略
- 格式化优化 - 避免复杂的格式化操作
3. 监控和维护
- 性能指标 - 监控日志吞吐量和响应时间
- 资源使用 - 关注内存和CPU使用情况
- 错误率 - 监控日志记录失败率
- 存储空间 - 监控日志文件大小和磁盘使用
注意事项
- 兼容性 - 虽然API兼容,但某些边缘情况可能有差异
- 依赖管理 - 需要C编译环境,在某些环境下可能安装困难
- 内存使用 - 高频日志可能导致内存使用增加
- 调试困难 - C扩展部分的错误较难调试
- 版本更新 - 新版本可能引入不兼容的变化
- 平台支持 - 某些平台可能不支持C扩展
相关内容
- logging模块 - Python标准日志库
- loguru模块 - 简单易用的日志库
- structlog模块 - 结构化日志库
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。