244 lines
6.4 KiB
C++

extern "C" {
#include <Python.h>
// include the numPy array interface :
//#define PY_ARRAY_UNIQUE_SYMBOL
///#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <numpy/arrayobject.h>
//#include <arrayobject.h>
}
#include <speex/speex_resampler.h>
//http://dsnra.jpl.nasa.gov/software/Python/numpydoc/numpy-13.html
#if 0
#define TEST_SOFT_LOG_MAX_LENGTH (4096)
void pythonLog(int32_t level, char* module, char* format, ...)
{
if (NULL == g_callback_debug) {
return;
}
static char szLog[TEST_SOFT_LOG_MAX_LENGTH] = "";
uint32_t nLength = 0;
va_list args;
// Start dynamic argument
va_start(args, format);
// add message
vsnprintf(szLog + nLength, TEST_SOFT_LOG_MAX_LENGTH - nLength, format, args);
szLog[TEST_SOFT_LOG_MAX_LENGTH - 1] = '\0';
nLength = strlen(szLog);
// End dynamic argument
va_end(args);
/* Time to call the callback */
PyObject * arglist = Py_BuildValue("(iss)", level, module, szLog);
if (arglist != NULL) {
PyObject * result = PyObject_CallObject(g_callback_debug, arglist);
Py_DECREF(arglist);
if (result != NULL) {
Py_DECREF(result);
}
}
}
// Logging shortcuts: each macro forwards its printf-style arguments to
// pythonLog() with the module tag "WRAP " and a fixed level number
// (0 = CRITICAL, 1 = ERROR, 2 = WARNING, 3 = INFO, 4 = DEBUG, 5 = VERBOSE).
// `##__VA_ARGS__` drops the trailing comma when no extra argument is given.
#define WRAP_CRITICAL(format, ...) pythonLog(0, "WRAP ", format, ##__VA_ARGS__)
#define WRAP_ERROR(format, ...) pythonLog(1, "WRAP ", format, ##__VA_ARGS__)
#define WRAP_WARNING(format, ...) pythonLog(2, "WRAP ", format, ##__VA_ARGS__)
#define WRAP_INFO(format, ...) pythonLog(3, "WRAP ", format, ##__VA_ARGS__)
#define WRAP_DEBUG(format, ...) pythonLog(4, "WRAP ", format, ##__VA_ARGS__)
#define WRAP_VERBOSE(format, ...) pythonLog(5, "WRAP ", format, ##__VA_ARGS__)
#endif
/**
 * @brief Instance layout of the Python type "speex_process.SpeexResampler".
 *
 * The standard CPython object header comes first, followed by the handle of
 * the native Speex resampler state owned by the instance.
 */
typedef struct SpeexResamplerObject {
    PyObject_HEAD
    /* Native resampler handle, assigned in SpeexResampler_init(). */
    SpeexResamplerState* m_speexResampler;
} SpeexResamplerObject;
/**
 * @brief tp_init slot: SpeexResampler(_nbChannel, _inputSampleRate,
 *        _outputSampleRate, _quality=10).
 *
 * Parses the arguments, creates a Speex resampler state and stores it in
 * self->m_speexResampler.
 *
 * @param self Instance being initialised.
 * @param args Positional arguments (three mandatory ints, one optional int).
 * @param kwds Keyword arguments, using the names listed in kwlist.
 * @return 0 on success; -1 with a Python exception set on failure
 *         (TypeError from argument parsing, ValueError for non-positive
 *         channel count / sample rates, MemoryError or RuntimeError when
 *         speex_resampler_init() fails).
 */
