//2011-11 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com //WebWorkerThreads.cc #include #include #include #include #include #include #include #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) || defined(_AIX) #define WWT_PTHREAD 1 #include #include #ifndef uv_cond_t #define uv_cond_signal(x) pthread_cond_signal(x) #define uv_cond_init(x) pthread_cond_init(x, NULL) #define uv_cond_wait(x,y) pthread_cond_wait(x, y) typedef pthread_cond_t uv_cond_t; #endif #else #define pthread_setcancelstate(x,y) NULL #define pthread_setcanceltype(x,y) NULL #endif /* static int debug_threads= 0; static int debug_allocs= 0; */ #include "queues_a_gogo.cc" #include "bson.cc" #include "jslib.cc" //using namespace node; using namespace v8; static Persistent id_symbol; static Persistent threadTemplate; static bool useLocker; static typeQueue* freeJobsQueue= NULL; static typeQueue* freeThreadsQueue= NULL; #define kThreadMagicCookie 0x99c0ffee typedef struct { uv_async_t async_watcher; //MUST be the first one long int id; uv_thread_t thread; volatile int sigkill; typeQueue inQueue; //Jobs to run typeQueue outQueue; //Jobs done volatile int IDLE; uv_cond_t IDLE_cv; uv_mutex_t IDLE_mutex; Isolate* isolate; Persistent context; Persistent JSObject; Persistent threadJSObject; Persistent dispatchEvents; unsigned long threadMagicCookie; } typeThread; enum jobTypes { kJobTypeEval, kJobTypeEvent, kJobTypeEventSerialized }; typedef struct { int jobType; Persistent cb; union { struct { int length; String::Utf8Value* eventName; String::Utf8Value** argumentos; } typeEvent; struct { int length; String::Utf8Value* eventName; char* buffer; size_t bufferSize; } typeEventSerialized; struct { int error; int tiene_callBack; int useStringObject; String::Utf8Value* resultado; union { char* scriptText_CharPtr; String::Utf8Value* scriptText_StringObject; }; } typeEval; }; } typeJob; /* cd deps/minifier/src gcc minify.c -o minify cat ../../../src/events.js | ./minify kEvents_js > 
../../../src/kEvents_js cat ../../../src/load.js | ./minify kLoad_js > ../../../src/kLoad_js cat ../../../src/createPool.js | ./minify kCreatePool_js > ../../../src/kCreatePool_js cat ../../../src/worker.js | ./minify kWorker_js > ../../../src/kWorker_js cat ../../../src/thread_nextTick.js | ./minify kThread_nextTick_js > ../../../src/kThread_nextTick_js */ #include "events.js.c" #include "load.js.c" #include "createPool.js.c" #include "worker.js.c" #include "thread_nextTick.js.c" //#include "JASON.js.c" //node-waf configure uninstall distclean configure build install static typeQueueItem* nuJobQueueItem (void) { typeQueueItem* qitem= queue_pull(freeJobsQueue); if (!qitem) { qitem= nuItem(kItemTypePointer, calloc(1, sizeof(typeJob))); } return qitem; } static typeThread* isAThread (Handle receiver) { typeThread* thread; if (receiver->IsObject()) { if (receiver->InternalFieldCount() == 1) { thread= (typeThread*) receiver->GetPointerFromInternalField(0); if (thread && (thread->threadMagicCookie == kThreadMagicCookie)) { return thread; } } } return NULL; } static void pushToInQueue (typeQueueItem* qitem, typeThread* thread) { uv_mutex_lock(&thread->IDLE_mutex); queue_push(qitem, &thread->inQueue); if (thread->IDLE) { uv_cond_signal(&thread->IDLE_cv); } uv_mutex_unlock(&thread->IDLE_mutex); } static Handle Puts (const Arguments &args) { //fprintf(stdout, "*** Puts BEGIN\n"); HandleScope scope; int i= 0; while (i < args.Length()) { String::Utf8Value c_str(args[i]); fputs(*c_str, stdout); i++; } fflush(stdout); //fprintf(stdout, "*** Puts END\n"); return Undefined(); } static Handle Print (const Arguments &args) { HandleScope scope; int i= 0; while (i < args.Length()) { String::Utf8Value c_str(args[i]); fputs(*c_str, stdout); i++; } static char end = '\n'; fputs(&end, stdout); fflush(stdout); //fprintf(stdout, "*** Puts END\n"); return Undefined(); } static void eventLoop (typeThread* thread); // A background thread #ifdef WWT_PTHREAD static void* aThread (void* arg) { 
#else
static void aThread (void* arg) {
#endif

  // Entry point of a background thread. `arg` is the typeThread* describing
  // this thread. The pthread build returns void* (always NULL); the other
  // build returns void.

  // On non-pthread builds these two names are macros that expand to NULL and
  // discard their arguments.
  int dummy;
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &dummy);
  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &dummy);

  typeThread* thread= (typeThread*) arg;
  // The isolate is created here, on the background thread, and the thread
  // record is attached to it via SetData.
  thread->isolate= Isolate::New();
  thread->isolate->SetData(thread);

  if (useLocker) {
    //printf("**** USING LOCKER: YES\n");
    // The Locker lives for the duration of eventLoop() only.
    v8::Locker myLocker(thread->isolate);
    //v8::Isolate::Scope isolate_scope(thread->isolate);
    eventLoop(thread);
  }
  else {
    //printf("**** USING LOCKER: NO\n");
    //v8::Isolate::Scope isolate_scope(thread->isolate);
    eventLoop(thread);
  }
  // eventLoop() entered the isolate; leave it and tear it down.
  thread->isolate->Exit();
  thread->isolate->Dispose();

  // wake up callback
  // Only sent when inQueue is empty.
  if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher);
