Overview
insanely-fast-whisper is an audio transcription tool that combines OpenAI's Whisper model with several optimization techniques (Transformers, Optimum, Flash Attention). It provides a command-line interface (CLI) and is designed to transcribe large amounts of audio quickly and efficiently. Using the Whisper Large v3 model, it can transcribe 150 minutes of audio in under 98 seconds. See the GitHub repository for details, installation instructions, and usage help.
Multi-speaker recognition
pyannote.audio is an open-source Python toolkit for speaker diarization. Built on the PyTorch machine learning framework, it ships state-of-the-art pretrained models and pipelines that can be further fine-tuned on your own data for better performance.
faster-whisper + pyannote.audio gives speaker-attributed transcription: in practice you only need to merge the two sets of results, assigning each transcribed segment to the speaker whose diarization turn overlaps it most.
Official repository: https://github.com/pyannote/pyannote-audio
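To see what pyannote.audio produces on its own, the minimal sketch below loads the pretrained diarization pipeline and prints each speaker turn. The audio path is a placeholder, and the pipeline requires a Hugging Face access token plus accepting the model's user conditions on the Hub.

import torch
from pyannote.audio import Pipeline

# Load the pretrained diarization pipeline (requires a Hugging Face token
# and accepting the model's user conditions on the Hub).
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token="your huggingface token",
)
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# Run diarization on a local audio file (the path is just an example).
diarization = pipeline("audio.wav")

# Each track is a time segment labelled with an anonymous speaker id.
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"{turn.start:.1f}s -> {turn.end:.1f}s  {speaker}")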
Features
Transcribes audio with the Whisper Large v3 model
Uses Transformers, Optimum, and Flash Attention (a rough Python sketch of this stack follows the list)
Provides a command-line interface (CLI)
Supports different optimization configurations and publishes benchmark results
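The optimization stack above boils down to running Hugging Face's Whisper implementation in half precision with chunked batching and Flash Attention 2. The sketch below is a rough Python equivalent, not the CLI's exact setup: the model id, file path, and batch size are illustrative, and attn_implementation="flash_attention_2" requires the flash-attn package and a supported GPU.

import torch
from transformers import pipeline

# Whisper large-v3 through the Transformers ASR pipeline, in float16,
# with Flash Attention 2 enabled.
asr = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-large-v3",
    torch_dtype=torch.float16,
    device="cuda:0",
    model_kwargs={"attn_implementation": "flash_attention_2"},
)

# Chunked, batched decoding is what makes long files fast.
result = asr(
    "audio.mp3",          # example path
    chunk_length_s=30,
    batch_size=24,
    return_timestamps=True,
)
print(result["text"])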
Usage
Installation: install and configure with pip
Usage: pass the parameters on the command line and run the transcription task (see the example after this list)
Getting help: read the documentation in the GitHub repository and ask in the community
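A typical install-and-run session might look like the sketch below; the flag names are taken from the project's README and can change between versions, so insanely-fast-whisper --help is the authoritative reference.

# Install the CLI (pipx keeps it in an isolated environment; plain pip also works)
pipx install insanely-fast-whisper

# Transcribe a local file on GPU 0 with the default Whisper large-v3 model
insanely-fast-whisper --file-name audio.mp3 --device-id 0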
Google Colab code written for the https://github.com/SYSTRAN/faster-whisper project
# Install the required library
get_ipython().system('pip install faster-whisper')

# Import the required libraries
from faster_whisper import available_models
import torch
import ipywidgets as widgets
from IPython.display import display, clear_output
import os  # file-system operations
import gc  # garbage collection

# Automatically pick GPU or CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
model_size = "large-v2"  # default model size
compute_type = "float16" if device == "cuda" else "float32"  # fall back to float32 on CPU

# Get the list of available models
models_list = available_models()

# Default language list
supported_languages = ['en', 'fr', 'de', 'zh', '...']
default_language = 'zh' if 'zh' in supported_languages else supported_languages[0]  # prefer 'zh' if present, otherwise the first entry

# Build the GUI
model_label = widgets.Label('Model:')
model_dropdown = widgets.Dropdown(options=models_list, value=model_size)
language_label = widgets.Label('Language:')
language_dropdown = widgets.Dropdown(options=supported_languages, value=default_language)
beam_size_label = widgets.Label('Beam size:')
beam_size_slider = widgets.IntSlider(value=5, min=1, max=10, step=1)
compute_type_label = widgets.Label('Compute type:')
if device == "cuda":
    compute_type_options = ['float16', 'int8']
else:
    compute_type_options = ['float32']  # locked to float32 on CPU
compute_type_dropdown = widgets.Dropdown(options=compute_type_options, value=compute_type)
mode_label = widgets.Label('Format Mode:')
mode_dropdown = widgets.Dropdown(options=['normal', 'timeline', 'subtitle'], value='normal')
initial_prompt_label = widgets.Label('Initial prompt:')  # label for the initial prompt
initial_prompt_text = widgets.Text(value='')  # input box for the initial prompt
file_name_text = widgets.Text(description='File name:', value='/content/')  # let the user enter the audio file path
transcribe_button = widgets.Button(description='Transcribe')
output_area = widgets.Output()

# Transcription callback
def transcribe_audio(b):
    with output_area:
        clear_output()
        print("Starting transcription...")
        from faster_whisper import WhisperModel  # imported lazily to save RAM until needed
        try:
            file_name = file_name_text.value  # file name entered by the user
            initial_prompt = initial_prompt_text.value  # initial prompt entered by the user
            # Make sure the file exists
            if not os.path.exists(file_name):
                print(f"File {file_name} does not exist; please check the file name and path.")
                return
            # Read the selected options
            selected_model = model_dropdown.value
            selected_compute_type = compute_type_dropdown.value
            selected_language = language_dropdown.value
            # Create a fresh model instance and transcribe
            model = WhisperModel(selected_model, device=device, compute_type=selected_compute_type)
            try:
                # Transcribe the audio, passing the initial prompt
                segments, info = model.transcribe(file_name, beam_size=beam_size_slider.value, language=selected_language, initial_prompt=initial_prompt)
                # Print the results
                print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
                for segment in segments:
                    if mode_dropdown.value == 'normal':
                        print("%s " % (segment.text))
                    elif mode_dropdown.value == 'timeline':
                        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
                    else:  # subtitle
                        start_time = "{:02d}:{:02d}:{:02d},{:03d}".format(int(segment.start // 3600), int((segment.start % 3600) // 60), int(segment.start % 60), int((segment.start % 1) * 1000))
                        end_time = "{:02d}:{:02d}:{:02d},{:03d}".format(int(segment.end // 3600), int((segment.end % 3600) // 60), int(segment.end % 60), int((segment.end % 1) * 1000))
                        print("%d\n%s --> %s\n%s\n" % (segment.id, start_time, end_time, segment.text))
            finally:
                # Delete the model instance to free RAM
                del model
        except Exception as e:
            print("An error occurred during transcription:")
            print(str(e))
        finally:
            # Trigger garbage collection
            gc.collect()
            print("Transcription finished.")

# Assemble and display the GUI
display(model_label, model_dropdown, language_label, language_dropdown, beam_size_label, beam_size_slider, compute_type_label, compute_type_dropdown, mode_label, mode_dropdown, initial_prompt_label, initial_prompt_text, file_name_text, transcribe_button, output_area)
transcribe_button.on_click(transcribe_audio)
from pyannote.core import Segment


def get_text_with_timestamp(transcribe_res):
    # Turn faster-whisper segments into (Segment, text) pairs
    timestamp_texts = []
    for item in transcribe_res:
        start = item.start
        end = item.end
        text = item.text.strip()
        timestamp_texts.append((Segment(start, end), text))
    return timestamp_texts


def add_speaker_info_to_text(timestamp_texts, ann):
    # For each transcribed segment, pick the speaker who talks most within it
    spk_text = []
    for seg, text in timestamp_texts:
        spk = ann.crop(seg).argmax()
        spk_text.append((seg, spk, text))
    return spk_text


def merge_cache(text_cache):
    # Merge the cached fragments of one speaker into a single sentence
    sentence = ''.join([item[-1] for item in text_cache])
    spk = text_cache[0][1]
    start = round(text_cache[0][0].start, 1)
    end = round(text_cache[-1][0].end, 1)
    return Segment(start, end), spk, sentence


PUNC_SENT_END = [',', '.', '?', '!', ",", "。", "?", "!"]


def merge_sentence(spk_text):
    # Group segments into sentences, cutting on speaker changes and sentence-ending punctuation
    merged_spk_text = []
    pre_spk = None
    text_cache = []
    for seg, spk, text in spk_text:
        if spk != pre_spk and pre_spk is not None and len(text_cache) > 0:
            merged_spk_text.append(merge_cache(text_cache))
            text_cache = [(seg, spk, text)]
            pre_spk = spk
        elif text and len(text) > 0 and text[-1] in PUNC_SENT_END:
            text_cache.append((seg, spk, text))
            merged_spk_text.append(merge_cache(text_cache))
            text_cache = []
            pre_spk = spk
        else:
            text_cache.append((seg, spk, text))
            pre_spk = spk
    if len(text_cache) > 0:
        merged_spk_text.append(merge_cache(text_cache))
    return merged_spk_text


def diarize_text(transcribe_res, diarization_result):
    # Combine the ASR segments with the diarization annotation
    timestamp_texts = get_text_with_timestamp(transcribe_res)
    spk_text = add_speaker_info_to_text(timestamp_texts, diarization_result)
    res_processed = merge_sentence(spk_text)
    return res_processed


def write_to_txt(spk_sent, file):
    # Write "start end speaker sentence" lines to a text file
    with open(file, 'w') as fp:
        for seg, spk, sentence in spk_sent:
            line = f'{seg.start:.2f} {seg.end:.2f} {spk} {sentence}\n'
            fp.write(line)
import torch
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline

# In the original project this path came from a config file
# (config["asr"]["faster-whisper-large-v3"]); any local model directory
# or a model size string such as "large-v3" works here.
model_path = "large-v3"
# Test audio: https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_speaker_demo.wav
audio = "./test/asr/data/asr_speaker_demo.wav"

asr_model = WhisperModel(model_path, device="cuda", compute_type="float16")
spk_rec_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token="your huggingface token")
spk_rec_pipeline.to(torch.device("cuda"))

asr_result, info = asr_model.transcribe(audio, language="zh", beam_size=5)
diarization_result = spk_rec_pipeline(audio)

final_result = diarize_text(asr_result, diarization_result)
for segment, spk, sent in final_result:
    print("[%.2fs -> %.2fs] %s %s" % (segment.start, segment.end, sent, spk))