Live stream output from JS MediaRecorder to Python speech recognition server via socket.io

I'm trying to stream microphone audio from my browser to a server running a Python service connected to Google Cloud Speech-to-Text. For the transfer I'm using socket.io. Everything seems to work, but the speech recognition never returns any result, and I suspect a problem with the format of the data I send. In the browser I'm using a MediaRecorder with the MIME type audio/webm;codecs=opus.
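For context, here is a minimal offline check of the container/codec with the synchronous API (a sketch only: recorded.webm is a hypothetical file holding a complete recording saved from the browser, and the config values are the same ones used throughout this question):

# formatCheck.py (sketch)
from google.cloud import speech

client = speech.SpeechClient()

# read a complete recording saved from the browser (hypothetical file)
with open('recorded.webm', 'rb') as f:
    content = f.read()

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
    sample_rate_hertz=16000,
    language_code='fr-FR')
audio = speech.RecognitionAudio(content=content)

# one-shot, non-streaming request: validates container, codec and config together
response = client.recognize(config=config, audio=audio)
for result in response.results:
    print(result.alternatives[0].transcript)

The browser side (simpleTest.js) follows.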

// simpleTest.js
'use strict';

// Configuration
var language = 'fr-FR';
var mimeType = 'audio/webm;codecs=opus';    // Valid sample rates: 8000, 12000, 16000, 24000, 48000
var format = 'WEBM_OPUS';   // Valid sample rates: 8000, 12000, 16000, 24000, 48000
var sampleRate = 16000;

var recording = false;
var audioStream = null;
var mediaRecorder = null;
var audioChunks = [];

var namespace = '/ingestor'; // change to an empty string to use the global namespace

// Initialize socket
var socket = io(namespace);
socket.on('connect', function () {
    console.log("connected to the SocketServer " + namespace);
});

socket.on('my_response', function (msg, callback) {
    console.log("received message from the SocketServer " + namespace);
    $('#log').append('<br>' + $('<div/>').text('logs #' + msg.count + ': ' + msg.data).html());
    if (callback)
        callback();
});

socket.on('connect_error', (error) => {
    console.error("Socket connection error: " + error);
});

socket.on('disconnect', (reason) => {
    console.log("Socket disconnected: " + reason);
    if (reason === "io server disconnect") {
        // the disconnection was initiated by the server, you need to reconnect manually
        socket.connect();
    }
});

const sendMessage = async (aSocket, msg) => {
    if (aSocket.connected) {
        aSocket.emit('my_event', {data: msg});
    }
}

const initRecording = () => {
    recording = false;
    window.AudioContext = window.AudioContext || window.webkitAudioContext;

    if (navigator.mediaDevices === undefined) {
        navigator.mediaDevices = {};
    }

    if (navigator.mediaDevices.getUserMedia === undefined) {
        navigator.mediaDevices.getUserMedia = function (constraints) {

            // First get ahold of the legacy getUserMedia, if present
            const getUserMedia = navigator.webkitGetUserMedia || navigator.mozGetUserMedia;

            // Some browsers just don't implement it - return a rejected promise with an error
            // to keep a consistent interface
            if (!getUserMedia) {
                return Promise.reject(new Error('getUserMedia is not implemented in this browser'));
            }

            // Otherwise, wrap the call to the old navigator.getUserMedia with a Promise
            return new Promise(function (resolve, reject) {
                getUserMedia.call(navigator, constraints, resolve, reject);
            });
        }
    }

    // Initialize audio stream
    console.log("Creating audio stream");
    navigator.mediaDevices.getUserMedia({audio: true})
    .then((stream) => {
        console.log("Audio stream successfully created");
        audioStream = stream;
        mediaRecorder = new MediaRecorder(stream, {
            audioBitsPerSecond: sampleRate,    // note: audioBitsPerSecond is an encoder bitrate (bits/s), not a sample rate
            mimeType: mimeType
        });
        console.log('mimeType: ' + mediaRecorder.mimeType);
        mediaRecorder.ondataavailable = handleDataAvailable;
    }).catch((error) => {
        console.log("Error while creating the audio stream");
        console.log(error);
    });
};


const startRecording = () => {
    recording = true;
    console.log('startRecording');
    mediaRecorder.start(1000);    // timeslice in ms: fires ondataavailable roughly every second
};


const stopRecording = () => {
    recording = false;
    console.log('stopRecording');
    mediaRecorder.stop();
};


const handleDataAvailable = (event) => {
    console.log('handleDataAvailable');
    if (event.data && event.data.size > 0) {
        console.log(event.data);
        handleBlob(event.data);
    }
};

const handleBlob = (blob) => {
    console.log('handleBlob - blob type: ' + blob.type);
    blob.arrayBuffer()
        .then((buffer) => {
            console.log(buffer);
            console.log(audioChunks.length + '. ' + buffer);
            sendMessage(socket, JSON.stringify({
                type: 'audio',
                content: {
                    command: 'stream',
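                    // JSON.stringify() turns this Uint8Array into an object of
                    // index/byte pairs, which the Python transcriber reads back
                    // with .values() to rebuild the raw bytes.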
                    audioData:  new Uint8Array(buffer)
                }
            }));
        })
        .catch(function(err) {
            console.log(err);
        });
};

window.toggleRecording = () => {
    if (!recording) {
        startRecording();
    } else {
        stopRecording();
    }
}

initRecording();

On the server side I set the encoding in the google.cloud.speech.RecognitionConfig to AudioEncoding.WEBM_OPUS, so I assume I'm using the same container and codec on both ends. Right? The server is divided into two parts:

  • the ingestor, which reads the socket and writes the data as received to a Redis queue
  • the transcriber, which reads the Redis queue and passes the data to Google Cloud Speech-to-Text (my understanding of how the streaming call should be driven is sketched right after this list)
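As a point of comparison, this is how I understand streaming_recognize is supposed to be fed: one StreamingRecognitionConfig plus a generator of requests that carry only raw audio bytes (a sketch only, not my actual code; the file path and chunk source are placeholders):

# streamingSketch.py (sketch)
from google.cloud import speech

client = speech.SpeechClient()
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
    sample_rate_hertz=16000,
    language_code='fr-FR')
streaming_config = speech.StreamingRecognitionConfig(config=config, interim_results=True)

def chunk_source():
    # placeholder audio source; in my setup this would be the Redis reader below
    with open('recorded.webm', 'rb') as f:
        while True:
            data = f.read(4096)
            if not data:
                break
            yield data

def request_generator():
    # the config is passed once to streaming_recognize; each request carries only audio bytes
    for chunk in chunk_source():
        yield speech.StreamingRecognizeRequest(audio_content=chunk)

responses = client.streaming_recognize(config=streaming_config, requests=request_generator())
for response in responses:
    for result in response.results:
        print(result.alternatives[0].transcript)

My actual server code follows.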
# Ingestor
import queue
import time
import eventlet
from flask import Flask, render_template, session, copy_current_request_context
from flask_socketio import SocketIO, emit, disconnect
import redis

eventlet.monkey_patch()

host = '127.0.0.1'
port = 5000
redisHost = 's2t_memory_store'
redisPort = 6379
redisQueue = 'livequeue'
id = 'ingestor'
maxPackets = 500

async_mode = None
app = Flask(__name__)
socketio = SocketIO(app, async_mode='eventlet')

thread = None

redisDatabase = redis.Redis(host=redisHost, port=redisPort, db=0,
                            health_check_interval=2, socket_timeout=3)
buffer = queue.Queue()

@socketio.on('connect', namespace='/ingestor')
def connect():
    print('%s socket connected!' % id)
    global thread
    if thread is None:
        thread = socketio.start_background_task(_enqueue_audio, redisQueue)

@socketio.on('my_event', namespace='/ingestor')
def handle_data(data):
    """Stores the received audio data in a local buffer."""
    buffer.put(data['data'], block=False)
    session['receive_count'] = session.get('receive_count', 0) + 1
    emit('my_response',
         {'data': data['data'], 'count': session['receive_count']})

def _enqueue_audio(redis_queue):
    """Blocking-reads data from the buffer and adds to Redis queue."""
    print('%s enqueue_audio thread started!' % id)
    while True:
        try:
            chunk = buffer.get(block=True)
            print('Buffer read: {}'.format(chunk))
            val = redisDatabase.lpush(redis_queue, chunk)
            # debugging; under normal circumstances audio should not be accumulating
            if val > 5:
                print('Ingested audio queue length: %d' % val)
        except redis.exceptions.RedisError as err:
            print('Error pushing into Redis queue: %s' % err)

@app.route('/')
def index():
    return render_template('test.html', sync_mode=socketio.async_mode)

if __name__ == '__main__':
    print("Starting ingestor")
    socketio.run(app, host=host, port=port)

# Transcriber
import redis
import json
from google.cloud import speech


encoding = speech.RecognitionConfig.AudioEncoding.WEBM_OPUS
sample_rate = 16000
language_code = 'fr-FR'
host = '127.0.0.1'
port = 5000
redis_host = 's2t_memory_store'
redis_port = 6379
redis_queue = 'livequeue'
id = 'transcriber'


class redisStream(object):
    def __init__(self, host, port, queue):
        self.host = host
        self.port = port
        self.queue = queue
        self.redis_conn = redis.Redis(host=self.host, port=self.port)

def redis_generator(redis_conn, redis_queue):
    while True:
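        # blpop blocks until an item is available and returns a (key, value) tuple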
        yield redis_conn.blpop(redis_queue)[1]

def main():
    redis_conn = redis.Redis(host=redis_host, port=redis_port, db=0,
                             health_check_interval=2,
                             socket_timeout=None,
                             socket_connect_timeout=None)

    speech_client = speech.SpeechClient()
    recognition_config = speech.RecognitionConfig(
        encoding=encoding,
        sample_rate_hertz=sample_rate,
        language_code=language_code)
    streaming_config = speech.StreamingRecognitionConfig(
        config=recognition_config,
        interim_results=True)

    for message in redis_generator(redis_conn, redis_queue):
        print(f'REDIS STREAM: {message}')
        messageData = json.loads(message)
        if messageData['content']['command'] == 'stream':
            print('AUDIO DATA: %s' % messageData['content']['audioData'])
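            # audioData is the JSON-serialized Uint8Array: an object of index/byte
            # pairs whose values are reassembled here into raw bytes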
            chunk = bytes(messageData['content']['audioData'].values())
            print('CHUNK: %s' % chunk)
            request = speech.StreamingRecognizeRequest(audio_content=chunk)
            responses = speech_client.streaming_recognize(config=streaming_config, requests=[request])
            print('RESPONSES: %s' % responses)
            if responses:
                for response in responses:
                    for i, result in enumerate(response.results):
                        alternative = result.alternatives[0]
                        print("-" * 20)
                        print(u"First alternative of result {}".format(i))
                        print(u"Transcript: {}".format(alternative.transcript))
                        print(u"Confidence: {}".format(alternative.confidence))
                        print("-" * 20)
            else:
                print('No response')

if __name__ == "__main__":
    print("Starting transcriber")
    main()

What is wrong? Is there an example somewhere of the best (right) way to transfer such a live stream?

I have read many threads and articles on the web, but I have never been able to make this work correctly.

Thanks for your answer.



Solution 1:[1]

You need to write some Python code, or show the code you tried that did not fix the issue you were seeing.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1 by Cody B.