static int
SpeexResampler_init(SpeexResamplerObject *self, PyObject *args, PyObject *kwds)
{
    /* String literals are const; the cast below only satisfies the historical
     * `char **` prototype of PyArg_ParseTupleAndKeywords. */
    static const char *kwlist[] = {"_nbChannel", "_inputSampleRate", "_outputSampleRate", "_quality", NULL};
    int _nbChannel = 0;
    int _inputSampleRate = 0;
    int _outputSampleRate = 0;
    int _quality = 10;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii|i", (char **) kwlist, /* '|' starts the optional part */
                                     &_nbChannel,
                                     &_inputSampleRate, &_outputSampleRate,
                                     &_quality)) {
        return -1;
    }

    /* The Speex API takes unsigned values: reject negatives (and zero) here
     * instead of letting them wrap around to huge numbers. */
    if (_nbChannel <= 0 || _inputSampleRate <= 0 || _outputSampleRate <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "_nbChannel, _inputSampleRate and _outputSampleRate must be strictly positive");
        return -1;
    }

    int err = RESAMPLER_ERR_SUCCESS;
    SpeexResamplerState *state = speex_resampler_init((spx_uint32_t) _nbChannel,
                                                      (spx_uint32_t) _inputSampleRate,
                                                      (spx_uint32_t) _outputSampleRate,
                                                      _quality, &err);
    if (state == NULL || err != RESAMPLER_ERR_SUCCESS) {
        if (state != NULL) {
            speex_resampler_destroy(state);
        }
        if (err == RESAMPLER_ERR_ALLOC_FAILED) {
            PyErr_NoMemory();
        } else {
            PyErr_Format(PyExc_RuntimeError,
                         "speex_resampler_init failed: %s",
                         speex_resampler_strerror(err));
        }
        return -1;
    }

    /* __init__ may legally be called more than once on the same object:
     * release the previous state so it is not leaked. */
    if (self->m_speexResampler != NULL) {
        speex_resampler_destroy(self->m_speexResampler);
    }
    self->m_speexResampler = state;
    return 0;
}
/**
 * @brief Method "process" of SpeexResampler (METH_VARARGS).
 *
 * Currently only validates its argument: it must be a NumPy array with
 * exactly two dimensions and element type double. No resampling is performed
 * yet; after validation the function always fails with
 * AttributeError("plop").
 *
 * @param self  Resampler instance (not used yet).
 * @param _args Argument tuple holding one numpy.ndarray.
 * @return Always NULL with a Python exception set: TypeError when the
 *         argument is not an ndarray, ValueError when its rank or dtype is
 *         wrong, AttributeError otherwise.
 *
 * @note PyArray_Type is only usable once the NumPy C API has been loaded
 *       with import_array().
 */
static PyObject *
SpeexResampler_process(SpeexResamplerObject *self, PyObject *_args)
{
    PyArrayObject *array = NULL;

    (void) self; /* no processing implemented yet */

    if (!PyArg_ParseTuple(_args, "O!", &PyArray_Type, &array)) {
        return NULL;
    }
    /* Use the accessor macros: direct access to PyArrayObject fields and the
     * PyArray_DOUBLE alias are deprecated NumPy API. */
    if (   PyArray_NDIM(array) != 2
        || PyArray_TYPE(array) != NPY_DOUBLE) {
        PyErr_SetString(PyExc_ValueError, "array must be two-dimensional and of type float");
        return NULL;
    }
    /* Placeholder until the real resampling is written. */
    PyErr_SetString(PyExc_AttributeError, "plop");
    return NULL;
}
/* Method table of the SpeexResampler type. The docstring describes the input
 * actually accepted by SpeexResampler_process (2-D array of double). */
static PyMethodDef SpeexResampler_methods[] = {
    {"process", (PyCFunction) SpeexResampler_process, METH_VARARGS,
     "Process to resample a full or a part of data. (input: 2-D numpy array of float64)"
    },
    {NULL, NULL, 0, NULL} /* Sentinel */
};
static PyTypeObject SpeexResamplerType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "speex_process.SpeexResampler",
.tp_basicsize = sizeof(SpeexResamplerObject),
.tp_itemsize = 0,
.tp_flags = Py_TPFLAGS_DEFAULT,
.tp_doc = "Speex Resampler objects",
.tp_methods = SpeexResampler_methods,
.tp_init = (initproc) SpeexResampler_init,
.tp_new = PyType_GenericNew,
};
#if 1
/**
 * @brief Module-level function "process" (METH_NOARGS).
 *
 * Prints a trace line on stdout and returns None; no processing is done.
 *
 * @param _self   Module object (unused).
 * @param _unused Always NULL for METH_NOARGS functions. The parameter exists
 *                because the interpreter calls the function through a
 *                two-argument PyCFunction pointer; it is defaulted so that a
 *                direct one-argument call still compiles.
 * @return New reference to None.
 */
static PyObject* Process(PyObject* _self, PyObject* _unused = NULL)
{
    (void) _self;
    (void) _unused;
    printf("Process call *********************\n\n");
    Py_RETURN_NONE;
}
/**
 * @brief Table of the functions exported at module level.
 *
 * Only "process" is active. The other entries are kept disabled; their
 * handlers are not visible in this part of the file.
 */
