概览
小智(xiaozhi-esp32)是一个开源的 AI 聊天机器人项目。小智分为端侧与服务端侧,本篇仅聚焦于端侧。
基本信息
- 代码版本:release 1.4.6
- 主要语言:C++
- 主要框架:ESP-IDF
- 开源协议:MIT
整体架构
整体架构梳理如下:
程序的主入口位于 main.cc 中的 app_main()
主函数,其中调用了 Application::GetInstance().Start()
启动,主要流程如下:
Start()
函数中做了一系列初始化工作,通过 Board
抽象类统一初始化显示器、音频编解码器等硬件设备。同时还启动了两个异步任务:MainLoop
和 CheckNewVersion
。其中 MainLoop
任务实现了事件循环(Event Loop)机制,通过死循环的方式监听并处理各类事件,包括音频输入捕获、音频输出播放以及任务调度等,确保系统能够实时响应并高效处理各类交互请求。
重点模块
以 xxxx 硬件为标准
audio_codecs
该目录下是音频编解码模块,其中 AudioCodec
抽象类定义了相关接口,并由具体硬件类型的音频编解码器实现。
display
网络
// application.cc
/* Wait for the network to be ready */
board.StartNetwork();
WifiBoard
类实现了 Board
抽象类,并提供了 WiFi 相关的具体实现。当用户启动小智时,会调用 WifiBoard::StartNetwork()
函数启动网络。若用户在启动时按下 BOOT 按钮、没有连接过 WiFi 或连接 WiFi 超时,则会进入 WiFi 配置模式,此时会开启 Xiaozhi WiFi 热点,并等待用户连接。否则会开始连接用户配置的 WiFi,并处理连接结果的回调。
SsidManager
、WifiStation
以及EnterWifiConfigMode
中使用到的WifiConfigurationAp
等具体实现位于 esp-wifi-connect 项目中。
// wifi_board.cc
void WifiBoard::StartNetwork() {
// User can press BOOT button while starting to enter WiFi configuration mode
if (wifi_config_mode_) {
// 进入 WiFi 配置模式 -> 开启 Xiaozhi WiFi 热点
EnterWifiConfigMode();
return;
}
// If no WiFi SSID is configured, enter WiFi configuration mode
auto& ssid_manager = SsidManager::GetInstance();
auto ssid_list = ssid_manager.GetSsidList();
if (ssid_list.empty()) {
wifi_config_mode_ = true;
EnterWifiConfigMode();
return;
}
auto& wifi_station = WifiStation::GetInstance();
wifi_station.OnScanBegin([this]() {
auto display = Board::GetInstance().GetDisplay();
display->ShowNotification(Lang::Strings::SCANNING_WIFI, 30000);
});
wifi_station.OnConnect([this](const std::string& ssid) {
auto display = Board::GetInstance().GetDisplay();
std::string notification = Lang::Strings::CONNECT_TO;
notification += ssid;
notification += "...";
display->ShowNotification(notification.c_str(), 30000);
});
wifi_station.OnConnected([this](const std::string& ssid) {
auto display = Board::GetInstance().GetDisplay();
std::string notification = Lang::Strings::CONNECTED_TO;
notification += ssid;
display->ShowNotification(notification.c_str(), 30000);
});
wifi_station.Start();
// Try to connect to WiFi, if failed, launch the WiFi configuration AP
if (!wifi_station.WaitForConnected(60 * 1000)) {
wifi_station.Stop();
wifi_config_mode_ = true;
EnterWifiConfigMode();
return;
}
}
长连接协议
小智支持两种长连接协议:WebSocket 和 MQTT(默认),支持在 menuconfig 中配置。protocol_
在实现中是一个智能指针,指向具体实现了通信协议的对象,用于管理设备与服务端的通信。
// application.cc
void Application::Start() {
// ...
#ifdef CONFIG_CONNECTION_TYPE_WEBSOCKET
protocol_ = std::make_unique<WebsocketProtocol>();
#else
protocol_ = std::make_unique<MqttProtocol>();
#endif
protocol_->OnNetworkError([this](const std::string& message) {
// 处理网络错误...
});
protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) {
// 处理收到的音频数据...
});
protocol_->OnAudioChannelOpened([this, codec, &board]() {
// 音频通道打开时的处理...
});
protocol_->OnAudioChannelClosed([this, &board]() {
// 音频通道关闭时的处理...
});
protocol_->OnIncomingJson([this, display](const cJSON* root) {
// 处理收到的JSON数据...
});
protocol_->Start();
// ...
}
// websocket_protocol.cc
void WebsocketProtocol::SendAudio(const std::vector<uint8_t>& data) {
if (websocket_ == nullptr) {
return;
}
websocket_->Send(data.data(), data.size(), true);
}
WebsocketProtocol
和 MqttProtocol
均遵守 Protocol
协议。
board
board.GetDisplay()
board.GetAudioCodec()
board.StartNetwork()
MainLoop
MainLoop
通过 while (true)
循环等待事件,负责持续处理音频输入、音频输出、任务调度等事件。
// application.cc
// The Main Loop controls the chat state and websocket connection
// If other tasks need to access the websocket or chat state,
// they should use Schedule to call this function
// 主循环控制聊天状态和 WebSocket 连接
// 如果其他任务需要访问 WebSocket 或聊天状态,
// 它们应该使用 Schedule 来调用此函数
void Application::MainLoop() {
while (true) {
// 使用 FreeRTOS 事件组(event_group_)等待三种事件
auto bits = xEventGroupWaitBits(event_group_,
SCHEDULE_EVENT | AUDIO_INPUT_READY_EVENT | AUDIO_OUTPUT_READY_EVENT,
pdTRUE, pdFALSE, portMAX_DELAY);
// 如果音频输入准备好,处理音频输入
if (bits & AUDIO_INPUT_READY_EVENT) {
InputAudio();
}
// 如果音频输出准备好,处理音频输出
if (bits & AUDIO_OUTPUT_READY_EVENT) {
OutputAudio();
}
// 如果需要调度任务,处理调度任务
if (bits & SCHEDULE_EVENT) {
// 使用互斥锁保护 main_tasks_ 列表
std::unique_lock<std::mutex> lock(mutex_);
std::list<std::function<void()>> tasks = std::move(main_tasks_);
lock.unlock();
for (auto& task : tasks) {
task();
}
}
}
}
InputAudio
首先来看音频输入事件,具体流程如下:
// application.cc
void Application::Start() {
// ... 开启 MainLoop -> InputAudio
#if CONFIG_USE_AUDIO_PROCESSOR
// 初始化音频处理器(初始化,设置工作参数,创建处理任务)
audio_processor_.Initialize(codec->input_channels(), codec->input_reference());
// 注册回调
audio_processor_.OnOutput([this](std::vector<int16_t>&& data) {
// ...
protocol_->SendAudio(opus);
// ...
});
#endif
// ...
}
void Application::InputAudio() {
// 获取音频编解码器
auto codec = Board::GetInstance().GetAudioCodec();
std::vector<int16_t> data;
// 从音频编解码器获取音频数据
if (!codec->InputData(data)) {
return;
}
// 音频数据(data)处理
if (codec->input_sample_rate() != 16000) {
// ...
}
// ...
#if CONFIG_USE_AUDIO_PROCESSOR
// 如果启用音频处理器(默认启用),则将音频数据传递给音频处理器
if (audio_processor_.IsRunning()) {
// audio_processor_ 类型为 AudioProcessor
// Input -> esp_afe_vc_v1.feed
audio_processor_.Input(data);
}
#endif
}
// audio_processor.cc
void AudioProcessor::Initialize(int channels, bool reference) {
// ...
xTaskCreate([](void* arg) {
auto this_ = (AudioProcessor*)arg;
// 创建音频处理器任务
this_->AudioProcessorTask();
vTaskDelete(NULL);
}, "audio_communication", 4096 * 2, this, 2, NULL);
}
void AudioProcessor::Input(const std::vector<int16_t>& data) {
// ...
// 将音频数据传递给音频处理器
esp_afe_vc_v1.feed(afe_communication_data_, chunk);
// ...
}
void AudioProcessor::OnOutput(std::function<void(std::vector<int16_t>&& data)> callback) {
output_callback_ = callback;
}
// Initialize -> AudioProcessorTask
void AudioProcessor::AudioProcessorTask() {
// ...
while (true) {
// 等待 PROCESSOR_RUNNING 事件
xEventGroupWaitBits(event_group_, PROCESSOR_RUNNING, pdFALSE, pdTRUE, portMAX_DELAY);
// 获取音频数据(AudioProcessor::Input 中设置的数据)
auto res = esp_afe_vc_v1.fetch(afe_communication_data_);
// ...
if (output_callback_) {
// 回调 -> OnOutput
output_callback_(std::vector<int16_t>(res->data, res->data + res->data_size / sizeof(int16_t)));
}
}
}
唤醒词检测
在 MainLoop
的 InputAudio
中也包含了唤醒词检测逻辑,整体流程类似 InputAudio
,具体如下:
// application.cc
void Application::Start() {
// ...
#if CONFIG_USE_WAKE_WORD_DETECT
wake_word_detect_.Initialize(codec->input_channels(), codec->input_reference());
wake_word_detect_.OnVadStateChange([this](bool speaking) {
// VAD(语音活动检测)状态变化回调
// ...
});
// 唤醒词检测回调
wake_word_detect_.OnWakeWordDetected([this](const std::string& wake_word) {
Schedule([this, &wake_word]() {
if (device_state_ == kDeviceStateIdle) {
// 空闲状态
SetDeviceState(kDeviceStateConnecting);
wake_word_detect_.EncodeWakeWordData();
if (!protocol_->OpenAudioChannel()) {
wake_word_detect_.StartDetection();
return;
}
std::vector<uint8_t> opus;
// Encode and send the wake word data to the server
// 编码唤醒词数据并发送给服务器
while (wake_word_detect_.GetWakeWordOpus(opus)) {
protocol_->SendAudio(opus);
}
// Set the chat state to wake word detected
protocol_->SendWakeWordDetected(wake_word);
ESP_LOGI(TAG, "Wake word detected: %s", wake_word.c_str());
keep_listening_ = true;
SetDeviceState(kDeviceStateIdle);
} else if (device_state_ == kDeviceStateSpeaking) {
// 正在说话
AbortSpeaking(kAbortReasonWakeWordDetected);
} else if (device_state_ == kDeviceStateActivating) {
// 正在激活
SetDeviceState(kDeviceStateIdle);
}
// Resume detection
wake_word_detect_.StartDetection();
});
});
// 启动唤醒词检测
wake_word_detect_.StartDetection();
#endif
// ...
}
void Application::InputAudio() {
// ...
#if CONFIG_USE_WAKE_WORD_DETECT
// 如果启用唤醒词检测(默认启用),则将音频数据传递给唤醒词检测器
if (wake_word_detect_.IsDetectionRunning()) {
wake_word_detect_.Feed(data);
}
#endif
// ...
}
// wake_word_detect.cc
void WakeWordDetect::Initialize(int channels, bool reference) {
// ...
xTaskCreate([](void* arg) {
auto this_ = (WakeWordDetect*)arg;
// 创建唤醒词检测任务
this_->AudioDetectionTask();
vTaskDelete(NULL);
}, "audio_detection", 4096 * 2, this, 2, nullptr);
}
void WakeWordDetect::Feed(const std::vector<int16_t>& data) {
// ...
// 将音频数据传递给唤醒词检测器
esp_afe_sr_v1.feed(afe_detection_data_, input_buffer_.data());
// ...
}
void WakeWordDetect::OnWakeWordDetected(std::function<void(const std::string& wake_word)> callback) {
wake_word_detected_callback_ = callback;
}
void WakeWordDetect::AudioDetectionTask() {
// ...
while (true) {
// 等待 DETECTION_RUNNING_EVENT 事件
xEventGroupWaitBits(event_group_, DETECTION_RUNNING_EVENT, pdFALSE, pdTRUE, portMAX_DELAY);
// 获取音频数据(WakeWordDetect::Feed 中设置的数据)
auto res = esp_afe_sr_v1.fetch(afe_detection_data_);
// ...
if (wake_word_detected_callback_) {
// 回调 -> OnWakeWordDetected
wake_word_detected_callback_(last_detected_wake_word_);
}
}
}
hello
消息的发送与接收
protocol_->OpenAudioChannel()
即 hello
信息的发送流程如下:
// esp32_bread_board.cc
void InitializeButtons() {
// ...
asr_button_.OnClick([this]() {
std::string wake_word="你好小智";
// 唤醒小智
Application::GetInstance().WakeWordInvoke(wake_word);
});
// ...
}
// application.cc
void Application::WakeWordInvoke(const std::string& wake_word) {
if (device_state_ == kDeviceStateIdle) {
// 空闲状态
ToggleChatState();
Schedule([this, wake_word]() {
if (protocol_) {
// 发送唤醒词
protocol_->SendWakeWordDetected(wake_word);
}
});
}
// ...
}
void Application::ToggleChatState() {
// ...
if (device_state_ == kDeviceStateIdle) {
// 空闲状态
Schedule([this]() {
// Idle → Connecting
SetDeviceState(kDeviceStateConnecting);
// 打开音频通道
if (!protocol_->OpenAudioChannel()) {
return;
}
keep_listening_ = true;
// 构造并发送一个启动监听的 JSON 格式消息
protocol_->SendStartListening(kListeningModeAutoStop);
// Connecting → Listening
SetDeviceState(kDeviceStateListening);
});
}
// ...
}
// protocol.cc
void Protocol::SendStartListening(ListeningMode mode) {
// ...
SendText(message);
}
// websocket_protocol.cc
bool WebsocketProtocol::OpenAudioChannel() {
// 建立 WebSocket 音频通道
// ...
std::string message = "{";
message += "\"type\":\"hello\",";
message += "\"version\": 1,";
message += "\"transport\":\"websocket\",";
message += "\"audio_params\":{";
message += "\"format\":\"opus\", \"sample_rate\":16000, \"channels\":1, \"frame_duration\":" + std::to_string(OPUS_FRAME_DURATION_MS);
message += "}}";
websocket_->Send(message);
// ...
}
void WebsocketProtocol::SendText(const std::string& text) {
// ...
websocket_->Send(text)
// ...
}
服务端基于 WebSocket 实现,具体处理如下:
# app.py
async def main():
# ...
# 启动 WebSocket 服务器
ws_server = WebSocketServer(config)
ws_task = asyncio.create_task(ws_server.start())
# ...
# 程序入口
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("手动中断,程序终止。")
# websocket_server.py
class WebSocketServer:
async def start(self):
# ...
async with websockets.serve(
self._handle_connection,
host,
port
):
await asyncio.Future()
# ...
async def _handle_connection(self, websocket):
"""处理新连接,每次创建独立的ConnectionHandler"""
# 创建ConnectionHandler时传入当前server实例
handler = ConnectionHandler(self.config, self._vad, self._asr, self._llm, self._tts, self._music, self._memory, self.intent)
self.active_connections.add(handler)
try:
await handler.handle_connection(websocket)
# connection.py
class ConnectionHandler:
async def handle_connection(self, ws):
# ...
async for message in self.websocket:
await self._route_message(message)
# ...
async def _route_message(self, message):
"""消息路由"""
if isinstance(message, str):
# 文本消息
await handleTextMessage(self, message)
elif isinstance(message, bytes):
# 音频消息
await handleAudioMessage(self, message)
# textHanlde.py
async def handleTextMessage(conn, message):
"""处理文本消息"""
# ...
if msg_json["type"] == "hello":
await handleHelloMessage(conn)
# ...
# helloHandle.py
async def handleHelloMessage(conn):
await conn.websocket.send(json.dumps(conn.welcome_msg))