00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "CArchMultithreadPosix.h"
00016 #include "CArch.h"
00017 #include "XArch.h"
00018 #include <signal.h>
00019 #if TIME_WITH_SYS_TIME
00020 # include <sys/time.h>
00021 # include <time.h>
00022 #else
00023 # if HAVE_SYS_TIME_H
00024 # include <sys/time.h>
00025 # else
00026 # include <time.h>
00027 # endif
00028 #endif
00029 #include <cerrno>
00030
00031 #define SIGWAKEUP SIGUSR1
00032
00033 #if !HAVE_PTHREAD_SIGNAL
00034
00035
00036
00037 # define pthread_sigmask sigprocmask
00038 # define pthread_kill(tid_, sig_) kill(0, (sig_))
00039 # define sigwait(set_, sig_)
00040 # undef HAVE_POSIX_SIGWAIT
00041 # define HAVE_POSIX_SIGWAIT 1
00042 #endif
00043
// Reset *sigset and fill it with exactly SIGHUP, SIGINT, SIGTERM and
// SIGUSR2.  startSignalHandler() blocks this set and
// threadSignalHandler() waits on it.
static
void
setSignalSet(sigset_t* sigset)
{
	static const int kSignals[] = { SIGHUP, SIGINT, SIGTERM, SIGUSR2 };
	const unsigned int kCount = sizeof(kSignals) / sizeof(kSignals[0]);

	sigemptyset(sigset);
	for (unsigned int i = 0; i < kCount; ++i) {
		sigaddset(sigset, kSignals[i]);
	}
}
00054
00055
00056
00057
00058
00059 class CArchThreadImpl {
00060 public:
00061 CArchThreadImpl();
00062
00063 public:
00064 int m_refCount;
00065 IArchMultithread::ThreadID m_id;
00066 pthread_t m_thread;
00067 IArchMultithread::ThreadFunc m_func;
00068 void* m_userData;
00069 bool m_cancel;
00070 bool m_cancelling;
00071 bool m_exited;
00072 void* m_result;
00073 void* m_networkData;
00074 };
00075
00076 CArchThreadImpl::CArchThreadImpl() :
00077 m_refCount(1),
00078 m_id(0),
00079 m_func(NULL),
00080 m_userData(NULL),
00081 m_cancel(false),
00082 m_cancelling(false),
00083 m_exited(false),
00084 m_result(NULL),
00085 m_networkData(NULL)
00086 {
00087
00088 }
00089
00090
00091
00092
00093
00094
00095 CArchMultithreadPosix* CArchMultithreadPosix::s_instance = NULL;
00096
00097 CArchMultithreadPosix::CArchMultithreadPosix() :
00098 m_newThreadCalled(false),
00099 m_nextID(0)
00100 {
00101 assert(s_instance == NULL);
00102
00103 s_instance = this;
00104
00105
00106 for (size_t i = 0; i < kNUM_SIGNALS; ++i) {
00107 m_signalFunc[i] = NULL;
00108 m_signalUserData[i] = NULL;
00109 }
00110
00111
00112 m_threadMutex = newMutex();
00113
00114
00115
00116 m_mainThread = new CArchThreadImpl;
00117 m_mainThread->m_thread = pthread_self();
00118 insert(m_mainThread);
00119
00120
00121
00122
00123
00124
00125 struct sigaction act;
00126 sigemptyset(&act.sa_mask);
00127 # if defined(SA_INTERRUPT)
00128 act.sa_flags = SA_INTERRUPT;
00129 # else
00130 act.sa_flags = 0;
00131 # endif
00132 act.sa_handler = &threadCancel;
00133 sigaction(SIGWAKEUP, &act, NULL);
00134
00135
00136
00137 sigset_t sigset;
00138 sigemptyset(&sigset);
00139 sigaddset(&sigset, SIGWAKEUP);
00140 pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
00141 sigemptyset(&sigset);
00142 sigaddset(&sigset, SIGPIPE);
00143 pthread_sigmask(SIG_BLOCK, &sigset, NULL);
00144 }
00145
00146 CArchMultithreadPosix::~CArchMultithreadPosix()
00147 {
00148 assert(s_instance != NULL);
00149
00150 closeMutex(m_threadMutex);
00151 s_instance = NULL;
00152 }
00153
00154 void
00155 CArchMultithreadPosix::setNetworkDataForCurrentThread(void* data)
00156 {
00157 lockMutex(m_threadMutex);
00158 CArchThreadImpl* thread = find(pthread_self());
00159 thread->m_networkData = data;
00160 unlockMutex(m_threadMutex);
00161 }
00162
00163 void*
00164 CArchMultithreadPosix::getNetworkDataForThread(CArchThread thread)
00165 {
00166 lockMutex(m_threadMutex);
00167 void* data = thread->m_networkData;
00168 unlockMutex(m_threadMutex);
00169 return data;
00170 }
00171
00172 CArchMultithreadPosix*
00173 CArchMultithreadPosix::getInstance()
00174 {
00175 return s_instance;
00176 }
00177
00178 CArchCond
00179 CArchMultithreadPosix::newCondVar()
00180 {
00181 CArchCondImpl* cond = new CArchCondImpl;
00182 int status = pthread_cond_init(&cond->m_cond, NULL);
00183 (void)status;
00184 assert(status == 0);
00185 return cond;
00186 }
00187
00188 void
00189 CArchMultithreadPosix::closeCondVar(CArchCond cond)
00190 {
00191 int status = pthread_cond_destroy(&cond->m_cond);
00192 (void)status;
00193 assert(status == 0);
00194 delete cond;
00195 }
00196
00197 void
00198 CArchMultithreadPosix::signalCondVar(CArchCond cond)
00199 {
00200 int status = pthread_cond_signal(&cond->m_cond);
00201 (void)status;
00202 assert(status == 0);
00203 }
00204
00205 void
00206 CArchMultithreadPosix::broadcastCondVar(CArchCond cond)
00207 {
00208 int status = pthread_cond_broadcast(&cond->m_cond);
00209 (void)status;
00210 assert(status == 0);
00211 }
00212
00213 bool
00214 CArchMultithreadPosix::waitCondVar(CArchCond cond,
00215 CArchMutex mutex, double timeout)
00216 {
00217
00218
00219
00220
00221
00222
00223
00224
00225 static const double maxCancellationLatency = 0.1;
00226 if (timeout < 0.0 || timeout > maxCancellationLatency) {
00227 timeout = maxCancellationLatency;
00228 }
00229
00230
00231 testCancelThread();
00232
00233
00234 struct timeval now;
00235 gettimeofday(&now, NULL);
00236 struct timespec finalTime;
00237 finalTime.tv_sec = now.tv_sec;
00238 finalTime.tv_nsec = now.tv_usec * 1000;
00239 long timeout_sec = (long)timeout;
00240 long timeout_nsec = (long)(1.0e+9 * (timeout - timeout_sec));
00241 finalTime.tv_sec += timeout_sec;
00242 finalTime.tv_nsec += timeout_nsec;
00243 if (finalTime.tv_nsec >= 1000000000) {
00244 finalTime.tv_nsec -= 1000000000;
00245 finalTime.tv_sec += 1;
00246 }
00247
00248
00249 int status = pthread_cond_timedwait(&cond->m_cond,
00250 &mutex->m_mutex, &finalTime);
00251
00252
00253 testCancelThread();
00254
00255 switch (status) {
00256 case 0:
00257
00258 return true;
00259
00260 case ETIMEDOUT:
00261 return false;
00262
00263 default:
00264 assert(0 && "condition variable wait error");
00265 return false;
00266 }
00267 }
00268
00269 CArchMutex
00270 CArchMultithreadPosix::newMutex()
00271 {
00272 pthread_mutexattr_t attr;
00273 int status = pthread_mutexattr_init(&attr);
00274 assert(status == 0);
00275 CArchMutexImpl* mutex = new CArchMutexImpl;
00276 status = pthread_mutex_init(&mutex->m_mutex, &attr);
00277 assert(status == 0);
00278 return mutex;
00279 }
00280
00281 void
00282 CArchMultithreadPosix::closeMutex(CArchMutex mutex)
00283 {
00284 int status = pthread_mutex_destroy(&mutex->m_mutex);
00285 (void)status;
00286 assert(status == 0);
00287 delete mutex;
00288 }
00289
00290 void
00291 CArchMultithreadPosix::lockMutex(CArchMutex mutex)
00292 {
00293 int status = pthread_mutex_lock(&mutex->m_mutex);
00294
00295 switch (status) {
00296 case 0:
00297
00298 return;
00299
00300 case EDEADLK:
00301 assert(0 && "lock already owned");
00302 break;
00303
00304 case EAGAIN:
00305 assert(0 && "too many recursive locks");
00306 break;
00307
00308 default:
00309 assert(0 && "unexpected error");
00310 break;
00311 }
00312 }
00313
00314 void
00315 CArchMultithreadPosix::unlockMutex(CArchMutex mutex)
00316 {
00317 int status = pthread_mutex_unlock(&mutex->m_mutex);
00318
00319 switch (status) {
00320 case 0:
00321
00322 return;
00323
00324 case EPERM:
00325 assert(0 && "thread doesn't own a lock");
00326 break;
00327
00328 default:
00329 assert(0 && "unexpected error");
00330 break;
00331 }
00332 }
00333
00334 CArchThread
00335 CArchMultithreadPosix::newThread(ThreadFunc func, void* data)
00336 {
00337 assert(func != NULL);
00338
00339
00340
00341
00342
00343
00344
00345
00346 if (!m_newThreadCalled) {
00347 m_newThreadCalled = true;
00348 #if HAVE_PTHREAD_SIGNAL
00349 startSignalHandler();
00350 #endif
00351 }
00352
00353 lockMutex(m_threadMutex);
00354
00355
00356 CArchThreadImpl* thread = new CArchThreadImpl;
00357 thread->m_func = func;
00358 thread->m_userData = data;
00359
00360
00361
00362 pthread_attr_t attr;
00363 int status = pthread_attr_init(&attr);
00364 if (status == 0) {
00365 status = pthread_create(&thread->m_thread, &attr,
00366 &CArchMultithreadPosix::threadFunc, thread);
00367 pthread_attr_destroy(&attr);
00368 }
00369
00370
00371 if (status != 0) {
00372
00373 delete thread;
00374 thread = NULL;
00375 }
00376 else {
00377
00378 insert(thread);
00379
00380
00381 refThread(thread);
00382 }
00383
00384
00385 unlockMutex(m_threadMutex);
00386
00387 return thread;
00388 }
00389
00390 CArchThread
00391 CArchMultithreadPosix::newCurrentThread()
00392 {
00393 lockMutex(m_threadMutex);
00394 CArchThreadImpl* thread = find(pthread_self());
00395 unlockMutex(m_threadMutex);
00396 assert(thread != NULL);
00397 return thread;
00398 }
00399
00400 void
00401 CArchMultithreadPosix::closeThread(CArchThread thread)
00402 {
00403 assert(thread != NULL);
00404
00405
00406 if (--thread->m_refCount == 0) {
00407
00408 if (thread->m_func != NULL) {
00409 pthread_detach(thread->m_thread);
00410 }
00411
00412
00413 lockMutex(m_threadMutex);
00414 assert(findNoRef(thread->m_thread) == thread);
00415 erase(thread);
00416 unlockMutex(m_threadMutex);
00417
00418
00419 delete thread;
00420 }
00421 }
00422
00423 CArchThread
00424 CArchMultithreadPosix::copyThread(CArchThread thread)
00425 {
00426 refThread(thread);
00427 return thread;
00428 }
00429
00430 void
00431 CArchMultithreadPosix::cancelThread(CArchThread thread)
00432 {
00433 assert(thread != NULL);
00434
00435
00436 bool wakeup = false;
00437 lockMutex(m_threadMutex);
00438 if (!thread->m_exited && !thread->m_cancelling) {
00439 thread->m_cancel = true;
00440 wakeup = true;
00441 }
00442 unlockMutex(m_threadMutex);
00443
00444
00445 if (wakeup) {
00446 pthread_kill(thread->m_thread, SIGWAKEUP);
00447 }
00448 }
00449
00450 void
00451 CArchMultithreadPosix::setPriorityOfThread(CArchThread thread, int )
00452 {
00453 assert(thread != NULL);
00454
00455
00456 }
00457
00458 void
00459 CArchMultithreadPosix::testCancelThread()
00460 {
00461
00462 lockMutex(m_threadMutex);
00463 CArchThreadImpl* thread = findNoRef(pthread_self());
00464 unlockMutex(m_threadMutex);
00465
00466
00467 testCancelThreadImpl(thread);
00468 }
00469
00470 bool
00471 CArchMultithreadPosix::wait(CArchThread target, double timeout)
00472 {
00473 assert(target != NULL);
00474
00475 lockMutex(m_threadMutex);
00476
00477
00478 CArchThreadImpl* self = findNoRef(pthread_self());
00479
00480
00481 if (target == self) {
00482 unlockMutex(m_threadMutex);
00483 return false;
00484 }
00485
00486
00487 refThread(target);
00488
00489 unlockMutex(m_threadMutex);
00490
00491 try {
00492
00493 testCancelThreadImpl(self);
00494 if (isExitedThread(target)) {
00495 closeThread(target);
00496 return true;
00497 }
00498
00499
00500 if (timeout != 0.0) {
00501 const double start = ARCH->time();
00502 do {
00503
00504 ARCH->sleep(0.05);
00505
00506
00507 testCancelThreadImpl(self);
00508 if (isExitedThread(target)) {
00509 closeThread(target);
00510 return true;
00511 }
00512
00513
00514 } while (timeout < 0.0 || (ARCH->time() - start) <= timeout);
00515 }
00516
00517 closeThread(target);
00518 return false;
00519 }
00520 catch (...) {
00521 closeThread(target);
00522 throw;
00523 }
00524 }
00525
00526 bool
00527 CArchMultithreadPosix::isSameThread(CArchThread thread1, CArchThread thread2)
00528 {
00529 return (thread1 == thread2);
00530 }
00531
00532 bool
00533 CArchMultithreadPosix::isExitedThread(CArchThread thread)
00534 {
00535 lockMutex(m_threadMutex);
00536 bool exited = thread->m_exited;
00537 unlockMutex(m_threadMutex);
00538 return exited;
00539 }
00540
00541 void*
00542 CArchMultithreadPosix::getResultOfThread(CArchThread thread)
00543 {
00544 lockMutex(m_threadMutex);
00545 void* result = thread->m_result;
00546 unlockMutex(m_threadMutex);
00547 return result;
00548 }
00549
00550 IArchMultithread::ThreadID
00551 CArchMultithreadPosix::getIDOfThread(CArchThread thread)
00552 {
00553 return thread->m_id;
00554 }
00555
00556 void
00557 CArchMultithreadPosix::setSignalHandler(
00558 ESignal signal, SignalFunc func, void* userData)
00559 {
00560 lockMutex(m_threadMutex);
00561 m_signalFunc[signal] = func;
00562 m_signalUserData[signal] = userData;
00563 unlockMutex(m_threadMutex);
00564 }
00565
00566 void
00567 CArchMultithreadPosix::raiseSignal(ESignal signal)
00568 {
00569 lockMutex(m_threadMutex);
00570 if (m_signalFunc[signal] != NULL) {
00571 m_signalFunc[signal](signal, m_signalUserData[signal]);
00572 pthread_kill(m_mainThread->m_thread, SIGWAKEUP);
00573 }
00574 else if (signal == kINTERRUPT || signal == kTERMINATE) {
00575 ARCH->cancelThread(m_mainThread);
00576 }
00577 unlockMutex(m_threadMutex);
00578 }
00579
00580 void
00581 CArchMultithreadPosix::startSignalHandler()
00582 {
00583
00584
00585 sigset_t sigset, oldsigset;
00586 setSignalSet(&sigset);
00587 pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
00588
00589
00590
00591
00592
00593 pthread_attr_t attr;
00594 int status = pthread_attr_init(&attr);
00595 if (status == 0) {
00596 status = pthread_create(&m_signalThread, &attr,
00597 &CArchMultithreadPosix::threadSignalHandler,
00598 NULL);
00599 pthread_attr_destroy(&attr);
00600 }
00601 if (status != 0) {
00602
00603
00604 pthread_sigmask(SIG_UNBLOCK, &oldsigset, NULL);
00605 }
00606 }
00607
00608 CArchThreadImpl*
00609 CArchMultithreadPosix::find(pthread_t thread)
00610 {
00611 CArchThreadImpl* impl = findNoRef(thread);
00612 if (impl != NULL) {
00613 refThread(impl);
00614 }
00615 return impl;
00616 }
00617
00618 CArchThreadImpl*
00619 CArchMultithreadPosix::findNoRef(pthread_t thread)
00620 {
00621
00622 for (CThreadList::const_iterator index = m_threadList.begin();
00623 index != m_threadList.end(); ++index) {
00624 if ((*index)->m_thread == thread) {
00625 return *index;
00626 }
00627 }
00628 return NULL;
00629 }
00630
00631 void
00632 CArchMultithreadPosix::insert(CArchThreadImpl* thread)
00633 {
00634 assert(thread != NULL);
00635
00636
00637 assert(findNoRef(thread->m_thread) == NULL);
00638
00639
00640
00641
00642
00643 thread->m_id = ++m_nextID;
00644
00645
00646 m_threadList.push_back(thread);
00647 }
00648
00649 void
00650 CArchMultithreadPosix::erase(CArchThreadImpl* thread)
00651 {
00652 for (CThreadList::iterator index = m_threadList.begin();
00653 index != m_threadList.end(); ++index) {
00654 if (*index == thread) {
00655 m_threadList.erase(index);
00656 break;
00657 }
00658 }
00659 }
00660
00661 void
00662 CArchMultithreadPosix::refThread(CArchThreadImpl* thread)
00663 {
00664 assert(thread != NULL);
00665 assert(findNoRef(thread->m_thread) != NULL);
00666 ++thread->m_refCount;
00667 }
00668
00669 void
00670 CArchMultithreadPosix::testCancelThreadImpl(CArchThreadImpl* thread)
00671 {
00672 assert(thread != NULL);
00673
00674
00675 lockMutex(m_threadMutex);
00676 bool cancel = false;
00677 if (thread->m_cancel && !thread->m_cancelling) {
00678 thread->m_cancelling = true;
00679 thread->m_cancel = false;
00680 cancel = true;
00681 }
00682 unlockMutex(m_threadMutex);
00683
00684
00685 if (cancel) {
00686 throw XThreadCancel();
00687 }
00688 }
00689
00690 void*
00691 CArchMultithreadPosix::threadFunc(void* vrep)
00692 {
00693
00694 CArchThreadImpl* thread = reinterpret_cast<CArchThreadImpl*>(vrep);
00695
00696
00697 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
00698 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00699
00700
00701 s_instance->doThreadFunc(thread);
00702
00703
00704 return NULL;
00705 }
00706
00707 void
00708 CArchMultithreadPosix::doThreadFunc(CArchThread thread)
00709 {
00710
00711 setPriorityOfThread(thread, 1);
00712
00713
00714 lockMutex(m_threadMutex);
00715 unlockMutex(m_threadMutex);
00716
00717 void* result = NULL;
00718 try {
00719
00720 result = (*thread->m_func)(thread->m_userData);
00721 }
00722
00723 catch (XThreadCancel&) {
00724
00725 }
00726 catch (...) {
00727
00728 lockMutex(m_threadMutex);
00729 thread->m_exited = true;
00730 unlockMutex(m_threadMutex);
00731 closeThread(thread);
00732 throw;
00733 }
00734
00735
00736 lockMutex(m_threadMutex);
00737 thread->m_result = result;
00738 thread->m_exited = true;
00739 unlockMutex(m_threadMutex);
00740
00741
00742 closeThread(thread);
00743 }
00744
00745 void
00746 CArchMultithreadPosix::threadCancel(int)
00747 {
00748
00749 }
00750
00751 void*
00752 CArchMultithreadPosix::threadSignalHandler(void*)
00753 {
00754
00755 pthread_detach(pthread_self());
00756
00757
00758 sigset_t sigset;
00759 setSignalSet(&sigset);
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769 sigaddset(&sigset, SIGABRT);
00770
00771
00772 for (;;) {
00773
00774 #if HAVE_POSIX_SIGWAIT
00775 int signal = 0;
00776 sigwait(&sigset, &signal);
00777 #else
00778 sigwait(&sigset);
00779 #endif
00780
00781
00782 switch (signal) {
00783 case SIGINT:
00784 ARCH->raiseSignal(kINTERRUPT);
00785 break;
00786
00787 case SIGTERM:
00788 ARCH->raiseSignal(kTERMINATE);
00789 break;
00790
00791 case SIGHUP:
00792 ARCH->raiseSignal(kHANGUP);
00793 break;
00794
00795 case SIGUSR2:
00796 ARCH->raiseSignal(kUSER);
00797 break;
00798
00799 default:
00800
00801 break;
00802 }
00803 }
00804
00805 return NULL;
00806 }