static PyMethodDef module_methods[] = {
    /* Disabled entries:
    {"set_callback", (PyCFunction)SetCallback, METH_VARARGS, NULL },
    {"init",         (PyCFunction)Init,        METH_NOARGS,  NULL },
    {"un_init",      (PyCFunction)UnInit,      METH_NOARGS,  NULL },
    {"load_xml",     (PyCFunction)LoadXml,     METH_VARARGS, NULL },
    {"select_chain", (PyCFunction)SelectChain, METH_VARARGS, NULL },
    */
    {
        "process",              /* name seen from Python */
        (PyCFunction)Process,   /* C implementation */
        METH_NOARGS,            /* takes no argument */
        NULL                    /* no docstring */
    },
    {NULL, NULL, 0, NULL}       /* end-of-table marker */
};
/* Definition of the "speex_process" module. The usage text names the module
 * and the callables it really exposes. */
static struct PyModuleDef MyLocalModule =
{
    PyModuleDef_HEAD_INIT,
    "speex_process", /* name of module */
    "usage: speex_process.process()\n"
    "       speex_process.SpeexResampler(_nbChannel, _inputSampleRate, _outputSampleRate, _quality=10)\n", /* module documentation, may be NULL */
    -1, /* size of per-interpreter state of the module, or -1 if the module keeps state in global variables. */
    module_methods
};
/**
 * @brief Module initialisation entry point, called on `import speex_process`.
 *
 * Loads the NumPy C API, finalises the SpeexResampler type, creates the
 * module and publishes the type as `speex_process.SpeexResampler`.
 *
 * @return New module object, or NULL with a Python exception set.
 */
PyMODINIT_FUNC PyInit_speex_process(void)
{
    /* Must run before any PyArray_* symbol (e.g. PyArray_Type) is used;
     * the macro returns NULL from this function when NumPy cannot be loaded.
     * It is done first so that nothing needs cleaning up on that path. */
    import_array();

    if (PyType_Ready(&SpeexResamplerType) < 0) {
        return NULL;
    }

    PyObject *m = PyModule_Create(&MyLocalModule);
    if (m == NULL) {
        return NULL;
    }

    /* PyModule_AddObject steals the reference only on success. */
    Py_INCREF(&SpeexResamplerType);
    if (PyModule_AddObject(m, "SpeexResampler", (PyObject *) &SpeexResamplerType) < 0) {
        Py_DECREF(&SpeexResamplerType);
        Py_DECREF(m);
        return NULL;
    }
    return m;
}
#else
/**
 * @brief Run a shell command given as a Python string.
 *
 * Logs through WRAP_ERROR, parses a single string argument, passes it to
 * system() and returns the status as a Python int. A negative status raises
 * SpamError("System command failed").
 *
 * @param self Unused.
 * @param args Argument tuple holding the command string.
 * @return Python int with the system() status, or NULL on error.
 */
static PyObject *
speex_process_system(PyObject *self, PyObject *args)
{
    const char *shellCommand;

    WRAP_ERROR("Execute a speex_process_system");
    if (!PyArg_ParseTuple(args, "s", &shellCommand)) {
        return NULL;
    }

    int status = system(shellCommand);
    if (status >= 0) {
        return PyLong_FromLong(status);
    }

    PyErr_SetString(SpamError, "System command failed");
    return NULL;
}
/* Method table of the alternative module variant: one "system" function. */
static PyMethodDef SpamMethods[] = {
    {
        "system",                    /* name seen from Python */
        speex_process_system,        /* C implementation */
        METH_VARARGS,                /* positional arguments only */
        "Execute a shell command."   /* docstring */
    },
    {NULL, NULL, 0, NULL}            /* end-of-table marker */
};
/**
 * @brief Module initialisation for the alternative variant.
 *
 * Creates the module from `spammodule`, creates the exception "spam.error"
 * and publishes it as the module attribute "error".
 *
 * @return New module object, or NULL with a Python exception set.
 */
PyMODINIT_FUNC PyInit_speex_process(void)
{
    PyObject* m = PyModule_Create(&spammodule);
    if (m == NULL) {
        return NULL;
    }

    SpamError = PyErr_NewException("spam.error", NULL, NULL);
    if (SpamError == NULL) {
        Py_DECREF(m);
        return NULL;
    }

    /* One reference stays in the SpamError global, the other one is stolen
     * by PyModule_AddObject on success only. */
    Py_INCREF(SpamError);
    if (PyModule_AddObject(m, "error", SpamError) < 0) {
        Py_DECREF(SpamError);
        Py_CLEAR(SpamError);
        Py_DECREF(m);
        return NULL;
    }
    return m;
}
#endif