import json
import re
import os


def time_to_milliseconds(time_str):
    """Convert an HH:MM:SS time string to milliseconds."""
    # Parse the HH:MM:SS format
    parts = time_str.split(':')
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(parts[2])

    # Convert to milliseconds
    total_ms = (hours * 3600 + minutes * 60 + seconds) * 1000
    return total_ms
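
# Quick sanity check of the conversion above (illustrative value, not from the source):
# "01:02:03" -> (1 * 3600 + 2 * 60 + 3) * 1000 = 3_723_000 ms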


def parse_timeline_file(file_path, fixed_id=1104):
    """Parse a timeline text file into a list of segment dicts."""
    result = []

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read().strip()

        # Split the content into lines
        lines = content.split('\n')

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Check for a timeline line of the form HH:MM:SS-HH:MM:SS
            time_match = re.match(r'(\d{2}:\d{2}:\d{2})-(\d{2}:\d{2}:\d{2})', line)

            if time_match:
                start_time_str = time_match.group(1)
                end_time_str = time_match.group(2)

                start_time_ms = time_to_milliseconds(start_time_str)
                end_time_ms = time_to_milliseconds(end_time_str)

                # Take the next line as the content (if it exists)
                content_text = ""
                if i + 1 < len(lines):
                    content_text = lines[i + 1].strip()
                    i += 1  # Skip the content line

                # Build the JSON object
                if content_text:  # Only add segments with non-empty content
                    json_obj = {
                        "d_id": fixed_id,
                        "start_time": start_time_ms,
                        "end_time": end_time_ms,
                        "content": content_text
                    }
                    result.append(json_obj)

            i += 1

    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return []
    except Exception as e:
        print(f"Error while processing the file: {e}")
        return []

    return result
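
# Expected input layout, inferred from the time-range regex above (the sample
# lines are illustrative, not taken from the real data): each segment is an
# HH:MM:SS-HH:MM:SS line followed by one content line, e.g.
#
#   00:00:05-00:00:12
#   First transcript segment.
#   00:00:13-00:00:20
#   Second transcript segment.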


def get_txt_files(folder_path):
    """Collect all txt files in a folder."""
    txt_files = []
    try:
        for filename in os.listdir(folder_path):
            if filename.lower().endswith('.txt'):
                full_path = os.path.join(folder_path, filename)
                txt_files.append((filename, full_path))

        # Sort by filename so the processing order is deterministic
        txt_files.sort(key=lambda x: x[0])
        return txt_files
    except Exception as e:
        print(f"Error while reading the folder: {e}")
        return []


def save_to_json(data, output_path):
    """Save data to a JSON file."""
    try:
        with open(output_path, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=2)
        print(f"JSON file saved: {output_path}")
    except Exception as e:
        print(f"Error while saving the JSON file: {e}")


def batch_process_txt_files(folder_path, start_id=1104):
    """Batch-process all txt files in a folder."""
    txt_files = get_txt_files(folder_path)

    if not txt_files:
        print("No txt files found")
        return

    print(f"Found {len(txt_files)} txt files:")
    for i, (filename, _) in enumerate(txt_files):
        print(f"{i + 1}. {filename} (d_id: {start_id + i})")

    all_data = []
    file_summary = []

    for i, (filename, file_path) in enumerate(txt_files):
        current_id = start_id + i
        print(f"\nProcessing: {filename} (d_id: {current_id})")

        # Parse a single file
        file_data = parse_timeline_file(file_path, current_id)

        if file_data:
            all_data.extend(file_data)
            file_summary.append({
                "filename": filename,
                "d_id": current_id,
                "segments": len(file_data)
            })
            print(f"Successfully parsed {len(file_data)} segments")
        else:
            print(f"No valid data parsed from file {filename}")

    # Save the merged JSON file
    if all_data:
        output_file = os.path.join(folder_path, "all_timeline_data.json")
        save_to_json(all_data, output_file)

        # Save a processing summary
        summary_file = os.path.join(folder_path, "processing_summary.json")
        summary_data = {
            "total_files": len(txt_files),
            "total_segments": len(all_data),
            "start_id": start_id,
            "end_id": start_id + len(txt_files) - 1,
            "files": file_summary
        }
        save_to_json(summary_data, summary_file)

        print("\n=== Processing complete ===")
        print(f"Total files: {len(txt_files)}")
        print(f"Total segments: {len(all_data)}")
        print(f"ID range: {start_id} - {start_id + len(txt_files) - 1}")
        print("Merged file: all_timeline_data.json")
        print("Summary file: processing_summary.json")

        # # Save each file's JSON separately
        # print("\nSaving individual JSON files...")
        # for i, (filename, file_path) in enumerate(txt_files):
        #     current_id = start_id + i
        #     file_data = parse_timeline_file(file_path, current_id)
        #     if file_data:
        #         json_filename = filename.replace('.txt', '.json')
        #         json_path = os.path.join(folder_path, json_filename)
        #         save_to_json(file_data, json_path)
        #
        # print("All individual JSON files have been saved")

    else:
        print("No valid data was parsed")


def main():
    # Batch-process the txt files in the folder
    folder_path = r"D:\workstation\voice-txt\ct-punc-test\ASR+punc\staic-应急宣传"
    start_id = 1104

    print("Starting batch processing of txt files...")
    batch_process_txt_files(folder_path, start_id)


if __name__ == "__main__":
    main()