专注、坚持

浅谈小智 xiaozhi-esp32(一)

2025.03.11 by kingcos

概览

小智(xiaozhi-esp32)是一个开源的 AI 聊天机器人项目。小智分为端侧与服务端侧,本篇仅聚焦于端侧。

基本信息

  • 代码版本:release 1.4.6
  • 主要语言:C++
  • 主要框架:ESP-IDF
  • 开源协议:MIT

整体架构

整体架构梳理如下:

xiaozhi-esp32-architecture

程序的主入口位于 main.cc 中的 app_main() 主函数,其中调用了 Application::GetInstance().Start() 启动,主要流程如下:

xiaozhi-esp32 main flow

Start() 函数中做了一系列初始化工作,通过 Board 抽象类统一初始化显示器、音频编解码器等硬件设备。同时还启动了两个异步任务:MainLoopCheckNewVersion。其中 MainLoop 任务实现了事件循环(Event Loop)机制,通过死循环的方式监听并处理各类事件,包括音频输入捕获、音频输出播放以及任务调度等,确保系统能够实时响应并高效处理各类交互请求。

重点模块

以 xxxx 硬件为标准

audio_codecs

该目录下是音频编解码模块,其中 AudioCodec 抽象类定义了相关接口,并由具体硬件类型的音频编解码器实现。

display

网络

// application.cc

/* Wait for the network to be ready */
board.StartNetwork();

WifiBoard 类实现了 Board 抽象类,并提供了 WiFi 相关的具体实现。当用户启动小智时,会调用 WifiBoard::StartNetwork() 函数启动网络。若用户在启动时按下 BOOT 按钮、没有连接过 WiFi 或连接 WiFi 超时,则会进入 WiFi 配置模式,此时会开启 Xiaozhi WiFi 热点,并等待用户连接。否则会开始连接用户配置的 WiFi,并处理连接结果的回调。

SsidManagerWifiStation 以及 EnterWifiConfigMode 中使用到的 WifiConfigurationAp 等具体实现位于 esp-wifi-connect 项目中。

// wifi_board.cc

void WifiBoard::StartNetwork() {
    // User can press BOOT button while starting to enter WiFi configuration mode
    if (wifi_config_mode_) {
        // 进入 WiFi 配置模式 -> 开启 Xiaozhi WiFi 热点
        EnterWifiConfigMode();
        return;
    }

    // If no WiFi SSID is configured, enter WiFi configuration mode
    auto& ssid_manager = SsidManager::GetInstance();
    auto ssid_list = ssid_manager.GetSsidList();
    if (ssid_list.empty()) {
        wifi_config_mode_ = true;
        EnterWifiConfigMode();
        return;
    }

    auto& wifi_station = WifiStation::GetInstance();
    wifi_station.OnScanBegin([this]() {
        auto display = Board::GetInstance().GetDisplay();
        display->ShowNotification(Lang::Strings::SCANNING_WIFI, 30000);
    });
    wifi_station.OnConnect([this](const std::string& ssid) {
        auto display = Board::GetInstance().GetDisplay();
        std::string notification = Lang::Strings::CONNECT_TO;
        notification += ssid;
        notification += "...";
        display->ShowNotification(notification.c_str(), 30000);
    });
    wifi_station.OnConnected([this](const std::string& ssid) {
        auto display = Board::GetInstance().GetDisplay();
        std::string notification = Lang::Strings::CONNECTED_TO;
        notification += ssid;
        display->ShowNotification(notification.c_str(), 30000);
    });
    wifi_station.Start();

    // Try to connect to WiFi, if failed, launch the WiFi configuration AP
    if (!wifi_station.WaitForConnected(60 * 1000)) {
        wifi_station.Stop();
        wifi_config_mode_ = true;
        EnterWifiConfigMode();
        return;
    }
}

长连接协议

小智支持两种长连接协议:WebSocket 和 MQTT(默认),支持在 menuconfig 中配置。protocol_ 在实现中是一个智能指针,指向具体实现了通信协议的对象,用于管理设备与服务端的通信。

// application.cc