#ifdef WWT_PTHREAD
  return NULL;
#endif
}


// Forward declarations of the JS-callable functions installed by eventLoop().
static Handle threadEmit (const Arguments &args);
static Handle postMessage (const Arguments &args);
static Handle postError (const Arguments &args);


// Runs on the background thread: enters the thread's isolate, creates and
// enters a new context, then populates that context's global object.
// (The definition continues beyond this line of the file.)
static void eventLoop (typeThread* thread) {
  thread->isolate->Enter();
  thread->context= Context::New();
  thread->context->Enter();

  {
    HandleScope scope1;

    Local global= thread->context->Global();

    // Global "native_fs_" object carrying readFileSync.
    Handle fs_obj = Object::New();
    JSObjFn(fs_obj, "readFileSync", readFileSync_);
    global->Set(String::New("native_fs_"), fs_obj, attribute_ro_dd);

    // Global "console" object carrying log and error.
    Handle console_obj = Object::New();
    JSObjFn(console_obj, "log", console_log);
    JSObjFn(console_obj, "error", console_error);
    global->Set(String::New("console"), console_obj, attribute_ro_dd);

    // "self" and "global" both refer to the global object itself.
    global->Set(String::NewSymbol("self"), global);
    global->Set(String::NewSymbol("global"), global);

    // Native functions exposed as globals.
    global->Set(String::NewSymbol("puts"), FunctionTemplate::New(Puts)->GetFunction());
    global->Set(String::NewSymbol("print"), FunctionTemplate::New(Print)->GetFunction());

    global->Set(String::NewSymbol("postMessage"), FunctionTemplate::New(postMessage)->GetFunction());
    global->Set(String::NewSymbol("__postError"), FunctionTemplate::New(postError)->GetFunction());

    // Global "thread" object; its members are set on the following lines.
    Local threadObject= Object::New();
    global->Set(String::NewSymbol("thread"), threadObject);
threadObject->Set(String::NewSymbol("id"), Number::New(thread->id)); threadObject->Set(String::NewSymbol("emit"), FunctionTemplate::New(threadEmit)->GetFunction()); Local dispatchEvents= Script::Compile(String::New(kEvents_js))->Run()->ToObject()->CallAsFunction(threadObject, 0, NULL)->ToObject(); Local dispatchNextTicks= Script::Compile(String::New(kThread_nextTick_js))->Run()->ToObject(); Local _ntq= (v8::Array*) *threadObject->Get(String::NewSymbol("_ntq")); Script::Compile(String::New(kLoad_js))->Run(); double nextTickQueueLength= 0; long int ctr= 0; //SetFatalErrorHandler(FatalErrorCB); while (!thread->sigkill) { typeJob* job; typeQueueItem* qitem; { HandleScope scope2; TryCatch onError; String::Utf8Value* str; Local source; Local