//============================================================================= // // Copyright (c) 2006-2007, Carnegie Mellon University. // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions // are met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in // the documentation and/or other materials provided with the // distribution. // // This work was supported in part by funding from the Defense Advanced // Research Projects Agency and the National Science Foundation of the // United States of America, and the CMU Sphinx Speech Consortium. // // THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND // ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY // NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // //============================================================================= //----------------------------------------------------------------------------- // // ENGINESOCKET.CPP - Functions for communicating with the recognition // engines. // // ---------------------------------------------------------------------------- // // BEFORE MAKING CHANGES TO THIS CODE, please read the appropriate // documentation, available in the Documentation folder. // // ANY SIGNIFICANT CHANGES made should be reflected back in the documentation // file(s) // // ANY CHANGES made (even small bug fixes, should be reflected in the history // below, in reverse chronological order // // HISTORY -------------------------------------------------------------------- // // [2005-08-05] (antoine) : started this // //----------------------------------------------------------------------------- #include using namespace Olympus; #include "GalaxyInterface.h" #ifdef __cplusplus extern "C" { #endif #ifdef __cplusplus } #endif #include "EngineInterface.h" #include "Constants.h" /* Engine sockets and buffers */ /* Engine specifications */ int iNumEngines = 0; char **asEngineName; char **asEngineHost; int *asEngineNBestSize; // (bthomson) Adding support for N-best lists int asTotalNBestSize; int *pEnginePort; int iSampleRate; int iSampleRateMS; SOCKET *psEngineSocket; static char **asEngineBuffer; static int *pEngineP; // A: Reads the list of engines from a descriptive string void ReadEngineListFromString(const string sEngineList) { vector vsEngines = PartitionString(sEngineList, ","); iNumEngines = (int)vsEngines.size(); /* initializes engine description arrays */ asEngineName = (char**)malloc( iNumEngines * sizeof(char*)); asEngineHost = (char**)malloc( iNumEngines * sizeof(char*)); asEngineNBestSize = (int*)malloc( iNumEngines * sizeof(int)); // (bthomson) Adding support for N-best, need to specify N for each engine (default 1 if not specified) pEnginePort = (int*)malloc( iNumEngines * sizeof(int)); psEngineSocket = (SOCKET*)malloc( iNumEngines * sizeof( SOCKET)); asEngineBuffer = (char**)malloc( iNumEngines * sizeof(char*)); pEngineP = (int*)malloc( iNumEngines * sizeof(int)); /* initializes each engine's parameters */ Log( STD_STREAM, "%d engines: ", iNumEngines); asTotalNBestSize = 0; for (int i = 0; i < iNumEngines; i++) { asEngineName[i] = (char*)malloc( MAX_NAME_SIZE); asEngineHost[i] = (char*)malloc( MAX_HOST_SIZE); asEngineBuffer[i] = (char*)malloc( RETURN_BUF_SIZE + 1); pEngineP[i] = 0; vector sParams = PartitionString(vsEngines[i], ":"); strncpy_s(asEngineName[i], MAX_NAME_SIZE, sParams[0].c_str(), MAX_NAME_SIZE-1); strncpy_s(asEngineHost[i], MAX_HOST_SIZE, sParams[1].c_str(), MAX_HOST_SIZE-1); pEnginePort[i] = atoi(sParams[2].c_str()); // (bthomson) Specifying N-Best sizes: if (sParams.size() > 3) // User specified N-Best size asEngineNBestSize[i] = atoi(sParams[3].c_str()); else asEngineNBestSize[i] = 1; asTotalNBestSize += asEngineNBestSize[i]; Log( STD_STREAM, "%s: host %s port %d", asEngineName[i], asEngineHost[i], pEnginePort[i]); psEngineSocket[i] = INVALID_SOCKET; } return; } // A: Reads the list of engines and their location from a config file void ReadEngineListFromFile( const string sEngineList) { int i; char line[MAX_LINE_SIZE]; FILE *stream; /* Reads engine list */ if( fopen_s( &stream, sEngineList.c_str(), "r" ) != 0 ) { Log( ERR_STREAM, "failed to open engine list file %s", sEngineList ); exit(-1); } /* Traverses the file once to get the number of lines */ iNumEngines = 0; while (fgets( line, MAX_FILENAME_SIZE, stream)) if ((strlen(line) > 1)&&(line[0] != '#')) iNumEngines++; fseek( stream, 0, SEEK_SET); /* initializes engine description arrays */ asEngineName = (char**)malloc( iNumEngines * sizeof(char*)); asEngineHost = (char**)malloc( iNumEngines * sizeof(char*)); pEnginePort = (int*)malloc( iNumEngines * sizeof(int)); psEngineSocket = (SOCKET*)malloc( iNumEngines * sizeof( SOCKET)); asEngineBuffer = (char**)malloc( iNumEngines * sizeof(char*)); pEngineP = (int*)malloc( iNumEngines * sizeof(int)); /* initializes each engine's parameters */ for (i = 0; i < iNumEngines; i++) { asEngineName[i] = (char*)malloc( MAX_NAME_SIZE + 1); asEngineHost[i] = (char*)malloc( MAX_HOST_SIZE + 1); asEngineBuffer[i] = (char*)malloc( RETURN_BUF_SIZE + 1); pEngineP[i] = 0; /* reads from the file the description of the engines (name host port) */ fgets( line, MAX_FILENAME_SIZE, stream); while ((strlen(line) <= 1)||(line[0] == '#')) { if (!fgets( line, MAX_FILENAME_SIZE, stream)) { Log( ERR_STREAM, "invalid format in list file %s", sEngineList ); exit(-1); } } sscanf_s( line, "%s %s %d", asEngineName[i], asEngineHost[i], &pEnginePort[i]); Log( STD_STREAM, "%s: host %s port %d", asEngineName[i], asEngineHost[i], pEnginePort[i]); psEngineSocket[i] = INVALID_SOCKET; } return; } // A: Checks the status of a socket and opens it if necessary int CheckEngineConnection(int n) { // connection problem, try to reconnect if (psEngineSocket[n] == INVALID_SOCKET) { Log( ERR_STREAM, "%s decoding engine socket is invalid, try to connect", asEngineName[n]); if ((psEngineSocket[n] = sock_connect(asEngineHost[n], pEnginePort[n])) == INVALID_SOCKET) Log( ERR_STREAM, "Couldn't connect %s decoding engine", asEngineName[n]); } return (psEngineSocket[n] != INVALID_SOCKET); } // A: Close engine socket void CloseEngineConnection(int n) { if (psEngineSocket[n] != INVALID_SOCKET) { close_socket(psEngineSocket[n]); psEngineSocket[n] = INVALID_SOCKET; } } // A: Closes all sockets and declares them invalid void CloseAllEngineConnections() { for (int i = 0; i < iNumEngines; i++) { CloseEngineConnection(i); } } // A: Sends a message to engine n int SendMessageToEngine(int n, const char *sMessage) { int iStatus = 1; Log( DBG_STREAM, "Sending message to %s engine:\n%s", asEngineName[n], sMessage); if (psEngineSocket[n] == INVALID_SOCKET) { Log( ERR_STREAM, "%s decoding engine socket is invalid, try to connect", asEngineName[n]); if ((psEngineSocket[n] = sock_connect(asEngineHost[n], pEnginePort[n])) == INVALID_SOCKET) { Log( ERR_STREAM, "Couldn't connect %s decoding engine", asEngineName[n]); iStatus = 0; } } if (psEngineSocket[n] != INVALID_SOCKET) { if (sock_send_block(psEngineSocket[n], sMessage, (int)strlen(sMessage)) < (int)strlen(sMessage)) { Log( ERR_STREAM, "Problems on sending message to %s decoding engine: %s", asEngineName[n], sMessage); iStatus = 0; } } return iStatus; } // TK: Sets the current acoustic model void* engine_set_acoustic_model(const string& s) { //send request string cmd("set_acoustic_model:"); cmd.append(s).append(" \n"); SendMessageToAllEngines(cmd.c_str()); //read result return GetFrameFromEngine(0, true); } // TK: Gets the current Acoustic Model void* engine_get_acoustic_model() { //send request SendMessageToAllEngines("get_acoustic_model \n"); //read result return GetFrameFromEngine(0, true); } // A: Sends a sMessage to all engines int SendMessageToAllEngines(const char *sMessage) { int iNumOK = 0; for (int i = 0; i < iNumEngines; i++) { if (SendMessageToEngine(i, sMessage)) { iNumOK++; } } Log( STD_STREAM, "Message sent to %d/%d engines", iNumOK, iNumEngines); return iNumOK; } // A: Sends some data to engine n int SendDataToEngine(int n, char *pData, int iSize) { int iStatus = 1; if (psEngineSocket[n] == INVALID_SOCKET) { Log( ERR_STREAM, "%s decoding engine socket is invalid, try to connect", asEngineName[n]); if ((psEngineSocket[n] = sock_connect(asEngineHost[n], pEnginePort[n])) == INVALID_SOCKET) { Log( ERR_STREAM, "Couldn't connect %s decoding engine", asEngineName[n]); iStatus = 0; } } if (psEngineSocket[n] != INVALID_SOCKET) { if (sock_send_block(psEngineSocket[n], pData, iSize) < iSize) { Log( ERR_STREAM, "Problems on sending data to %s decoding engine", asEngineName[n]); iStatus = 0; } } if (iStatus == 1) Log( DBG_STREAM, "%d bytes sent to engine %s.", iSize, asEngineName[n]); return iStatus; } // A: Sends some data to all engines int SendDataToAllEngines(char *pData, int iSize) { int iNumOK = 0; for (int i = 0; i < iNumEngines; i++) { if (SendDataToEngine(i, pData, iSize)) { iNumOK++; } } return iNumOK; } /* reads next full Galaxy frame from a string */ int get_frame(char **string, char *frame) { // the first "non-empty" (i.e. non-space, CR) character // has to be "{" for the string to be a valid Galaxy frame int k; for (k = 0; k < (int)strlen((*string)); k++) { if ((*string)[k] == '{') { break; } else if (((*string)[k] != ' ') && ((*string)[k] != '\n')) { // fail the whole process Log(ERR_STREAM, "Cannot parse frame. Does not start with {\n(%s)", *string); return 0; } } // the buffer contained only "empty" characters go to next engine if (k == strlen((*string))) { return 0; } (*string) += k; int n_braces = 0; frame[0] = '\0'; for (k = 0; k < (int)strlen((*string)); k++) { // Counts open curly brackets if ((*string)[k] == '{') { n_braces++; } else if ((*string)[k] == '}') { n_braces--; if (n_braces == 0) { strncpy_s(frame, MAX_MESSAGE_SIZE, (*string), k+1); frame[k+1] = '\0'; (*string) += k+1; break; } } } return (int)strlen(frame); } // A: Gets a Galaxy frame from one engine void *GetFrameFromEngine(int n, bool bBlocking) { char sFrame[MAX_MESSAGE_SIZE]; Gal_Frame gf = NULL; // checks if connection is working properly if (CheckEngineConnection(n)) { int j; if (bBlocking) { j = sock_recv_block(psEngineSocket[n], asEngineBuffer[n] + pEngineP[n], RETURN_BUF_SIZE - pEngineP[n]); } else { j = sock_recv_noblock(psEngineSocket[n], asEngineBuffer[n] + pEngineP[n], RETURN_BUF_SIZE - pEngineP[n]); } if (j < 0) { //Log(DBG_STREAM, "Closing socket (j=%d)", j); //CloseEngineConnection(n); //pEngineP[n] = 0; } else { pEngineP[n] += j; // properly terminates the string from the engine asEngineBuffer[n][pEngineP[n]] = '\0'; char *p = asEngineBuffer[n]; // get the last frame received from the engine int k = 0; while (pEngineP[n] > 0) { k = get_frame(&p, sFrame); if (k == 0) { Log(DBG_STREAM, "Incomplete frame from engine %s:\n%s", asEngineName[n], sFrame, p); break; } else { Log( DBG_STREAM, "Got frame from engine %s:\n%s\nRest:%s", asEngineName[n], sFrame, p); // Shift the buffer to go to the next data from this engine pEngineP[n] -= k; memmove(asEngineBuffer[n], p, pEngineP[n]); asEngineBuffer[n][pEngineP[n]] = '\0'; p = asEngineBuffer[n]; } } if (k > 0) { Log( DBG_STREAM, "Converting frame from engine %s:\n%s", asEngineName[n], sFrame); // yes, attempt to parse it gf = Gal_ReadFrameFromString(sFrame); } } } return gf; } // A: Flushes the socket from an engine void FlushEngineSocket(int n) { char pBuf[RETURN_BUF_SIZE]; if (CheckEngineConnection(n)) { while(int j = sock_recv_noblock(psEngineSocket[n], pBuf, RETURN_BUF_SIZE) > 0) { Log(STD_STREAM, "Flushing %d bytes from engine %s", j, asEngineName[n]); } } } // A: Flushes all engine sockets void FlushAllEngineSockets() { for (int i = 0; i < iNumEngines; i++) { FlushEngineSocket(i); } } // A: Gets the current partial results from all engines void *GetPartialResultFromAllEngines() { FlushAllEngineSockets(); // Notifies all engines that the utterance is finished and we want // a final hypothesis SendMessageToAllEngines("engine_proc_partial \n"); // Prepares the arrays that will receive all the Galaxy frame results Gal_Frame *agfResult = (Gal_Frame*)malloc(iNumEngines * sizeof(Gal_Frame)); for (int i = 0; i < iNumEngines; i++) { agfResult[i] = NULL; } int n_ready = 0; bool bGotNew; fd_set fs; TIMEVAL timeout; // wait at most 100 msec for partial hypotheses timeout.tv_usec = 100000; for (int i = 0; i < MAX_ENGINE_WAIT_ITERATIONS; i++) { Log( DBG_STREAM, "wait for return (partial)"); bGotNew = false; FD_ZERO(&fs); for (int n=0; n < iNumEngines; n++) { if (!agfResult[n]) { CheckEngineConnection(n); FD_SET(psEngineSocket[n], &fs); } } int iSelectStatus = select(iNumEngines, &fs, NULL, NULL, &timeout); if (iSelectStatus == SOCKET_ERROR) { Log(STD_STREAM, "ERROR: Socket select error (%d) in GetPartialResultFromAllEngine", WSAGetLastError()); break; } else if (iSelectStatus == 0) { Log(STD_STREAM, "WARNING: Socket timeout in GetPartialResultFromAllEngine"); break; } // process each engine's results for (int n = 0; n < iNumEngines; n++) { if (FD_ISSET(psEngineSocket[n], &fs)) { //// Skips this engine if we already got its results //if (agfResult[n]) continue; // Attempts to get a frame from engine n Gal_Frame gf = (Gal_Frame)GetFrameFromEngine(n, false); if (gf) { // We got one, is this a result frame? // (NB: for now we discard all non-partial-result frame // remaining in the buffer at this point) if (!strcmp(Gal_FrameName(gf),"partial_result")) { // it's a result, store it agfResult[n] = gf; // declare that the final result for this engine // was received n_ready++; bGotNew = true; FD_CLR(psEngineSocket[n], &fs); } } } } // if all engines are ready, exit loop if (n_ready == iNumEngines) break; } return agfResult; } // A: Waits for final results from all engines void *GetFinalResultFromAllEngines() { // Notifies all engines that the utterance is finished and we want // a final hypothesis SendMessageToAllEngines("engine_end_utt \nengine_proc_result \n"); // Prepares the arrays that will receive all the Galaxy frame results Gal_Frame *agfResult = (Gal_Frame*)malloc(iNumEngines * sizeof(Gal_Frame)); for (int i = 0; i < iNumEngines; i++) { agfResult[i] = NULL; } fd_set fs; TIMEVAL timeout; // wait at most 1 sec for final hypotheses timeout.tv_sec = 1; int n_ready = 0; bool bGotNew; for (int i = 0; i < MAX_ENGINE_WAIT_ITERATIONS; i++) { Log( DBG_STREAM, "wait for return"); bGotNew = false; FD_ZERO(&fs); for (int n=0; n < iNumEngines; n++) { if (!agfResult[n]) FD_SET(psEngineSocket[n], &fs); } int iSelectStatus = select(iNumEngines, &fs, NULL, NULL, &timeout); if (iSelectStatus == SOCKET_ERROR) { Log(STD_STREAM, "ERROR: Socket select error (%d) in GetResultFromAllEngine", WSAGetLastError()); break; } else if (iSelectStatus == 0) { Log(STD_STREAM, "WARNING: Socket timeout in GetResultFromAllEngine"); break; } // process each engine's results for (int n = 0; n < iNumEngines; n++) { if (FD_ISSET(psEngineSocket[n], &fs)) { // Attempts to get a frame from engine n Gal_Frame gf = (Gal_Frame)GetFrameFromEngine(n, false); if (gf) { // We got one, is this a result frame? // (NB: for now we discard all non-result frame // remaining in the buffer at this point) if (!strcmp(Gal_FrameName(gf),"result")) { // it's a result, store it agfResult[n] = gf; // declare that the final result for this engine // was received n_ready++; bGotNew = true; FD_CLR(psEngineSocket[n], &fs); } } } } // if all engines are ready, exit loop if (n_ready == iNumEngines) break; // wait before receiving again //if (!bGotNew) // Sleep(20); } return agfResult; }