void Application::Start() {
    // ...
#ifdef CONFIG_CONNECTION_TYPE_WEBSOCKET
    protocol_ = std::make_unique<WebsocketProtocol>();
#else
    protocol_ = std::make_unique<MqttProtocol>();
#endif

    protocol_->OnNetworkError([this](const std::string& message) {
        // 处理网络错误...
    });
    protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) {
        // 处理收到的音频数据...
    });
    protocol_->OnAudioChannelOpened([this, codec, &board]() {
        // 音频通道打开时的处理...
    });
    protocol_->OnAudioChannelClosed([this, &board]() {
        // 音频通道关闭时的处理...
    });
    protocol_->OnIncomingJson([this, display](const cJSON* root) {
        // 处理收到的JSON数据...
    });

    protocol_->Start();
    // ...
}

// websocket_protocol.cc

void WebsocketProtocol::SendAudio(const std::vector<uint8_t>& data) {
    if (websocket_ == nullptr) {
        return;
    }

    websocket_->Send(data.data(), data.size(), true);
}

WebsocketProtocolMqttProtocol 均遵守 Protocol 协议。

board

board.GetDisplay()

board.GetAudioCodec()

board.StartNetwork()

MainLoop

MainLoop 通过 while (true) 循环等待事件,负责持续处理音频输入、音频输出、任务调度等事件。

// application.cc

// The Main Loop controls the chat state and websocket connection
// If other tasks need to access the websocket or chat state,
// they should use Schedule to call this function
// 主循环控制聊天状态和 WebSocket 连接
// 如果其他任务需要访问 WebSocket 或聊天状态,
// 它们应该使用 Schedule 来调用此函数
void Application::MainLoop() {
    while (true) {
        // 使用 FreeRTOS 事件组(event_group_)等待三种事件
        auto bits = xEventGroupWaitBits(event_group_,
            SCHEDULE_EVENT | AUDIO_INPUT_READY_EVENT | AUDIO_OUTPUT_READY_EVENT,
            pdTRUE, pdFALSE, portMAX_DELAY);

        // 如果音频输入准备好,处理音频输入
        if (bits & AUDIO_INPUT_READY_EVENT) {
            InputAudio();
        }
        // 如果音频输出准备好,处理音频输出
        if (bits & AUDIO_OUTPUT_READY_EVENT) {
            OutputAudio();
        }
        // 如果需要调度任务,处理调度任务
        if (bits & SCHEDULE_EVENT) {
            // 使用互斥锁保护 main_tasks_ 列表
            std::unique_lock<std::mutex> lock(mutex_);
            std::list<std::function<void()>> tasks = std::move(main_tasks_);
            lock.unlock();
            for (auto& task : tasks) {
                task();
            }
        }
    }
}

InputAudio

首先来看音频输入事件,具体流程如下:

Audio input flow

// application.cc

void Application::Start() {
    // ... 开启 MainLoop -> InputAudio
#if CONFIG_USE_AUDIO_PROCESSOR
    // 初始化音频处理器(初始化,设置工作参数,创建处理任务)
    audio_processor_.Initialize(codec->input_channels(), codec->input_reference());
    // 注册回调
    audio_processor_.OnOutput([this](std::vector<int16_t>&& data) {
        // ...
        protocol_->SendAudio(opus);
        // ...
    });
#endif
    // ...
}

void Application::InputAudio() {
    // 获取音频编解码器
    auto codec = Board::GetInstance().GetAudioCodec();
    std::vector<int16_t> data;
    // 从音频编解码器获取音频数据
    if (!codec->InputData(data)) {
        return;
    }

    // 音频数据(data)处理
    if (codec->input_sample_rate() != 16000) {
        // ...
    }

    // ...
#if CONFIG_USE_AUDIO_PROCESSOR
    // 如果启用音频处理器(默认启用),则将音频数据传递给音频处理器
    if (audio_processor_.IsRunning()) {
        // audio_processor_ 类型为 AudioProcessor
        // Input -> esp_afe_vc_v1.feed
        audio_processor_.Input(data);
    }
#endif
}

// audio_processor.cc

void AudioProcessor::Initialize(int channels, bool reference) {
    // ...
    xTaskCreate([](void* arg) {
        auto this_ = (AudioProcessor*)arg;
        // 创建音频处理器任务
        this_->AudioProcessorTask();
        vTaskDelete(NULL);
    }, "audio_communication", 4096 * 2, this, 2, NULL);
}

