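"""Batch-convert ASR timeline .txt files into JSON.

Each input file is expected to alternate between a time-range line
(HH:MM:SS-HH:MM:SS) and a content line; each such pair becomes one
JSON object with the keys d_id, start_time, end_time (both in
milliseconds) and content.
"""
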
import json
import re
import os


def time_to_milliseconds(time_str):
    """Convert a time string to milliseconds."""
    # Parse the HH:MM:SS format
    parts = time_str.split(':')
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(parts[2])
    # Convert to milliseconds
    total_ms = (hours * 3600 + minutes * 60 + seconds) * 1000
    return total_ms
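
# Illustrative sanity checks (values not from the original file):
#   time_to_milliseconds("00:01:30") -> 90000
#   time_to_milliseconds("01:02:03") -> 3723000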


def parse_timeline_file(file_path, fixed_id=1104):
    """Parse a timeline text file."""
    result = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read().strip()
        # Split the content into lines
        lines = content.split('\n')
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            # Check for a time-range line: HH:MM:SS-HH:MM:SS
            time_match = re.match(r'(\d{2}:\d{2}:\d{2})-(\d{2}:\d{2}:\d{2})', line)
            if time_match:
                start_time_str = time_match.group(1)
                end_time_str = time_match.group(2)
                start_time_ms = time_to_milliseconds(start_time_str)
                end_time_ms = time_to_milliseconds(end_time_str)
                # Take the next line (if any) as the segment content
                content_text = ""
                if i + 1 < len(lines):
                    content_text = lines[i + 1].strip()
                    i += 1  # skip the content line
                # Build the JSON object; only add it when the content is non-empty
                if content_text:
                    json_obj = {
                        "d_id": fixed_id,
                        "start_time": start_time_ms,
                        "end_time": end_time_ms,
                        "content": content_text
                    }
                    result.append(json_obj)
            i += 1
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return []
    except Exception as e:
        print(f"Error while processing the file: {e}")
        return []
    return result
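
# A minimal sketch of the expected input/output (illustrative content):
#   input .txt lines:
#     00:00:05-00:00:12
#     One transcribed sentence.
#   resulting object (with the default fixed_id):
#     {"d_id": 1104, "start_time": 5000, "end_time": 12000,
#      "content": "One transcribed sentence."}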


def get_txt_files(folder_path):
    """Collect all .txt files in a folder."""
    txt_files = []
    try:
        for filename in os.listdir(folder_path):
            if filename.lower().endswith('.txt'):
                full_path = os.path.join(folder_path, filename)
                txt_files.append((filename, full_path))
        # Sort by filename so the processing order is deterministic
        txt_files.sort(key=lambda x: x[0])
        return txt_files
    except Exception as e:
        print(f"Error while reading the folder: {e}")
        return []


def save_to_json(data, output_path):
    """Save data to a JSON file."""
    try:
        with open(output_path, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=2)
        print(f"JSON file saved: {output_path}")
    except Exception as e:
        print(f"Error while saving the JSON file: {e}")


def batch_process_txt_files(folder_path, start_id=1104):
    """Batch-process the .txt files in a folder."""
    txt_files = get_txt_files(folder_path)
    if not txt_files:
        print("No .txt files found")
        return

    print(f"Found {len(txt_files)} .txt files:")
    for i, (filename, _) in enumerate(txt_files):
        print(f"{i + 1}. {filename} (d_id: {start_id + i})")

    all_data = []
    file_summary = []
    for i, (filename, file_path) in enumerate(txt_files):
        current_id = start_id + i
        print(f"\nProcessing: {filename} (d_id: {current_id})")
        # Parse a single file
        file_data = parse_timeline_file(file_path, current_id)
        if file_data:
            all_data.extend(file_data)
            file_summary.append({
                "filename": filename,
                "d_id": current_id,
                "segments": len(file_data)
            })
            print(f"Parsed {len(file_data)} segments")
        else:
            print(f"No valid data found in {filename}")

    if all_data:
        # Save the merged JSON file
        output_file = os.path.join(folder_path, "all_timeline_data.json")
        save_to_json(all_data, output_file)

        # Save the processing summary
        summary_file = os.path.join(folder_path, "processing_summary.json")
        summary_data = {
            "total_files": len(txt_files),
            "total_segments": len(all_data),
            "start_id": start_id,
            "end_id": start_id + len(txt_files) - 1,
            "files": file_summary
        }
        save_to_json(summary_data, summary_file)

        print("\n=== Processing complete ===")
        print(f"Total files: {len(txt_files)}")
        print(f"Total segments: {len(all_data)}")
        print(f"ID range: {start_id} - {start_id + len(txt_files) - 1}")
        print("Merged file: all_timeline_data.json")
        print("Summary file: processing_summary.json")

        # # Alternatively, save a separate JSON file per input file
        # print("\nSaving individual JSON files...")
        # for i, (filename, file_path) in enumerate(txt_files):
        #     current_id = start_id + i
        #     file_data = parse_timeline_file(file_path, current_id)
        #     if file_data:
        #         json_filename = filename.replace('.txt', '.json')
        #         json_path = os.path.join(folder_path, json_filename)
        #         save_to_json(file_data, json_path)
        #
        # print("All individual JSON files saved")
    else:
        print("No valid data was parsed")


def main():
    # Batch-process the .txt files in the folder
    folder_path = r"D:\workstation\voice-txt\ct-punc-test\ASR+punc\staic-应急宣传"
    start_id = 1104

    print("Starting batch processing of .txt files...")
    batch_process_txt_files(folder_path, start_id)


if __name__ == "__main__":
    main()