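"""Energy-based segmentation + FireRedASR transcription + LLM post-processing.

Pipeline:
  1. Load audio (librosa for analysis, pydub for slicing).
  2. Split on low-energy (silence) regions; force-split overlong segments.
  3. Transcribe each segment with FireRedASR-AED.
  4. Punctuate and normalize each transcript via a local OpenAI-compatible LLM.
  5. Save JSON and timestamped TXT outputs.
"""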

import sys

# Hard-coded local checkout; setup_fireredasr_environment() below probes
# this and several relative paths as well.
sys.path.append('D:\\workstation\\voice-txt\\FireRedASR-test\\FireRedASR')

import time
import json
import argparse
from pathlib import Path

import numpy as np
import torch
import requests
import librosa
from pydub import AudioSegment


def format_time_hms(seconds):
    """Convert seconds to HH:MM:SS format."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def load_audio(audio_path):
    """Load the audio file."""
    print(f"📁 Loading audio: {audio_path}")
    # librosa array for energy analysis
    audio_data, sr = librosa.load(str(audio_path), sr=16000, mono=True)
    # pydub copy for exporting segments
    audio_pydub = AudioSegment.from_file(str(audio_path))
    audio_pydub = audio_pydub.set_frame_rate(16000).set_channels(1)
    duration = len(audio_data) / sr
    print(f"   Duration: {duration:.2f}s, sample rate: {sr}Hz")
    return audio_data, sr, audio_pydub


def energy_based_segmentation(audio_data, sr,
                              silence_threshold=0.001,
                              min_segment_length=2.0,
                              max_segment_length=455.0):
    """Energy-based audio segmentation."""
    print("🧠 Running energy-based segmentation...")
    # Compute short-time energy
    frame_len = int(0.025 * sr)  # 25 ms frames
    hop_len = int(0.005 * sr)    # 5 ms hop
    energy = []
    for i in range(0, len(audio_data) - frame_len + 1, hop_len):
        frame = audio_data[i:i + frame_len]
        frame_energy = np.sum(frame ** 2) / len(frame)
        energy.append(frame_energy)
    energy = np.array(energy)
    if energy.max() > 0:
        energy = energy / energy.max()
    # Collect silence time points
    silence_points = []
    for i, e in enumerate(energy):
        if e < silence_threshold:
            time_point = i * 0.005  # hop length in seconds
            silence_points.append(time_point)
    if not silence_points:
        print("   No silence detected, falling back to fixed-length segmentation")
        return fixed_segmentation(len(audio_data) / sr, max_segment_length)
    # Merge adjacent silence points into intervals
    silence_intervals = []
    current_start = silence_points[0]
    current_end = silence_points[0]
    for point in silence_points[1:]:
        if point - current_end <= 0.1:  # treat points within 0.1 s as contiguous
            current_end = point
        else:
            if current_end - current_start >= 0.05:  # keep intervals of at least 50 ms
                silence_intervals.append((current_start, current_end))
            current_start = point
            current_end = point
    if current_end - current_start >= 0.05:
        silence_intervals.append((current_start, current_end))
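    # Each gap between consecutive silence intervals becomes a speech segment;
    # shorter gaps are folded into the previous segment as long as that keeps
    # it under max_segment_length.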
    # Generate speech segments
    segments = []
    last_end = 0.0
    audio_duration = len(audio_data) / sr
    for silence_start, silence_end in silence_intervals:
        if silence_start - last_end >= min_segment_length:
            segments.append({
                'start_time': last_end,
                'end_time': silence_start,
                'type': 'natural'
            })
        elif segments and silence_start - segments[-1]['start_time'] <= max_segment_length:
            segments[-1]['end_time'] = silence_start
        last_end = silence_end
    # Handle the tail after the last silence interval
    if audio_duration - last_end >= min_segment_length:
        segments.append({
            'start_time': last_end,
            'end_time': audio_duration,
            'type': 'natural'
        })
    elif segments:
        segments[-1]['end_time'] = audio_duration
    # Force-split segments that exceed max_segment_length into equal parts
    final_segments = []
    for segment in segments:
        duration = segment['end_time'] - segment['start_time']
        if duration > max_segment_length:
            num_subsegments = int(np.ceil(duration / max_segment_length))
            sub_duration = duration / num_subsegments
            for i in range(num_subsegments):
                sub_start = segment['start_time'] + i * sub_duration
                sub_end = min(sub_start + sub_duration, segment['end_time'])
                final_segments.append({
                    'start_time': sub_start,
                    'end_time': sub_end,
                    'type': 'forced'
                })
        else:
            final_segments.append(segment)
    print(f"   Segmentation done: {len(final_segments)} segments")
    return final_segments


def fixed_segmentation(total_duration, segment_duration):
    """Fixed-length segmentation (fallback)."""
    segments = []
    start_time = 0
    while start_time < total_duration:
        end_time = min(start_time + segment_duration, total_duration)
        segments.append({
            'start_time': start_time,
            'end_time': end_time,
            'type': 'fixed'
        })
        start_time = end_time
    return segments


def create_audio_segments(segments, audio_pydub):
    """Export each segment as a WAV file."""
    print("   Creating audio segments...")
    temp_dir = Path("temp_segments")
    temp_dir.mkdir(exist_ok=True)
    segment_files = []
    for i, segment_info in enumerate(segments):
        start_time = segment_info['start_time']
        end_time = segment_info['end_time']
        # Convert to milliseconds and add 25 ms of padding on each side
        start_ms = max(0, int(start_time * 1000) - 25)
        end_ms = min(len(audio_pydub), int(end_time * 1000) + 25)
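        # The small symmetric pad keeps word onsets/offsets from being
        # clipped exactly at the detected cut points.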
        # Slice and export the segment
        segment = audio_pydub[start_ms:end_ms]
        segment_file = temp_dir / f"segment_{i:03d}.wav"
        segment.export(str(segment_file), format="wav")
        segment_files.append({
            'file': segment_file,
            'start_time': start_time,
            'end_time': end_time,
            'index': i
        })
    print(f"   Created {len(segment_files)} files")
    return segment_files


def setup_fireredasr_environment():
    """Add the FireRedASR checkout to sys.path."""
    possible_paths = [
        "D:/workstation/voice-txt/FireRedASR-test/FireRedASR",
        "./FireRedASR",
        "../FireRedASR",
        "FireRedASR"
    ]
    for path in possible_paths:
        if Path(path).exists():
            if str(Path(path).absolute()) not in sys.path:
                sys.path.insert(0, str(Path(path).absolute()))
                print(f"   Added path: {Path(path).absolute()}")
            return True
    return False


def load_fireredasr_model(model_dir):
    """Load the FireRedASR model."""
    print("🚀 Loading FireRedASR model...")
    # Set up the import path
    if not setup_fireredasr_environment():
        print("   FireRedASR path not found, trying a direct import...")
    try:
        # Work around a PyTorch checkpoint-loading restriction
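        # PyTorch 2.6 made weights_only=True the default for torch.load, and
        # the FireRedASR checkpoint contains an argparse.Namespace, so that
        # class must be allow-listed first (add_safe_globals exists as of
        # PyTorch 2.4).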
        torch.serialization.add_safe_globals([argparse.Namespace])
        # Try several import layouts
        try:
            from fireredasr.models.fireredasr import FireRedAsr
        except ImportError:
            try:
                from FireRedASR.fireredasr.models.fireredasr import FireRedAsr
            except ImportError:
                import fireredasr
                from fireredasr.models.fireredasr import FireRedAsr
        model = FireRedAsr.from_pretrained("aed", model_dir)
        # Prefer GPU when available
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        if torch.cuda.is_available():
            try:
                model = model.to(device)
            except Exception:
                pass
            print(f"   Using GPU: {torch.cuda.get_device_name(0)}")
        else:
            print("   Using CPU")
        if hasattr(model, 'eval'):
            model.eval()
        return model, device
    except Exception as e:
        print(f"❌ Failed to load model: {e}")
        print("Please check that:")
        print("1. FireRedASR is installed correctly")
        print("2. The paths are configured correctly")
        print("3. All dependencies are present")
        return None, None


def transcribe_segments(segment_files, model, device):
    """Transcribe the audio segments."""
    print("🎤 Starting speech recognition...")
    results = []
    use_gpu = device.startswith("cuda")
    for i, segment_info in enumerate(segment_files):
        segment_file = segment_info['file']
        start_time = segment_info['start_time']
        end_time = segment_info['end_time']
        print(f"   [{i + 1}/{len(segment_files)}] {start_time:.1f}s-{end_time:.1f}s")
        try:
            batch_uttid = [f"segment_{i:03d}"]
            batch_wav_path = [str(segment_file)]
            config = {
                "use_gpu": 1 if use_gpu else 0,
                "beam_size": 5,
                "nbest": 1,
                "decode_max_len": 0
            }
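            # These keys follow FireRedASR's example decoding config;
            # decode_max_len of 0 appears to leave output length uncapped.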
            with torch.no_grad():
                transcription_result = model.transcribe(
                    batch_uttid, batch_wav_path, config
                )
            if transcription_result and len(transcription_result) > 0:
                text = transcription_result[0].get('text', '').strip()
                if text:
                    results.append({
                        'start_time': start_time,
                        'end_time': end_time,
                        'text': text
                    })
                    print(f"      {text}")
                else:
                    print("      (no content)")
            # Free GPU cache between segments
            if use_gpu:
                torch.cuda.empty_cache()
        except Exception as e:
            print(f"   ❌ Error: {e}")
            continue
    print(f"   Recognition done: {len(results)}/{len(segment_files)} segments")
    return results


def save_results(results, base_filename):
    """Save results in JSON and TXT formats."""
    if not results:
        print("   No recognition results")
        return
    # Sort by start time
    results.sort(key=lambda x: x['start_time'])
    # 1. JSON output
    json_file = base_filename.replace('.txt', '.json')
    json_data = []
    for result in results:
        json_data.append({
            'start': result['start_time'],
            'end': result['end_time'],
            'content': result['text']
        })
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)
    print(f"✅ JSON saved: {json_file}")
    # 2. Clean timestamped TXT output
    clean_txt_file = base_filename.replace('.txt', '_clean.txt')
    with open(clean_txt_file, 'w', encoding='utf-8') as f:
        for result in results:
            start_time_str = format_time_hms(result['start_time'])
            end_time_str = format_time_hms(result['end_time'])
            f.write(f"{start_time_str}-{end_time_str}\n")
            f.write(f"{result['text']}\n")
    print(f"✅ Clean text saved: {clean_txt_file}")


def call_ai_model(sentence):
    """Post-process text with a local LLM."""
    url = "http://192.168.3.8:7777/v1/chat/completions"
prompt = f"""对以下文本添加标点符号,中文数字转阿拉伯数字。不修改文字内容。句末可以是冒号、逗号、问号、感叹号和句号等任意合适标点。
{sentence}"""
payload = {
"model": "Qwen3-14B",
"messages": [
{"role": "user", "content": prompt}
]
}
try:
response = requests.post(url, json=payload, timeout=120)
response.raise_for_status()
result = response.json()
processed_text = result["choices"][0]["message"]["content"].strip()
return processed_text
except requests.exceptions.RequestException as e:
print(f" ❌ API调用失败: {e}")
return sentence # 失败时返回原文


def process_transcription_results(input_json):
    """Post-process the transcription results."""
    print("🔄 Starting text post-processing...")
    # Read the raw JSON file
    with open(input_json, 'r', encoding='utf-8') as f:
        data = json.load(f)
    print(f"   Read {len(data)} text segments")
    processed_data = []
    total_segments = len(data)
    for i, item in enumerate(data):
        start_time = item['start']
        end_time = item['end']
        original_content = item['content']
        print(f"   [{i + 1}/{total_segments}] Processing: {start_time:.1f}s-{end_time:.1f}s")
        print(f"      Original:  {original_content}")
        # Run the LLM post-processing
        processed_content = call_ai_model(original_content)
        print(f"      Processed: {processed_content}")
        processed_data.append({
            'start': start_time,
            'end': end_time,
            'content': processed_content
        })
    return processed_data


def save_processed_results(processed_data, output_json, output_txt):
    """Save the post-processed results."""
    print("💾 Saving processed results...")
    # 1. JSON output
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(processed_data, f, ensure_ascii=False, indent=2)
    print(f"✅ JSON saved: {output_json}")
    # 2. Clean timestamped TXT output
    with open(output_txt, 'w', encoding='utf-8') as f:
        for item in processed_data:
            start_time_str = format_time_hms(item['start'])
            end_time_str = format_time_hms(item['end'])
            f.write(f"{start_time_str}-{end_time_str}\n")
            f.write(f"{item['content']}\n")
    print(f"✅ Clean text saved: {output_txt}")


def cleanup_temp_files():
    """Remove temporary segment files."""
    temp_dir = Path("temp_segments")
    if temp_dir.exists():
        for file in temp_dir.glob("segment_*.wav"):
            file.unlink(missing_ok=True)
        try:
            temp_dir.rmdir()
        except OSError:
            pass  # directory not empty; leave it in place


def main():
    """Entry point."""
    print("🎵 === FireRedASR energy-segmented transcription + text post-processing === 🎵")
    # File paths
    input_audio = "test.mp3"
    output_base = "test.txt"
    model_dir = "D:/workstation/voice-txt/FireRedASR-test/FireRedASR/pretrained_models/FireRedASR-AED-L"
    # Sanity checks
    if not Path(input_audio).exists():
        print(f"❌ Audio file not found: {input_audio}")
        return
    if not Path(model_dir).exists():
        print(f"❌ Model directory not found: {model_dir}")
        return
    total_start_time = time.time()
    try:
        # === Stage 1: ASR transcription ===
        print("\n=== Stage 1: ASR transcription ===")
        asr_start_time = time.time()
        # 1. Load audio
        audio_data, sr, audio_pydub = load_audio(input_audio)
        # 2. Energy-based segmentation
        segments = energy_based_segmentation(
            audio_data, sr,
            silence_threshold=0.001,   # silence threshold (normalized energy)
            min_segment_length=2.0,    # minimum segment length, seconds
            max_segment_length=455.0   # maximum segment length, seconds
        )
        # 3. Export audio segments
        segment_files = create_audio_segments(segments, audio_pydub)
        # 4. Load the model
        model, device = load_fireredasr_model(model_dir)
        if model is None:
            return
        # 5. Speech recognition
        results = transcribe_segments(segment_files, model, device)
        # 6. Save the raw results
        save_results(results, output_base)
        asr_elapsed_time = time.time() - asr_start_time
        print(f"\n✅ ASR transcription done! Elapsed: {asr_elapsed_time:.1f}s")
        print(f"   Recognized: {len(results)}/{len(segments)}")
        # === Stage 2: text post-processing ===
        print("\n=== Stage 2: text post-processing ===")
        processing_start_time = time.time()
        # Post-process the transcripts
        json_file = output_base.replace('.txt', '.json')
        processed_data = process_transcription_results(json_file)
        # Save the processed results
        output_json = "test_processed.json"
        output_txt = "test_processed_clean.txt"
        save_processed_results(processed_data, output_json, output_txt)
        processing_elapsed_time = time.time() - processing_start_time
        print(f"\n✅ Text post-processing done! Elapsed: {processing_elapsed_time / 60:.2f} min")
        # === Summary ===
        total_elapsed_time = time.time() - total_start_time
        print(f"\n🎉 All done!")
        print(f"   ASR transcription: {asr_elapsed_time:.1f}s")
        print(f"   Text processing:   {processing_elapsed_time / 60:.2f} min")
        print(f"   Total:             {total_elapsed_time / 60:.2f} min")
    except KeyboardInterrupt:
        print("\n   Interrupted by user")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        cleanup_temp_files()

if __name__ == "__main__":
    main()
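
# Typical run (the input/output paths above are hard-coded; edit input_audio,
# output_base, and model_dir first). Assuming this file is saved as
# transcribe.py:
#
#   python transcribe.py
#
# Produces test.json / test_clean.txt (raw ASR) and test_processed.json /
# test_processed_clean.txt (punctuated by the local LLM).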