void AudioProcessor::Input(const std::vector<int16_t>& data) {
    // ...
        // 将音频数据传递给音频处理器
        esp_afe_vc_v1.feed(afe_communication_data_, chunk);
    // ...
}

void AudioProcessor::OnOutput(std::function<void(std::vector<int16_t>&& data)> callback) {
    output_callback_ = callback;
}

// Initialize -> AudioProcessorTask
void AudioProcessor::AudioProcessorTask() {
    // ...

    while (true) {
        // 等待 PROCESSOR_RUNNING 事件
        xEventGroupWaitBits(event_group_, PROCESSOR_RUNNING, pdFALSE, pdTRUE, portMAX_DELAY);

        // 获取音频数据(AudioProcessor::Input 中设置的数据)
        auto res = esp_afe_vc_v1.fetch(afe_communication_data_);

        // ...

        if (output_callback_) {
            // 回调 -> OnOutput
            output_callback_(std::vector<int16_t>(res->data, res->data + res->data_size / sizeof(int16_t)));
        }
    }
}

唤醒词检测

MainLoopInputAudio 中也包含了唤醒词检测逻辑,整体流程类似 InputAudio,具体如下:

Audio input flow

// application.cc

void Application::Start() {
    // ...
#if CONFIG_USE_WAKE_WORD_DETECT
    wake_word_detect_.Initialize(codec->input_channels(), codec->input_reference());
    wake_word_detect_.OnVadStateChange([this](bool speaking) {
        // VAD(语音活动检测)状态变化回调
        // ...
    });

    // 唤醒词检测回调
    wake_word_detect_.OnWakeWordDetected([this](const std::string& wake_word) {
        Schedule([this, &wake_word]() {
            if (device_state_ == kDeviceStateIdle) {
                // 空闲状态
                SetDeviceState(kDeviceStateConnecting);
                wake_word_detect_.EncodeWakeWordData();

                if (!protocol_->OpenAudioChannel()) {
                    wake_word_detect_.StartDetection();
                    return;
                }
                
                std::vector<uint8_t> opus;
                // Encode and send the wake word data to the server
                // 编码唤醒词数据并发送给服务器
                while (wake_word_detect_.GetWakeWordOpus(opus)) {
                    protocol_->SendAudio(opus);
                }
                // Set the chat state to wake word detected
                protocol_->SendWakeWordDetected(wake_word);
                ESP_LOGI(TAG, "Wake word detected: %s", wake_word.c_str());
                keep_listening_ = true;
                SetDeviceState(kDeviceStateIdle);
            } else if (device_state_ == kDeviceStateSpeaking) {
                // 正在说话
                AbortSpeaking(kAbortReasonWakeWordDetected);
            } else if (device_state_ == kDeviceStateActivating) {
                // 正在激活
                SetDeviceState(kDeviceStateIdle);
            }

            // Resume detection
            wake_word_detect_.StartDetection();
        });
    });
    // 启动唤醒词检测
    wake_word_detect_.StartDetection();
#endif
    // ...
}

void Application::InputAudio() {
    // ...

#if CONFIG_USE_WAKE_WORD_DETECT
    // 如果启用唤醒词检测(默认启用),则将音频数据传递给唤醒词检测器
    if (wake_word_detect_.IsDetectionRunning()) {
        wake_word_detect_.Feed(data);
    }
#endif
    // ...
}

// wake_word_detect.cc

void WakeWordDetect::Initialize(int channels, bool reference) {
    // ...
    xTaskCreate([](void* arg) {
        auto this_ = (WakeWordDetect*)arg;
        // 创建唤醒词检测任务
        this_->AudioDetectionTask();
        vTaskDelete(NULL);
    }, "audio_detection", 4096 * 2, this, 2, nullptr);
}

void WakeWordDetect::Feed(const std::vector<int16_t>& data) {
    // ...
        // 将音频数据传递给唤醒词检测器
        esp_afe_sr_v1.feed(afe_detection_data_, input_buffer_.data());
    // ...
}

void WakeWordDetect::OnWakeWordDetected(std::function<void(const std::string& wake_word)> callback) {
    wake_word_detected_callback_ = callback;
}

void WakeWordDetect::AudioDetectionTask() {
    // ...
    while (true) {
        // 等待 DETECTION_RUNNING_EVENT 事件
        xEventGroupWaitBits(event_group_, DETECTION_RUNNING_EVENT, pdFALSE, pdTRUE, portMAX_DELAY);
        // 获取音频数据(WakeWordDetect::Feed 中设置的数据)
        auto res = esp_afe_sr_v1.fetch(afe_detection_data_);
        // ...
            if (wake_word_detected_callback_) {
                // 回调 -> OnWakeWordDetected
                wake_word_detected_callback_(last_detected_wake_word_);
            }
    }
}

hello 消息的发送与接收

protocol_->OpenAudioChannel()hello 信息的发送流程如下:

// esp32_bread_board.cc

void InitializeButtons() {
    // ...
    asr_button_.OnClick([this]() {
        std::string wake_word="你好小智";
        // 唤醒小智
        Application::GetInstance().WakeWordInvoke(wake_word);
    });
    // ...
}

// application.cc

void Application::WakeWordInvoke(const std::string& wake_word) {
    if (device_state_ == kDeviceStateIdle) {
        // 空闲状态
        ToggleChatState();
        Schedule([this, wake_word]() {
            if (protocol_) {
                // 发送唤醒词
                protocol_->SendWakeWordDetected(wake_word); 
            }
        }); 
    }
    // ...
}

void Application::ToggleChatState() {
    // ...

    if (device_state_ == kDeviceStateIdle) {
        // 空闲状态
        Schedule([this]() {
            // Idle → Connecting
            SetDeviceState(kDeviceStateConnecting);
            // 打开音频通道
            if (!protocol_->OpenAudioChannel()) {
                return;
            }

            keep_listening_ = true;
            // 构造并发送一个启动监听的 JSON 格式消息
            protocol_->SendStartListening(kListeningModeAutoStop);
            // Connecting → Listening
            SetDeviceState(kDeviceStateListening);
        });
    }
    // ...
}

// protocol.cc

void Protocol::SendStartListening(ListeningMode mode) {
    // ...
    SendText(message);
}

// websocket_protocol.cc

bool WebsocketProtocol::OpenAudioChannel() {
    // 建立 WebSocket 音频通道
    // ...
    std::string message = "{";
    message += "\"type\":\"hello\",";
    message += "\"version\": 1,";
    message += "\"transport\":\"websocket\",";
    message += "\"audio_params\":{";
    message += "\"format\":\"opus\", \"sample_rate\":16000, \"channels\":1, \"frame_duration\":" + std::to_string(OPUS_FRAME_DURATION_MS);
    message += "}}";
    websocket_->Send(message);
    // ...
}

void WebsocketProtocol::SendText(const std::string& text) {
    // ...
    websocket_->Send(text)
    // ...
}

服务端基于 WebSocket 实现,具体处理如下:

# app.py

async def main():
    # ...
    # 启动 WebSocket 服务器
    ws_server = WebSocketServer(config)
    ws_task = asyncio.create_task(ws_server.start())
    # ...

# 程序入口
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("手动中断,程序终止。")

# websocket_server.py

class WebSocketServer:
    async def start(self):
        # ...
        async with websockets.serve(
                self._handle_connection,
                host,
                port
        ):
            await asyncio.Future()
        # ...

    async def _handle_connection(self, websocket):
        """处理新连接,每次创建独立的ConnectionHandler"""
        # 创建ConnectionHandler时传入当前server实例
        handler = ConnectionHandler(self.config, self._vad, self._asr, self._llm, self._tts, self._music, self._memory, self.intent)
        self.active_connections.add(handler)
        try:
            await handler.handle_connection(websocket)

# connection.py

class ConnectionHandler:
    async def handle_connection(self, ws):
        # ...
        async for message in self.websocket:
            await self._route_message(message)
        # ...

    async def _route_message(self, message):
        """消息路由"""
        if isinstance(message, str):
            # 文本消息
            await handleTextMessage(self, message)
        elif isinstance(message, bytes):
            # 音频消息
            await handleAudioMessage(self, message)

# textHanlde.py

async def handleTextMessage(conn, message):
    """处理文本消息"""
    # ...
    if msg_json["type"] == "hello":
        await handleHelloMessage(conn)
    # ...

# helloHandle.py

async def handleHelloMessage(conn):
    await conn.websocket.send(json.dumps(conn.welcome_msg))

参考